Netty 心跳消息

2021-08-30 10:26:07

我需要知道你是否还活着,只好在发现有一段时间没有你消息之后主动询问你,因为你活在我的内存里。


TCP 本身有心跳机制,但是时间太久了,对于一个需要维持长连接的服务器来说太久了,特别是对于推送服务器要实现更精细化的心跳机制。

Netty 提供了一个空闲检测工具类 IdleStateHandler,在空闲一段时间没有收发消息后可以触发事件,在触发的调用函数里可以任意施为,比如发送心跳消息,或者断开。另外 ReadTimeouthandlerWriteTimeoutHandler 在发现没有消息后会直接断开连接,这并不是我们需要的,一般没有消息收发的未必是消息断开了,我们还要通过发消息确认,维持一个连接的活性。

挂载

首先在初始化器中挂载上空闲检测器

//heartbeat ,这里每 10 秒没消息就会触发
p.addLast(new IdleStateHandler(0, 0, 10, TimeUnit.SECONDS)); 

处理器

挂载成功后,在实现了 userEventTriggered 接口的处理器中会得到触发。在该函数中检测到事件类型和空闲类型,满足需要后可以根据需要发送心跳消息。

public class HeartbeatHandler extends ChannelDuplexHandler {
private final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        // TODO Auto-generated method stub
        //logger.warn("userEventTriggered channel {}",ctx.channel().id());

        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            logger.warn("userEventTriggered {} {}",ctx.channel().id(),e.state());

            Map<String, Object> pingMap = new Hashtable<String, Object>();
            pingMap.put("func", "ping");
            ctx.writeAndFlush(pingMap).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);  
           
        }
        else{
            logger.error("userEventTriggered {} {}",ctx,evt);
            super.userEventTriggered(ctx, evt);
        }
		
    }
}

在这里将所有空闲消息都直接送了一个确认消息,在发送时如果发现连接已经断开会自动触发中断消息,在中断处理函数中可以将这个节点做下线处理。

具体的逻辑可以根据业务灵活的定制。

Copyright tg-blog 京ICP备15066502号-2