设计思路
- 连接中断处理:
- 使用 `SelectionKey` 监听 `OP_READ` 和 `OP_WRITE` 事件时,若发生连接中断,`SelectionKey` 会失效。在 `Selector` 的循环中,检查 `SelectionKey` 的有效性,若无效则从 `Selector` 中移除相关的 `Channel`,并关闭相关资源。
- 同时,可以考虑实现重连机制,在一定时间间隔后尝试重新连接服务器。
- 读写超时处理:
- 对于读超时,`SO_TIMEOUT` 属性只对阻塞模式下通过 `Socket` 流进行的读操作生效,超过设定时间时会抛出 `SocketTimeoutException`;非阻塞的 `SocketChannel` 读操作不会抛出该异常,需要结合 `Selector.select(timeout)` 和连接的最后活动时间自行判断超时。检测到读超时后,可以关闭当前 `Channel`,并记录日志。同时,可根据业务需求决定是否进行重连。
- 对于写超时,Java 的标准套接字选项中没有 `SO_SNDTIMEO`,非阻塞写操作也不会抛出 `SocketTimeoutException`,同样需要在 `Selector` 循环中根据待发送数据的滞留时间自行判断。处理方式与读超时类似:关闭 `Channel` 并记录日志,视情况重连。
- 通用异常处理:
- 在 `Selector` 的循环中,捕获 `IOException` 等通用异常,对 `Channel` 进行安全关闭,并记录详细的异常信息,以便定位问题。
关键代码实现
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
/**
 * Single-connection, selector-driven NIO endpoint.
 *
 * <p>Despite its name, this class opens a {@link SocketChannel} and connects
 * <em>out</em> to {@code localhost:8080}; it does not accept connections.
 * Once connected it sends one greeting message and then prints whatever the
 * peer sends until the connection ends.
 *
 * <p>All I/O is non-blocking and runs on the thread that calls
 * {@link #start()}. Non-blocking {@code SocketChannel} reads and writes never
 * throw {@code SocketTimeoutException}, so every {@link IOException} is
 * handled the same way: the failure is reported and the connection is closed.
 */
public class HighAvailableNIOServer {
    private static final int PORT = 8080;
    private static final int READ_BUFFER_SIZE = 1024;
    private static final String GREETING = "Response from server";

    private final Selector selector;

    /**
     * Opens the selector and starts a non-blocking connect to
     * {@code localhost:8080}.
     *
     * @throws IOException if the selector or channel cannot be opened, or if
     *     the connect attempt fails immediately; anything opened so far is
     *     closed before the exception propagates
     */
    public HighAvailableNIOServer() throws IOException {
        selector = Selector.open();
        SocketChannel channel = null;
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            if (channel.connect(new InetSocketAddress("localhost", PORT))) {
                // Local connections may complete immediately; OP_CONNECT
                // would then never be signalled, so go straight to I/O.
                registerForIo(selector, channel);
            } else {
                channel.register(selector, SelectionKey.OP_CONNECT);
            }
        } catch (IOException | RuntimeException e) {
            closeQuietly(channel);
            closeQuietly(selector);
            throw e;
        }
    }

    /**
     * Runs the event loop until no valid registration remains on the
     * selector (connection closed or failed) or the selector itself fails.
     * The remaining channels and the selector are closed before returning,
     * so an instance cannot be restarted.
     */
    public void start() {
        try {
            while (hasLiveRegistration()) {
                selector.select();
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    // Remove first so the key never lingers in the selected
                    // set, even if a handler fails unexpectedly.
                    keyIterator.remove();
                    dispatch(key);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            shutdown();
        }
    }

    /** Returns true while the selector is open and still has a valid key. */
    private boolean hasLiveRegistration() {
        if (!selector.isOpen()) {
            return false;
        }
        for (SelectionKey key : selector.keys()) {
            if (key.isValid()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs every handler whose operation is ready. Validity is re-checked
     * before each one because an earlier handler may have cancelled the key.
     */
    private void dispatch(SelectionKey key) {
        if (key.isValid() && key.isConnectable()) {
            connect(key);
        }
        if (key.isValid() && key.isReadable()) {
            read(key);
        }
        if (key.isValid() && key.isWritable()) {
            write(key);
        }
    }

    /** Completes a pending connect and switches the key to read/write interest. */
    private void connect(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            if (channel.isConnectionPending() && !channel.finishConnect()) {
                // Not finished yet; OP_CONNECT stays registered.
                return;
            }
            registerForIo(selector, channel);
        } catch (IOException e) {
            System.out.println("Error connecting channel: " + e.getMessage());
            closeConnection(key, channel);
        }
    }

    /**
     * Registers a connected channel for reading and for writing the
     * greeting. The unsent bytes travel as the key attachment so a partial
     * write can resume on the next writable event.
     */
    private static void registerForIo(Selector sel, SocketChannel channel) throws IOException {
        ByteBuffer pending = ByteBuffer.wrap(GREETING.getBytes(StandardCharsets.UTF_8));
        channel.register(sel, SelectionKey.OP_READ | SelectionKey.OP_WRITE, pending);
    }

    /** Reads available bytes and prints them; closes the connection on EOF or error. */
    private void read(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        try {
            int bytesRead = channel.read(buffer);
            if (bytesRead == -1) {
                // Peer performed an orderly shutdown.
                System.out.println("Connection closed by client.");
                closeConnection(key, channel);
            } else if (bytesRead > 0) {
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                System.out.println("Received: " + new String(data, StandardCharsets.UTF_8));
            }
            // bytesRead == 0: nothing available right now, nothing to report.
        } catch (IOException e) {
            System.out.println("Error reading from channel: " + e.getMessage());
            closeConnection(key, channel);
        }
    }

    /**
     * Writes the pending bytes attached to the key. Once they are fully sent
     * (or none are attached), write interest is dropped; leaving OP_WRITE
     * registered would make the selector return immediately on every pass.
     */
    private void write(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer pending = (ByteBuffer) key.attachment();
        try {
            if (pending != null && pending.hasRemaining()) {
                channel.write(pending);
            }
            if (pending == null || !pending.hasRemaining()) {
                key.attach(null);
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            }
        } catch (IOException e) {
            System.out.println("Error writing to channel: " + e.getMessage());
            closeConnection(key, channel);
        }
    }

    /** Cancels the registration and closes the channel. */
    private static void closeConnection(SelectionKey key, SocketChannel channel) {
        key.cancel();
        closeQuietly(channel);
    }

    /** Closes every registered channel, then the selector. */
    private void shutdown() {
        if (!selector.isOpen()) {
            return;
        }
        // Snapshot the key set before closing channels.
        for (SelectionKey key : selector.keys().toArray(new SelectionKey[0])) {
            closeQuietly(key.channel());
        }
        closeQuietly(selector);
    }

    /** Closes the resource if non-null; a failure to close is reported, not rethrown. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    /** Creates the endpoint and runs its event loop on the calling thread. */
    public static void main(String[] args) {
        try {
            HighAvailableNIOServer server = new HighAvailableNIOServer();
            server.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}