Реализация сообщений keep-alive в Netty с использованием WriteTimeoutHandler
Я использую Netty 3.2.7. Я пытаюсь написать функциональность в моем клиенте, чтобы, если никакие сообщения не записываются через определенное количество времени (скажем, 30 секунд), сообщение "keep-alive" отправляется на сервер.
После некоторого копания я обнаружил, что WriteTimeoutHandler должен позволить мне сделать это. Я нашел это объяснение здесь: https://issues.jboss.org/browse/NETTY-79.
Пример, приведенный в документации Netty:
public ChannelPipeline getPipeline() {
// An example configuration that implements 30-second write timeout:
return Channels.pipeline(
new WriteTimeoutHandler(timer, 30), // timer must be shared.
new MyHandler());
}
В моем тестовом клиенте я сделал именно это. В MyHandler я также переопределил метод exceptionCaught():
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
if (e.getCause() instanceof WriteTimeoutException) {
log.info("Client sending keep alive!");
ChannelBuffer keepAlive = ChannelBuffers.buffer(KEEP_ALIVE_MSG_STR.length());
keepAlive.writeBytes(KEEP_ALIVE_MSG_STR.getBytes());
Channels.write(ctx, Channels.future(e.getChannel()), keepAlive);
}
}
Независимо от того, какая длительность клиент ничего не пишет в канал, метод exceptionCaught(), который я переопределял, никогда не вызывается.
Глядя на источник WriteTimeoutHandler, его реализация writeRequested():
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
long timeoutMillis = getTimeoutMillis(e);
if (timeoutMillis > 0) {
// Set timeout only when getTimeoutMillis() returns a positive value.
ChannelFuture future = e.getFuture();
final Timeout timeout = timer.newTimeout(
new WriteTimeoutTask(ctx, future),
timeoutMillis, TimeUnit.MILLISECONDS);
future.addListener(new TimeoutCanceller(timeout));
}
super.writeRequested(ctx, e);
}
Здесь, кажется, в этой реализации говорится: "Когда запрашивается запись, сделайте новый тайм-аут. Когда запись завершится успешно, отмените таймаут".
Используя отладчик, похоже, что это то, что происходит. Как только запись завершается, таймаут отменяется. Это не то поведение, которое я хочу. Поведение, которое я хочу, это: "Если клиент не написал никакой информации на канал в течение 30 секунд, бросьте WriteTimeoutException."
Итак, разве это не то, что для WriteTimeoutHandler? Вот как я интерпретировал это из того, что я читал в Интернете, но реализация, похоже, не работает таким образом. Я использую это неправильно? Должен ли я использовать что-то еще? В нашей версии Mina того же клиента, который я пытаюсь переписать, я вижу, что метод sessionIdle() переопределяется для достижения желаемого поведения, но этот метод недоступен в Netty.
Ответы
Ответ 1
Я бы предложил добавить IdleStateHandler, а затем добавить пользовательскую реализацию IdleStateAwareUpstreamHandler, который может реагировать на состояние бездействия. Это очень хорошо работает для меня во многих проектах.
В javadocs перечислены следующие примеры, которые можно использовать в качестве основы для вашей реализации:
public class MyPipelineFactory implements ChannelPipelineFactory {
private final Timer timer;
private final ChannelHandler idleStateHandler;
public MyPipelineFactory(Timer timer) {
this.timer = timer;
this.idleStateHandler = new IdleStateHandler(timer, 60, 30, 0);
// timer must be shared.
}
public ChannelPipeline getPipeline() {
return Channels.pipeline(
idleStateHandler,
new MyHandler());
}
}
// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends IdleStateAwareChannelHandler {
@Override
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
if (e.getState() == IdleState.READER_IDLE) {
e.getChannel().close();
} else if (e.getState() == IdleState.WRITER_IDLE) {
e.getChannel().write(new PingMessage());
}
}
}
ServerBootstrap bootstrap = ...;
Timer timer = new HashedWheelTimer();
...
bootstrap.setPipelineFactory(new MyPipelineFactory(timer));
...
Ответ 2
Для Netty 4.0 и новее вы должны расширить ChannelDuplexHandler, как в примере из документации IdleStateHandler:
// An example that sends a ping message when there is no outbound traffic
// for 30 seconds. The connection is closed when there is no inbound traffic
// for 60 seconds.
public class MyChannelInitializer extends ChannelInitializer<Channel> {
@Override
public void initChannel(Channel channel) {
channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
channel.pipeline().addLast("myHandler", new MyHandler());
}
}
// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(new PingMessage());
}
}
}
}