MST

星途 面试题库

面试题:Java CompletableFuture allOf在高并发场景下的深入应用

在一个高并发的电商系统中,有如下操作:查询商品库存、查询商品价格、查询商品促销信息,这三个操作需要并行执行,然后根据结果计算商品最终可售价格。假设每秒有数千个这样的请求,为了保证系统的稳定性和性能,在使用CompletableFuture的allOf方法实现时,需要考虑资源限制(如数据库连接数、线程池大小等)、分布式缓存一致性以及如何优雅地处理瞬时的高负载。请详细阐述你的设计方案及实现思路,并对关键代码进行解释。
39.8万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

设计方案及实现思路

  1. 资源限制
    • 数据库连接数:使用连接池(如HikariCP)来管理数据库连接。配置合适的最大连接数、最小空闲连接数等参数,避免过多的数据库连接耗尽资源。例如,根据系统的硬件资源和预估的并发请求数,设置最大连接数为100,确保每个数据库操作都从连接池中获取连接,操作完成后及时归还。
    • 线程池大小:创建一个自定义的线程池来执行CompletableFuture的异步任务。线程池的大小需要根据系统的CPU核心数、I/O操作特性等进行合理配置。例如,对于I/O密集型任务,可以设置线程池大小为CPU核心数的2 - 3倍。使用ThreadPoolExecutor来创建线程池,并设置合适的拒绝策略(如ThreadPoolExecutor.CallerRunsPolicy,当线程池饱和时,让调用者线程执行任务,避免任务丢失)。
  2. 分布式缓存一致性
    • 对于商品库存、价格和促销信息,使用分布式缓存(如Redis)。为了保证缓存一致性,采用如下策略:
      • 读写策略:在写操作时,先更新数据库,再删除缓存,这样可以保证下次读取时从数据库加载最新数据并更新缓存。在高并发场景下,可能会出现缓存击穿、缓存穿透和缓存雪崩问题。
      • 缓存击穿:针对热点数据过期时可能出现的高并发访问穿透到数据库的问题,可以使用互斥锁(如Redis的SETNX命令实现分布式锁),在缓存过期时,只有一个线程能获取锁去加载数据并更新缓存,其他线程等待,避免大量请求同时访问数据库。
      • 缓存穿透:对于查询不存在的数据穿透到数据库的问题,可以在缓存中设置一个空值(并设置较短的过期时间),避免每次都查询数据库。也可以使用布隆过滤器,提前判断数据是否存在,减少无效查询。
      • 缓存雪崩:为了防止大量缓存同时过期导致数据库压力骤增,可以为缓存设置随机过期时间,避免集中过期。
  3. 处理瞬时高负载
    • 限流:使用令牌桶算法或漏桶算法对请求进行限流。例如,使用Guava的RateLimiter实现令牌桶限流,设置每秒允许通过的请求数(如1000个请求/秒),超过限流阈值的请求可以返回友好的提示信息,如“系统繁忙,请稍后重试”。
    • 熔断降级:引入熔断机制(如Hystrix或Sentinel),当某个服务(如查询商品库存服务)出现高延迟或错误率过高时,自动熔断该服务,不再调用实际的数据库查询,而是返回一个默认值或快速失败,避免故障扩散,保证系统的整体可用性。

