面试题答案
一键面试设计思路
- 连接管理:
- 使用
AsynchronousSocketChannel
创建异步连接。当连接中断时,启动重连机制。可以设置一个重试次数和重试间隔,通过定时任务(如ScheduledExecutorService
)来尝试重新连接。 - 维护一个连接状态标志,用于判断当前连接是否正常。
- 使用
- 数据传输:
- 为保证数据可靠传输,为每个发送的数据块添加序列号。接收端根据序列号来判断数据是否有序,若发现数据丢失,发送重传请求。
- 发送端维护一个已发送但未确认的数据块列表,当收到重传请求或超时未收到确认时,重传相应的数据块。
- 丢包处理:
- 引入超时机制,对于发送的数据块,设置一个超时时间。若超时未收到确认,判定为丢包,进行重传。
- 接收端定期检查接收数据的序列号连续性,发现不连续则向发送端请求重传丢失的数据块。
关键部分代码示例(Java)
- 连接建立与重连
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ReliableSocket {
private static final String HOST = "127.0.0.1";
private static final int PORT = 8080;
private static final int MAX_RETRY = 5;
private static final int RETRY_INTERVAL = 5; // seconds
private AsynchronousSocketChannel socketChannel;
private boolean isConnected = false;
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
public void connect() {
int retryCount = 0;
while (!isConnected && retryCount < MAX_RETRY) {
try {
socketChannel = AsynchronousSocketChannel.open();
Future<Void> future = socketChannel.connect(new InetSocketAddress(HOST, PORT));
future.get();
isConnected = true;
System.out.println("Connected to server.");
} catch (InterruptedException | ExecutionException e) {
retryCount++;
System.out.println("Connection failed, retry attempt " + retryCount);
try {
Thread.sleep(RETRY_INTERVAL * 1000);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
} catch (Exception e) {
e.printStackTrace();
}
}
if (!isConnected) {
System.out.println("Failed to connect after multiple attempts.");
}
}
public void handleDisconnect() {
isConnected = false;
System.out.println("Connection lost.");
scheduledExecutorService.schedule(() -> connect(), RETRY_INTERVAL, TimeUnit.SECONDS);
}
}
- 数据发送与重传
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DataSender {
private static final int TIMEOUT = 3; // seconds
private final AsynchronousSocketChannel socketChannel;
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
private final Map<Integer, ByteBuffer> unacknowledgedData = new HashMap<>();
public DataSender(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public void sendData(ByteBuffer data, int sequenceNumber) {
unacknowledgedData.put(sequenceNumber, data.duplicate());
Future<Integer> future = socketChannel.write(data);
try {
future.get();
scheduleAckCheck(sequenceNumber);
} catch (Exception e) {
e.printStackTrace();
}
}
private void scheduleAckCheck(int sequenceNumber) {
scheduledExecutorService.schedule(() -> {
if (unacknowledgedData.containsKey(sequenceNumber)) {
System.out.println("Timeout for sequence number " + sequenceNumber + ", retransmitting.");
ByteBuffer data = unacknowledgedData.get(sequenceNumber);
data.rewind();
sendData(data, sequenceNumber);
}
}, TIMEOUT, TimeUnit.SECONDS);
}
public void receiveAck(int sequenceNumber) {
unacknowledgedData.remove(sequenceNumber);
System.out.println("Received ACK for sequence number " + sequenceNumber);
}
}
- 数据接收与重传请求
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class DataReceiver {
private static final int BUFFER_SIZE = 1024;
private final AsynchronousSocketChannel socketChannel;
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
private int expectedSequenceNumber = 0;
public DataReceiver(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public void receiveData() {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
Future<Integer> future = socketChannel.read(buffer);
try {
int bytesRead = future.get();
if (bytesRead > 0) {
buffer.flip();
int sequenceNumber = buffer.getInt();
if (sequenceNumber == expectedSequenceNumber) {
// Process data
System.out.println("Received data with sequence number " + sequenceNumber);
expectedSequenceNumber++;
// Send ACK
sendAck(sequenceNumber);
} else {
// Request retransmission
System.out.println("Missing data, expected sequence number " + expectedSequenceNumber + ", received " + sequenceNumber);
requestRetransmission(expectedSequenceNumber);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void sendAck(int sequenceNumber) {
ByteBuffer ackBuffer = ByteBuffer.wrap(("ACK " + sequenceNumber).getBytes());
Future<Integer> future = socketChannel.write(ackBuffer);
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
private void requestRetransmission(int sequenceNumber) {
ByteBuffer requestBuffer = ByteBuffer.wrap(("RETRANSMIT " + sequenceNumber).getBytes());
Future<Integer> future = socketChannel.write(requestBuffer);
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}
上述代码实现了在不稳定网络场景下使用AsynchronousSocketChannel
进行可靠数据传输的核心功能,包括连接建立与重连、数据发送与重传以及数据接收与重传请求等部分。实际应用中还需根据具体需求进行完善和优化。