面试题答案
一键面试设计模式层面
- 引入中间件解耦:
- 采用消息队列(如Kafka、RabbitMQ等)作为消息的中转站。在观察者模式中,被观察对象不再直接通知观察者,而是将消息发送到消息队列。观察者从消息队列中拉取消息进行处理。这样可以有效地解耦被观察对象和观察者,避免高并发时直接通知造成的性能问题,同时消息队列自身具备一定的消息缓冲能力,可缓解消息积压。
- 分层架构优化:
- 构建分层架构,将消息处理逻辑进行分层。例如,分为接入层、业务逻辑层和数据持久层。在接入层对高并发的消息进行初步的流量控制和过滤,减少无效消息进入业务逻辑层。业务逻辑层专注于消息的业务处理,数据持久层负责将处理结果持久化。通过分层,不同层可以针对自身特点进行优化,如接入层可以采用限流算法,业务逻辑层可以采用多线程或线程池来提高处理效率。
- 使用异步通知:
- 被观察对象在状态变化时,通过线程池或异步任务框架(如Java的CompletableFuture)将通知任务异步化。这样被观察对象不会因为等待观察者处理消息而阻塞,能快速返回继续执行其他任务,提高系统的并发处理能力。同时,对于异步通知的消息,可以通过队列等方式进行有序处理,避免消息处理混乱。
代码实现角度
- 优化观察者注册与通知:
- 在注册观察者时,使用线程安全的集合(如ConcurrentHashMap)来存储观察者列表,以确保在高并发环境下注册和移除观察者的操作线程安全。
- 在通知观察者时,采用迭代器遍历观察者列表,并且在遍历过程中处理可能出现的观察者动态移除情况,避免ConcurrentModificationException。例如:
Set<Observer> observers = Collections.synchronizedSet(new HashSet<>()); public void notifyObservers() { synchronized (observers) { Iterator<Observer> iterator = observers.iterator(); while (iterator.hasNext()) { Observer observer = iterator.next(); try { observer.update(this); } catch (Exception e) { // 处理异常,如记录日志,可能需要移除异常的观察者 iterator.remove(); } } } }
- 消息队列集成:
- 以Kafka为例,在代码中集成Kafka生产者和消费者。生产者将消息发送到Kafka主题,消费者从主题中拉取消息进行处理。
- 生产者代码示例:
import org.apache.kafka.clients.producer.*; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); String topic = "test-topic"; String message = "Hello, Kafka!"; ProducerRecord<String, String> record = new ProducerRecord<>(topic, message); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } else { System.out.println("Message sent to partition " + metadata.partition() + " at offset " + metadata.offset()); } } }); producer.close(); } }
- 消费者代码示例:
import org.apache.kafka.clients.consumer.*; import java.util.Collections; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); String topic = "test-topic"; consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("Received message: " + record.value()); } } } }
- 异步处理优化:
- 使用Java的线程池来处理观察者的更新逻辑。例如,使用ThreadPoolExecutor创建线程池:
这样可以利用多线程并行处理观察者的更新,提高系统的并发处理能力。同时要注意合理设置线程池的参数,如核心线程数、最大线程数等,以避免资源耗尽或过度竞争。import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ObserverThreadPool { private static final ExecutorService executorService = Executors.newFixedThreadPool(10); public void notifyObserversAsynchronously() { for (Observer observer : observers) { executorService.submit(() -> { observer.update(this); }); } } }