整体架构设计
- 文件服务器:负责存储所有文件的主副本,提供文件的读写接口。
- 节点:每个节点都有自己的本地文件存储,需要与文件服务器进行文件同步。
- 同步管理器:在每个节点上运行,负责协调本地文件与文件服务器之间的同步操作。
同步策略
- 增量同步:计算本地文件和服务器文件的差异,只同步有变化的部分。这可以通过比较文件的修改时间、文件大小或者使用哈希值来实现。
- 全量同步:将本地文件全部删除,然后从服务器重新下载所有文件。适用于首次同步或者节点数据严重不一致的情况。
处理异常情况
- 节点故障:当某个节点发生故障时,文件服务器和其他节点可以通过心跳机制检测到。在故障节点恢复后,可以进行一次全量同步,确保数据的一致性。
- 网络中断:同步管理器可以记录同步进度,当网络恢复后,从上次中断的地方继续进行同步。
关键代码片段
- 计算文件哈希值(用于增量同步)
import java.io.File;
import java.io.FileInputStream;
import java.security.MessageDigest;
public class FileHashUtil {
public static String calculateHash(File file) throws Exception {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
try (FileInputStream fis = new FileInputStream(file)) {
byte[] buffer = new byte[1024];
int length;
while ((length = fis.read(buffer)) != -1) {
digest.update(buffer, 0, length);
}
}
byte[] hashBytes = digest.digest();
StringBuilder sb = new StringBuilder();
for (byte b : hashBytes) {
sb.append(String.format("%02x", b));
}
return sb.toString();
}
}
- 同步文件方法
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
public class FileSynchronizer {
private static final String SERVER_ADDRESS = "127.0.0.1";
private static final int SERVER_PORT = 12345;
public static void synchronizeFile(File localFile, String serverFilePath) {
try (Socket socket = new Socket(SERVER_ADDRESS, SERVER_PORT)) {
OutputStream outputStream = socket.getOutputStream();
outputStream.write(("SYNC " + serverFilePath).getBytes());
outputStream.write('\n');
if (localFile.exists()) {
String localHash = FileHashUtil.calculateHash(localFile);
outputStream.write(localHash.getBytes());
outputStream.write('\n');
outputStream.flush();
InputStream inputStream = socket.getInputStream();
byte[] buffer = new byte[1024];
int length;
File tempFile = File.createTempFile("sync", null);
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
while ((length = inputStream.read(buffer)) != -1) {
fileOutputStream.write(buffer, 0, length);
}
}
String serverHash = FileHashUtil.calculateHash(tempFile);
if (!localHash.equals(serverHash)) {
try (FileOutputStream finalOutputStream = new FileOutputStream(localFile)) {
try (FileInputStream tempInputStream = new FileInputStream(tempFile)) {
while ((length = tempInputStream.read(buffer)) != -1) {
finalOutputStream.write(buffer, 0, length);
}
}
}
}
tempFile.delete();
} else {
outputStream.write("NONE".getBytes());
outputStream.write('\n');
outputStream.flush();
InputStream inputStream = socket.getInputStream();
try (FileOutputStream fileOutputStream = new FileOutputStream(localFile)) {
byte[] buffer = new byte[1024];
int length;
while ((length = inputStream.read(buffer)) != -1) {
fileOutputStream.write(buffer, 0, length);
}
}
}
} catch (IOException | Exception e) {
e.printStackTrace();
}
}
}
设计原理
- 文件哈希计算:通过计算文件的哈希值,能够准确判断文件内容是否发生变化,从而实现增量同步。
- 网络通信:使用
Socket
进行节点与服务器之间的通信,发送同步请求和文件数据。
- 临时文件:在同步过程中使用临时文件存储从服务器接收的数据,方便比较哈希值,确保数据完整无误后再替换本地文件。