连接创建
- 初始化连接数量:在连接池启动时,创建一定数量的初始连接,以避免在高并发请求时频繁创建连接带来的性能开销。例如,通过配置文件设定初始连接数为
initialConnections
。
for (int i = 0; i < initialConnections; i++) {
Socket socket = new Socket(serverAddress, serverPort);
connections.add(socket);
}
- 动态扩展:当连接池中的连接耗尽时,能够动态创建新的连接。设置最大连接数
maxConnections
,防止无限制创建连接导致资源耗尽。
if (connections.size() < maxConnections) {
Socket socket = new Socket(serverAddress, serverPort);
connections.add(socket);
}
连接回收
- 使用完归还:客户端使用完连接后,需要将连接归还给连接池。在连接对象上提供一个
close
方法,实际实现中不是真正关闭连接,而是将其标记为可用并放回连接池。
public void close() {
isInUse = false;
connectionPool.returnConnection(this);
}
- 空闲连接清理:定期检查连接池中的空闲连接,对于长时间未使用的连接,关闭并从连接池中移除,以释放资源。可以使用
ScheduledExecutorService
定时任务来实现。
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(() -> {
long currentTime = System.currentTimeMillis();
connections.stream()
.filter(conn ->!conn.isInUse() && (currentTime - conn.getLastUseTime() > idleTimeout))
.forEach(conn -> {
try {
conn.close();
connections.remove(conn);
} catch (IOException e) {
e.printStackTrace();
}
});
}, 0, idleCheckInterval, TimeUnit.MILLISECONDS);
连接分配
- 获取连接逻辑:客户端从连接池获取连接时,连接池需要返回一个可用的连接。如果没有可用连接,根据配置决定是等待还是立即返回错误。
public Socket getConnection() throws InterruptedException {
while (true) {
synchronized (connections) {
for (Socket socket : connections) {
if (!socket.isInUse()) {
socket.setInUse(true);
socket.setLastUseTime(System.currentTimeMillis());
return socket;
}
}
if (connections.size() < maxConnections) {
// 创建新连接
Socket socket = new Socket(serverAddress, serverPort);
socket.setInUse(true);
socket.setLastUseTime(System.currentTimeMillis());
connections.add(socket);
return socket;
}
}
if (!waitForConnection) {
throw new NoAvailableConnectionException();
}
Thread.sleep(100);
}
}
处理连接超时
- 连接创建超时:在创建连接时设置连接超时时间,避免长时间等待无响应的服务器。
Socket socket = new Socket();
socket.connect(new InetSocketAddress(serverAddress, serverPort), connectionTimeout);
- 获取连接超时:当客户端从连接池获取连接时,如果设置了等待连接的超时时间,在超时后抛出异常。
public Socket getConnection(long timeout) throws InterruptedException {
long endTime = System.currentTimeMillis() + timeout;
while (true) {
synchronized (connections) {
for (Socket socket : connections) {
if (!socket.isInUse()) {
socket.setInUse(true);
socket.setLastUseTime(System.currentTimeMillis());
return socket;
}
}
if (connections.size() < maxConnections) {
// 创建新连接
Socket socket = new Socket(serverAddress, serverPort);
socket.setInUse(true);
socket.setLastUseTime(System.currentTimeMillis());
connections.add(socket);
return socket;
}
}
long remainingTime = endTime - System.currentTimeMillis();
if (remainingTime <= 0) {
throw new ConnectionTimeoutException();
}
Thread.sleep(100);
}
}