消息队列管理
- 使用无界队列:在Netty的
ChannelPipeline
中,可以选择使用无界队列(如LinkedBlockingQueue
)来暂存消息。这样在停机过程中,新接收到的消息能继续进入队列等待处理,而不会因为队列已满而被丢弃。例如在自定义的ChannelInboundHandler
中,声明一个无界队列成员变量:
private final BlockingQueue<Object> messageQueue = new LinkedBlockingQueue<>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
messageQueue.put(msg);
}
- 自定义消息处理逻辑:在停机时,启动一个独立的线程来处理队列中的剩余消息。这个线程持续从队列中取出消息并处理,直到队列为空。
private final Thread queueProcessor = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Object msg = messageQueue.take();
// 处理消息逻辑
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
- 消息持久化:对于重要的消息,可以在进入队列时将其持久化到磁盘(如使用文件系统或数据库)。在停机后重启时,从持久化存储中恢复消息并重新处理。例如使用
FileOutputStream
将消息写入文件:
private void persistMessage(Object msg) {
try (FileOutputStream fos = new FileOutputStream("message.log", true)) {
ObjectOutputStream oos = new ObjectOutputStream(fos);
oos.writeObject(msg);
} catch (IOException e) {
e.printStackTrace();
}
}
连接关闭时机
- 设置关闭策略:使用
ChannelFutureListener
来监听连接关闭的结果。在调用channel.close()
方法后,添加监听器以确保连接完全关闭。
ChannelFuture future = channel.close();
future.addListener(ChannelFutureListener.CLOSE);
- 等待消息处理完成:在关闭连接之前,先等待消息队列中的消息处理完毕。可以通过
CountDownLatch
来实现线程间的同步。例如:
private final CountDownLatch latch = new CountDownLatch(1);
// 在消息处理完成逻辑中调用
latch.countDown();
// 在关闭连接前等待
try {
latch.await();
channel.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
- 优雅关闭的顺序:首先停止接收新的连接,然后处理完当前已接收的所有消息,最后关闭所有连接。可以通过
ServerBootstrap
的group
来实现。例如:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
...
ChannelFuture f = b.bind(port).sync();
// 停止接收新连接
bossGroup.shutdownGracefully();
// 处理完现有消息
workerGroup.shutdownGracefully();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}