任务重试机制
- 实现原理:当任务执行失败时,按照一定的策略进行重试。可以设置最大重试次数、重试间隔时间等参数,避免无限重试。
- 关键代码片段:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class TaskRetry {
public static CompletableFuture<String> supplyAsyncWithRetry(Supplier<String> supplier, int maxRetries, long retryInterval) {
CompletableFuture<String> future = new CompletableFuture<>();
retry(supplier, future, maxRetries, retryInterval, 0);
return future;
}
private static void retry(Supplier<String> supplier, CompletableFuture<String> future, int maxRetries, long retryInterval, int currentRetry) {
CompletableFuture.supplyAsync(supplier)
.thenAccept(future::complete)
.exceptionally(ex -> {
if (currentRetry < maxRetries) {
try {
TimeUnit.MILLISECONDS.sleep(retryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
retry(supplier, future, maxRetries, retryInterval, currentRetry + 1);
} else {
future.completeExceptionally(ex);
}
return null;
});
}
}
负载均衡
- 实现原理:在多个节点间分配任务,避免单个节点负载过高。可以采用随机、轮询、加权轮询等算法。
- 关键代码片段(以轮询为例):
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class LoadBalancer {
private static List<Supplier<String>> nodeSuppliers = new ArrayList<>();
private static int currentIndex = 0;
public static CompletableFuture<String> supplyAsyncWithLoadBalancer() {
Supplier<String> supplier = nodeSuppliers.get(currentIndex);
currentIndex = (currentIndex + 1) % nodeSuppliers.size();
return CompletableFuture.supplyAsync(supplier);
}
}
与分布式缓存的结合使用
- 实现原理:对于一些频繁请求且结果不经常变化的任务,先从分布式缓存中获取结果。如果缓存中不存在,则执行任务,将结果存入缓存。
- 关键代码片段(以使用Redis为例,需引入Jedis依赖):
import java.util.concurrent.CompletableFuture;
import redis.clients.jedis.Jedis;
public class CacheIntegration {
private static final String CACHE_KEY_PREFIX = "task_result_";
private Jedis jedis;
public CacheIntegration() {
jedis = new Jedis("localhost", 6379);
}
public CompletableFuture<String> supplyAsyncWithCache(Supplier<String> supplier, String taskId) {
String cacheKey = CACHE_KEY_PREFIX + taskId;
String cachedResult = jedis.get(cacheKey);
if (cachedResult != null) {
return CompletableFuture.completedFuture(cachedResult);
} else {
return CompletableFuture.supplyAsync(supplier)
.thenApply(result -> {
jedis.set(cacheKey, result);
return result;
});
}
}
}