Reactor Netty: UDP DNS client example

This example demonstrates how to:

send and receive DNS packets over UDP
add Netty’s built-in ChannelHandlers to the pipeline
use Reactor Netty’s API to define the send and receive actions
terminate the connection

The code below is built on Netty and uses the following library versions:

Netty 4.1.107.Final
Project Reactor 3.6.3
Reactor Netty 1.1.16

Code

package example;

import io.netty.buffer.ByteBufUtil;
import io.netty.handler.codec.dns.*;
import io.netty.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;

import java.net.InetSocketAddress;

public class DnsUdpClient {

    private static final String SERVER_HOST = "8.8.8.8";
    private static final int SERVER_PORT = 53;
    private static final Logger LOGGER = LoggerFactory.getLogger(DnsUdpClient.class);

    // (1)
    private static void handleQueryResp(DatagramDnsResponse msg) {
        if (msg.count(DnsSection.QUESTION) > 0) {
            DnsQuestion question = msg.recordAt(DnsSection.QUESTION, 0);
            LOGGER.info("name: {}", question.name());
        }
        for (int i = 0, count = msg.count(DnsSection.ANSWER); i < count; i++) {
            DnsRecord r = msg.recordAt(DnsSection.ANSWER, i);
            if (r.type() == DnsRecordType.A) {
                // just print the IP after query
                DnsRawRecord raw = (DnsRawRecord) r;
                LOGGER.info("{}", NetUtil.bytesToIpAddress(ByteBufUtil.getBytes(raw.content())));
            }
        }
    }

    public static void main(String[] args) {
        UdpClient client = UdpClient.create()
                .host(SERVER_HOST).port(SERVER_PORT) // (2)
                .wiretap(true) // (3)
                /*
                // (4)
                .doOnChannelInit((observer, channel, remoteAddress) -> {
                    Connection c = Connection.from(channel);
                    c.addHandlerLast(new DatagramDnsQueryEncoder());
                    c.addHandlerLast(new DatagramDnsResponseDecoder());
                    LOGGER.info("pipeline={}", channel.pipeline());
                });
                */

                // (5)
                .doOnConnected(c -> {
                    c.addHandlerLast(new DatagramDnsQueryEncoder());
                    c.addHandlerLast(new DatagramDnsResponseDecoder());
                    LOGGER.info("pipeline={}", c.channel().pipeline());
                });

        Connection conn = client.connectNow(); // (6)

        // (7)
        conn.inbound().receiveObject()
                .doOnNext(obj -> {
                    DatagramDnsResponse r = (DatagramDnsResponse) obj;
                    LOGGER.info("response={}", obj);
                    handleQueryResp(r);
                })
                .doOnError(err -> LOGGER.error(String.valueOf(err)))
                .subscribe();

        // (8)
        conn.outbound()
                .sendObject(new DatagramDnsQuery(null, new InetSocketAddress(SERVER_HOST, SERVER_PORT), 0x42b7)
                        .setRecord(
                                DnsSection.QUESTION,
                                new DefaultDnsQuestion("www.google.com", DnsRecordType.A)
                        ))
                .then().subscribe();

        conn.outbound()
                .sendObject(new DatagramDnsQuery(null, new InetSocketAddress(SERVER_HOST, SERVER_PORT), 0x42b8)
                        .setRecord(
                                DnsSection.QUESTION,
                                new DefaultDnsQuestion("projectreactor.io", DnsRecordType.A)
                        ))
                .then().subscribe();

        // (9)
        conn.onReadIdle(5000, () -> {
            LOGGER.error("Request time out");
            conn.disposeNow();
        });

        // (10)
        conn.onDispose().block();
    }

}

Explanation

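(1): handleQueryResp() decodes the DNS reply: it logs the question name and the IP address of every A record in the answer section.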
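
The A-record branch in (1) comes down to turning four raw bytes into dotted-decimal text. As a rough, library-free sketch of that step (the class and method names are made up for illustration, the sample address is invented data, and this is not necessarily how NetUtil does it internally):

```java
class ARecordToText {

    // Formats the 4-byte RDATA of an A record as dotted-decimal text.
    // Java bytes are signed, so each one is masked with 0xFF first.
    static String toIpv4Text(byte[] rdata) {
        if (rdata.length != 4) {
            throw new IllegalArgumentException("expected 4 bytes, got " + rdata.length);
        }
        return (rdata[0] & 0xFF) + "." + (rdata[1] & 0xFF) + "."
                + (rdata[2] & 0xFF) + "." + (rdata[3] & 0xFF);
    }

    public static void main(String[] args) {
        // 192.0.2.1 is a documentation address (RFC 5737), used here as sample data.
        byte[] rdata = {(byte) 192, 0, 2, 1};
        System.out.println(toIpv4Text(rdata));
    }
}
```

The masking matters because a byte such as (byte) 192 is negative in Java and would otherwise print as a negative number.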

(2): the destination IP address and port number are required when creating Netty’s io.netty.channel.Channel.

(3): wiretap(true) adds a debugging ChannelHandler at the start of the Channel’s pipeline; it logs every event related to the Channel at DEBUG level.

(4): doOnChannelInit() accepts a reactor.netty.ChannelPipelineConfigurer, which configures the channel pipeline while the channel is being initialized.

This ChannelPipelineConfigurer is called by reactor.netty.transport.TransportConfig.TransportChannelInitializer#initChannel:

@Override
protected void initChannel(Channel channel) {
    ChannelPipeline pipeline = channel.pipeline();

    if (config.metricsRecorder != null) {
        //…
    }

    if (config.loggingHandler != null) {
        pipeline.addFirst(NettyPipeline.LoggingHandler, config.loggingHandler);
    }

    ChannelOperations.addReactiveBridge(channel, config.channelOperationsProvider(), connectionObserver);

    config.defaultOnChannelInit()
            .then(config.doOnChannelInit)
            .onChannelInit(connectionObserver, channel, remoteAddress); // <- HERE

    pipeline.remove(this);

    if (log.isDebugEnabled()) {
        log.debug(format(channel, "Initialized pipeline {}"), pipeline.toString());
    }
}

To obtain a reactor.netty.Connection instance from Netty’s Channel instance, use the following static method:

Connection c = Connection.from(channel);

When the doOnChannelInit() callback starts executing, the pipeline contains

pipeline=DefaultChannelPipeline{(reactor.left.loggingHandler = reactor.netty.transport.logging.ReactorNettyLoggingHandler), (TransportConfig$TransportChannelInitializer#0 = reactor.netty.transport.TransportConfig$TransportChannelInitializer), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}

When the callback finishes, the pipeline contains

pipeline=DefaultChannelPipeline{(reactor.left.loggingHandler = reactor.netty.transport.logging.ReactorNettyLoggingHandler), (TransportConfig$TransportChannelInitializer#0 = reactor.netty.transport.TransportConfig$TransportChannelInitializer), (DatagramDnsQueryEncoder = io.netty.handler.codec.dns.DatagramDnsQueryEncoder), (DatagramDnsResponseDecoder = io.netty.handler.codec.dns.DatagramDnsResponseDecoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}

reactor.netty.channel.ChannelOperationsHandler is the bridge that connects Netty and the Reactor Netty library. It must stay at the end of the pipeline.

DatagramDnsQueryEncoder and DatagramDnsResponseDecoder are Netty’s built-in ChannelHandlers: the encoder turns DNS query objects into raw datagram bytes (ByteBuf), and the decoder turns raw datagram bytes into DNS response objects.
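
To make "encoding" concrete, here is a hand-rolled sketch of the RFC 1035 wire layout for a single A-record question. It is not Netty’s implementation: the class and method names are invented for illustration, and the flags value 0x0100 (standard query with recursion desired) is an assumption that may differ from what DatagramDnsQueryEncoder emits for the query built in this example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class DnsQueryBytes {

    // Builds a DNS query datagram: 12-byte header followed by one question.
    static byte[] encodeAQuery(int id, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeShort(out, id);      // transaction ID, echoed back in the response
        writeShort(out, 0x0100);  // flags: standard query, recursion desired (assumed)
        writeShort(out, 1);       // QDCOUNT: one question
        writeShort(out, 0);       // ANCOUNT
        writeShort(out, 0);       // NSCOUNT
        writeShort(out, 0);       // ARCOUNT
        // QNAME: each label is prefixed with its length, then a zero byte ends the name.
        for (String label : name.split("\\.")) {
            byte[] bytes = label.getBytes(StandardCharsets.US_ASCII);
            if (bytes.length == 0 || bytes.length > 63) {
                throw new IllegalArgumentException("invalid label: " + label);
            }
            out.write(bytes.length);
            out.write(bytes, 0, bytes.length);
        }
        out.write(0);
        writeShort(out, 1);       // QTYPE: A
        writeShort(out, 1);       // QCLASS: IN
        return out.toByteArray();
    }

    // Writes a 16-bit value in network byte order (big-endian).
    private static void writeShort(ByteArrayOutputStream out, int value) {
        out.write((value >>> 8) & 0xFF);
        out.write(value & 0xFF);
    }

    public static void main(String[] args) {
        byte[] packet = encodeAQuery(0x42b7, "www.google.com");
        StringBuilder hex = new StringBuilder();
        for (byte b : packet) {
            hex.append(String.format("%02x", b & 0xFF));
        }
        System.out.println(packet.length + " bytes: " + hex);
    }
}
```

With the handlers in the pipeline, none of this byte-level work appears in the application code: it only passes DatagramDnsQuery objects to sendObject() and receives DatagramDnsResponse objects from receiveObject().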

(5): doOnConnected() is another way to configure the channel pipeline. It does exactly the same job as (4), but it runs when the Channel has been connected rather than while it is being initialized.

(6): connectNow() blocks while the connection is established and returns once it is set up. (UDP itself has no connection handshake, so nothing needs to be exchanged with the server at this point.)

(7): defines the action to take when a packet is received. inbound().receiveObject() returns a Flux. Remember to call subscribe() at the end, because a Flux does nothing until it is subscribed to.

(8): there are two statements, one per request. then() returns a Mono that completes when the packet has been sent. It must also be subscribed to.

(9): terminates the connection if no packet is received for 5 seconds.

(10): onDispose() returns a Mono that completes when the connection has been shut down. block() blocks the main thread on it, so the main thread stays alive until then and is the last to finish.
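
Points (6) and (9) can also be seen in miniature with plain java.net sockets, without Reactor Netty. In this sketch, connect() on a UDP socket only records the peer address, and setSoTimeout() plays a role loosely similar to onReadIdle(). The class and method names, the silent loopback peer, and the 200 ms timeout are assumptions chosen for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;

class UdpConnectAndTimeout {

    // Returns true if no datagram arrives within timeoutMillis.
    static boolean timesOutWithoutReply(int timeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // "peer" is bound to an ephemeral port but never sends anything.
        try (DatagramSocket peer = new DatagramSocket(0, loopback);
             DatagramSocket socket = new DatagramSocket(0, loopback)) {
            // For UDP, connect() only fixes the remote address; no packet is sent.
            socket.connect(new InetSocketAddress(loopback, peer.getLocalPort()));
            // Give up if nothing is received within the timeout.
            socket.setSoTimeout(timeoutMillis);
            socket.receive(new DatagramPacket(new byte[512], 512));
            return false;
        } catch (SocketTimeoutException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("timed out: " + timesOutWithoutReply(200));
    }
}
```

The blocking receive() here is the main difference from the Reactor Netty version, where the idle timeout is a callback and only the final onDispose().block() ties up the main thread.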