关键代码解释

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.RateLimiter;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class ProductPriceCalculator {
    private static final JedisPool jedisPool;
    private static final ExecutorService executorService;
    private static final RateLimiter rateLimiter;
    private static final AtomicInteger requestCount = new AtomicInteger(0);

    static {
        // 初始化Redis连接池
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(100);
        jedisPool = new JedisPool(jedisPoolConfig, "localhost", 6379);

        // 初始化线程池
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        executorService = new ThreadPoolExecutor(
                corePoolSize,
                corePoolSize * 2,
                10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 初始化限流
        rateLimiter = RateLimiter.create(1000.0); // 每秒允许1000个请求
    }

    public static double calculateFinalPrice(int productId) {
        if (!rateLimiter.tryAcquire()) {
            throw new RuntimeException("系统繁忙,请稍后重试");
        }
        requestCount.incrementAndGet();
        try (Jedis jedis = jedisPool.getResource()) {
            // 从缓存获取数据
            String stockCacheKey = "product:stock:" + productId;
            String priceCacheKey = "product:price:" + productId;
            String promotionCacheKey = "product:promotion:" + productId;

            String stockStr = jedis.get(stockCacheKey);
            String priceStr = jedis.get(priceCacheKey);
            String promotionStr = jedis.get(promotionCacheKey);

            CompletableFuture<Integer> stockFuture = CompletableFuture.supplyAsync(() -> {
                if (stockStr != null) {
                    return Integer.parseInt(stockStr);
                } else {
                    // 从数据库查询库存
                    int stock = queryStockFromDB(productId);
                    jedis.setex(stockCacheKey, 3600, String.valueOf(stock)); // 设置缓存1小时过期
                    return stock;
                }
            }, executorService);

            CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> {
                if (priceStr != null) {
                    return Double.parseDouble(priceStr);
                } else {
                    // 从数据库查询价格
                    double price = queryPriceFromDB(productId);
                    jedis.setex(priceCacheKey, 3600, String.valueOf(price)); // 设置缓存1小时过期
                    return price;
                }
            }, executorService);

            CompletableFuture<Double> promotionFuture = CompletableFuture.supplyAsync(() -> {
                if (promotionStr != null) {
                    return Double.parseDouble(promotionStr);
                } else {
                    // 从数据库查询促销信息
                    double promotion = queryPromotionFromDB(productId);
                    jedis.setex(promotionCacheKey, 3600, String.valueOf(promotion)); // 设置缓存1小时过期
                    return promotion;
                }
            }, executorService);

            CompletableFuture<Void> allFutures = CompletableFuture.allOf(stockFuture, priceFuture, promotionFuture);

            return allFutures.thenApplyAsync(unused -> {
                int stock = stockFuture.join();
                double price = priceFuture.join();
                double promotion = promotionFuture.join();
                // 计算最终可售价格
                return calculatePrice(stock, price, promotion);
            }, executorService).join();
        } catch (Exception e) {
            // 处理异常,可进行熔断降级操作
            return -1;
        } finally {
            requestCount.decrementAndGet();
            if (requestCount.get() == 0) {
                executorService.shutdown();
            }
        }
    }

    private static int queryStockFromDB(int productId) {
        // 实际的数据库查询库存逻辑
        return 100;
    }

    private static double queryPriceFromDB(int productId) {
        // 实际的数据库查询价格逻辑
        return 100.0;
    }

    private static double queryPromotionFromDB(int productId) {
        // 实际的数据库查询促销信息逻辑
        return 10.0;
    }

    private static double calculatePrice(int stock, double price, double promotion) {
        // 实际的价格计算逻辑
        return price - promotion;
    }
}
  1. 初始化部分
    • Redis连接池:创建JedisPool并配置最大连接数为100,用于管理Redis连接。
    • 线程池:根据CPU核心数创建ThreadPoolExecutor,核心线程数为CPU核心数的2倍,最大线程数为核心线程数的2倍,设置队列容量为1000,并采用CallerRunsPolicy拒绝策略。
    • 限流:使用RateLimiter创建一个每秒允许1000个请求的令牌桶。
  2. calculateFinalPrice方法
    • 限流:使用rateLimiter.tryAcquire()判断是否允许请求通过,不通过则抛出异常。
    • 缓存查询与异步任务:从Redis缓存中获取商品库存、价格和促销信息。如果缓存中没有,则通过CompletableFuture.supplyAsync异步从数据库查询并更新缓存。每个异步任务都提交到自定义的线程池executorService中执行。
    • 并行执行与结果合并:使用CompletableFuture.allOf等待所有异步任务完成,然后通过thenApplyAsync计算最终可售价格。
    • 异常处理与资源清理:在try - catch块中处理可能的异常,并在finally块中减少请求计数,当请求计数为0时关闭线程池。

通过以上设计方案和代码实现,可以在高并发电商系统中保证系统的稳定性和性能,合理处理资源限制、缓存一致性以及瞬时高负载问题。