MST

星途 面试题库

面试题:Redis与MySQL数据同步策略之高级难度题

假设在高并发场景下,Redis缓存失效且MySQL负载较高,如何设计一种数据同步策略,既能保证数据一致性,又能降低MySQL的压力?请详细阐述设计思路和实现步骤。
43.0万 热度难度
数据库Redis

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 缓存预热:系统启动时,预先加载热点数据到Redis缓存中,减少缓存失效的概率。这样在高并发请求时,大部分请求能直接从缓存获取数据,降低对MySQL的压力。
  2. 双写一致性:在数据更新时,同时更新MySQL和Redis。但为防止更新过程中出现异常导致数据不一致,需要采取一些补偿机制。
  3. 缓存失效处理:当缓存失效时,不直接查询MySQL,而是先尝试从本地缓存(如Guava Cache等)获取数据。若本地缓存也没有,则使用分布式锁(如Redisson实现的分布式锁)保证只有一个线程去查询MySQL并更新Redis,其他线程等待,避免大量并发请求穿透到MySQL。
  4. 异步更新:将对MySQL的写操作异步化,通过消息队列(如Kafka)来接收数据更新请求,这样可以平滑高并发写请求,减轻MySQL的压力。同时,消费者从消息队列中读取数据并更新MySQL和Redis,保证数据一致性。

实现步骤

  1. 缓存预热实现
    • 在系统启动时,编写初始化代码。例如,在Spring Boot应用中,可以使用CommandLineRunner接口。
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class CachePreloader implements CommandLineRunner {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @Override
        public void run(String... args) throws Exception {
            // 假设这里从MySQL查询热点数据
            // 实际应用中替换为真实查询逻辑
            Object hotData = getHotDataFromMySQL();
            redisTemplate.opsForValue().set("hotDataKey", hotData);
        }
    
        private Object getHotDataFromMySQL() {
            // 从MySQL查询热点数据逻辑
            return null;
        }
    }
    
  2. 双写一致性实现
    • 在更新数据的业务逻辑中,先更新MySQL,再更新Redis。
    @Service
    public class DataService {
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        public void updateData(String id, Object newData) {
            // 更新MySQL
            String updateSql = "UPDATE your_table SET data =? WHERE id =?";
            jdbcTemplate.update(updateSql, newData, id);
            // 更新Redis
            redisTemplate.opsForValue().set("data:" + id, newData);
        }
    }
    
    • 为处理更新异常,可使用事务管理,并添加补偿机制。例如,若Redis更新失败,记录日志并通过定时任务重试更新Redis。
  3. 缓存失效处理实现
    • 引入本地缓存,例如Guava Cache。
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    @Component
    public class LocalCache {
    
        private LoadingCache<String, Object> localCache = CacheBuilder.newBuilder()
              .maximumSize(1000)
              .expireAfterWrite(10, TimeUnit.MINUTES)
              .build(new CacheLoader<String, Object>() {
                    @Override
                    public Object load(String key) throws Exception {
                        return null; // 实际应用中从MySQL加载数据
                    }
                });
    
        public Object getFromLocalCache(String key) {
            try {
                return localCache.get(key);
            } catch (ExecutionException e) {
                return null;
            }
        }
    }
    
    • 使用分布式锁处理缓存穿透。以Redisson为例:
    import org.redisson.api.RLock;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    
    import java.util.concurrent.TimeUnit;
    
    @Service
    public class DataFetchService {
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
        @Autowired
        private RedissonClient redissonClient;
    
        public Object getData(String key) {
            Object data = redisTemplate.opsForValue().get(key);
            if (data == null) {
                data = localCache.getFromLocalCache(key);
                if (data == null) {
                    RLock lock = redissonClient.getLock("lock:" + key);
                    try {
                        if (lock.tryLock(100, 10, TimeUnit.SECONDS)) {
                            data = getFromMySQL(key);
                            redisTemplate.opsForValue().set(key, data);
                            localCache.put(key, data);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        if (lock.isHeldByCurrentThread()) {
                            lock.unlock();
                        }
                    }
                }
            }
            return data;
        }
    
        private Object getFromMySQL(String key) {
            // 从MySQL查询数据逻辑
            return null;
        }
    }
    
  4. 异步更新实现
    • 配置消息队列,以Kafka为例,在Spring Boot中配置application.properties
    spring.kafka.bootstrap-servers=your_kafka_bootstrap_servers
    spring.kafka.consumer.group-id=data-update-group
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    • 生产者发送数据更新消息:
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    
    @Service
    public class KafkaProducer {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void sendUpdateMessage(String topic, String message) {
            kafkaTemplate.send(topic, message);
        }
    }
    
    • 消费者接收消息并更新MySQL和Redis:
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class KafkaConsumer {
    
        @Autowired
        private JdbcTemplate jdbcTemplate;
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        @KafkaListener(topics = "data-update-topic", groupId = "data-update-group")
        public void handleUpdateMessage(String message) {
            // 解析消息,获取更新数据和ID等信息
            // 假设解析后得到id和newData
            String id = parseIdFromMessage(message);
            Object newData = parseDataFromMessage(message);
            // 更新MySQL
            String updateSql = "UPDATE your_table SET data =? WHERE id =?";
            jdbcTemplate.update(updateSql, newData, id);
            // 更新Redis
            redisTemplate.opsForValue().set("data:" + id, newData);
        }
    
        private String parseIdFromMessage(String message) {
            // 解析ID逻辑
            return null;
        }
    
        private Object parseDataFromMessage(String message) {
            // 解析数据逻辑
            return null;
        }
    }