Wednesday, September 17, 2014

Multi-threaded UDP server with Netty on Linux

This article is only relevant when Netty is embedded in an application intended to run on a Linux distribution that supports Netty's native epoll transport (e.g. CentOS >= 6.5).

Bootstrapping your UDP server is as simple as follows:

 Bootstrap bootstrap = new Bootstrap()  
      .group(new EpollEventLoopGroup(workerThreads))  
      .channel(EpollDatagramChannel.class)  
      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  
      .handler(channelInitializer);

 ChannelFuture future = bootstrap.bind(host, port).await();  
 if(!future.isSuccess())  
      throw new Exception(String.format("Failed to bind on [host = %s, port = %d].", host, port), future.cause());  

However, no matter the value of workerThreads, only one Channel will be created, and therefore only one thread of the EventLoopGroup will be used. This is generally the right architecture for a UDP server, since UDP is connectionless. However, processing incoming datagrams may involve relatively long operations (e.g. accessing a database), in which case you would want to take advantage of multi-threading.

As of Netty 4.0.23.Final, there is no "Netty" way to do this, for example one that would use an ExecutorService to dispatch the processing of incoming datagrams to multiple threads. You could add an inbound handler at the beginning of the pipeline that is responsible for dispatching incoming datagrams, but that may get you into trouble depending on your pipeline.
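To make the dispatch idea concrete, here is a plain-JDK sketch of the pattern (outside Netty, to keep it self-contained): a single I/O thread reads datagrams from a socket and hands the slow processing off to a worker pool. The port number, pool size, and payload are arbitrary demo values.

```java
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchSketch {
    public static void main(String[] args) throws Exception {
        final ExecutorService workers = Executors.newFixedThreadPool(4);
        final DatagramSocket socket =
                new DatagramSocket(47555, InetAddress.getLoopbackAddress());

        // Single I/O thread: receives one datagram and dispatches it to the pool.
        Thread ioThread = new Thread(new Runnable() {
            @Override public void run() {
                try {
                    byte[] buf = new byte[1024];
                    DatagramPacket packet = new DatagramPacket(buf, buf.length);
                    socket.receive(packet); // blocks until a datagram arrives
                    final byte[] copy =
                            Arrays.copyOf(packet.getData(), packet.getLength());
                    workers.submit(new Runnable() {
                        @Override public void run() {
                            // the relatively long operation (e.g. database access)
                            // runs here, off the I/O thread
                            System.out.println("processed " + copy.length + " bytes");
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    socket.close();
                }
            }
        });
        ioThread.start();

        // Send one datagram to ourselves to exercise the path.
        byte[] msg = "hello".getBytes("UTF-8");
        new DatagramSocket().send(new DatagramPacket(
                msg, msg.length, InetAddress.getLoopbackAddress(), 47555));

        ioThread.join();
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

In a Netty pipeline the same hand-off would live in an inbound handler, which is exactly where it can interfere with the rest of the pipeline (ordering, buffer lifetime), as noted above.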

A possible solution using the native transport is to create multiple Channels that listen on the same port (note the SO_REUSEPORT option enabled when the Bootstrap is initialized).

 Bootstrap bootstrap = new Bootstrap()  
      .group(new EpollEventLoopGroup(workerThreads))  
      .channel(EpollDatagramChannel.class)  
      .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)  
      .option(EpollChannelOption.SO_REUSEPORT, true)  
      .handler(channelInitializer);

 ChannelFuture future;  
 for(int i = 0; i < workerThreads; ++i) {  
      future = bootstrap.bind(host, port).await();  
      if(!future.isSuccess())  
           throw new Exception(String.format("Failed to bind on [host = %s, port = %d].", host, port), future.cause());  
 }  

It is also important to note that this solution only spreads the load across datagrams received from different remote transport addresses. The SO_REUSEPORT option is implemented by the kernel, which dispatches each received datagram to one of the bound sockets based on its source transport address. Datagrams received from the same source address are therefore always dispatched to the same socket and processed serially by the same Channel.
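To observe the kernel feature outside Netty, here is a minimal plain-NIO sketch: two UDP sockets bound to the same address and port. It assumes a JDK 9 or later (which exposes the option as StandardSocketOptions.SO_REUSEPORT; in 2014 only Netty's native transport exposed it on Java 7) and a Linux kernel >= 3.9. The port number is an arbitrary demo value.

```java
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.DatagramChannel;

public class ReusePortDemo {
    public static void main(String[] args) throws Exception {
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 47556);

        DatagramChannel first = DatagramChannel.open()
                .setOption(StandardSocketOptions.SO_REUSEPORT, true);
        DatagramChannel second = DatagramChannel.open()
                .setOption(StandardSocketOptions.SO_REUSEPORT, true);

        first.bind(addr);
        second.bind(addr); // would throw BindException without SO_REUSEPORT

        System.out.println(first.isOpen() && second.isOpen());

        first.close();
        second.close();
    }
}
```

The kernel then balances incoming datagrams between the two sockets by hashing the source transport address, which is what gives each Netty Channel its own share of the traffic.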

Initializing C3P0 pool at startup using Spring

As of C3P0 version 0.9.5-pre8, the pool manager will not be initialized until the first time getConnection() is called on the DataSource.

I wrote an application server that receives and processes commands. Processing a command generally requires accessing a database, especially for the first commands received. The problem was that the application server took a long time to process the first command, because that was the first time getConnection() was called; as a result, the sending client hit its timeout and removed the "unresponsive" application server from its list of available application servers. This is one use case where you want to initialize as much as possible at application startup.

If you use Spring's JdbcTemplate, one option is to force eager initialization of the exception translator, as follows (note the second argument of the JdbcTemplate constructor, lazyInit, set to false):

@Repository("myRepository")  
public class MyRepositoryImpl implements MyRepository {
 
     private JdbcTemplate jdbcTemplate;

     @Autowired  
     public void setDataSource(DataSource dataSource) {  
          // lazyInit = false: eagerly initialize the exception translator  
           jdbcTemplate = new JdbcTemplate(dataSource, false);  
     }

}  

Initializing the exception translator requires accessing the database and therefore at some point will call the getConnection() method which will trigger pool manager initialization.

The main idea is just to call getConnection() at startup to trigger pool manager initialization. Therefore, as long as you can manage to get the getConnection() method called at startup, you are done.
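As a sketch of that idea, here is a hypothetical warm-up helper that just checks a connection out and back in at startup (e.g. called from a Spring init-method or an @PostConstruct method). The class name is an illustration-only assumption, and the main method uses dynamic-proxy stubs in place of a real C3P0 pool so the demo is self-contained.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;

public final class PoolWarmUp {

    // Call once at application startup: checking a connection out and back in
    // is enough to trigger pool manager initialization.
    public static void warmUp(DataSource dataSource) throws SQLException {
        try (Connection conn = dataSource.getConnection()) {
            // nothing to do with the connection itself
        }
    }

    // Self-contained demo: proxy stubs stand in for a real pooled DataSource.
    public static void main(String[] args) throws Exception {
        final boolean[] getConnectionCalled = {false};

        final Connection connStub = (Connection) Proxy.newProxyInstance(
                PoolWarmUp.class.getClassLoader(), new Class<?>[]{Connection.class},
                new InvocationHandler() {
                    @Override public Object invoke(Object p, Method m, Object[] a) {
                        return null; // only close() is invoked by try-with-resources
                    }
                });

        DataSource dsStub = (DataSource) Proxy.newProxyInstance(
                PoolWarmUp.class.getClassLoader(), new Class<?>[]{DataSource.class},
                new InvocationHandler() {
                    @Override public Object invoke(Object p, Method m, Object[] a) {
                        if (m.getName().equals("getConnection")) {
                            getConnectionCalled[0] = true; // pool would initialize here
                            return connStub;
                        }
                        return null;
                    }
                });

        warmUp(dsStub);
        System.out.println("getConnection called: " + getConnectionCalled[0]);
    }
}
```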

If you don't use Spring but are running Java 7 or later, an empty try-with-resources block is enough:

try(Connection conn = dataSource.getConnection()) {  
     // nothing to do: checking a connection out triggers pool manager initialization  
}