面试题答案
一键面试1. flatMap在并行流中使用需注意的问题
- 数据分区问题:并行流会将数据分成多个部分并行处理。对于
flatMap
操作,如果数据分区不合理,可能导致某些分区任务过重,而其他分区闲置,影响整体性能。例如,BigDataObject
在分区时,若某个分区包含的BigDataObject
中getSubDataList()
返回的SubData
数量远多于其他分区,就会出现负载不均衡。 - 状态共享问题:如果在
flatMap
操作中访问或修改共享状态,会导致数据一致性问题。比如,有一个共享的计数器,在flatMap
内对其进行自增操作,由于并行执行,可能出现竞态条件,导致计数结果不准确。 - 中间操作开销:
flatMap
本身是一个中间操作,它会生成新的流。在并行流中,这种操作的开销会被放大,因为每个分区都可能产生新的流对象,增加了内存和处理负担。
2. 确保数据一致性和高效性的方法
- 负载均衡:使用合适的分区策略。例如,可以根据
BigDataObject
的某些属性进行预分组,使得每个分区的SubData
数量大致相同。如果BigDataObject
中有一个category
属性,可以根据category
进行分区,确保每个分区内同类型的BigDataObject
数量相近,进而使SubData
数量分布均匀。 - 避免共享状态:尽量不要在
flatMap
操作中访问或修改共享状态。如果确实需要统计某些信息,可以使用Atomic
类型或者LongAdder
等线程安全的类来替代普通变量,减少竞态条件。 - 优化中间操作:减少不必要的中间操作。例如,如果在
flatMap
后紧接着有一个过滤操作,可以考虑将过滤逻辑提前到flatMap
操作中,减少生成的中间流数据量。
3. 代码示例说明问题及解决方案
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
class BigDataObject {
private List<SubData> subDataList;
public BigDataObject(List<SubData> subDataList) {
this.subDataList = subDataList;
}
public List<SubData> getSubDataList() {
return subDataList;
}
}
class SubData {
private int value;
public SubData(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}
public class FlatMapParallelExample {
public static void main(String[] args) {
// 模拟大量数据
List<BigDataObject> bigDataList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
List<SubData> subDataList = new ArrayList<>();
for (int j = 0; j < (int) (Math.random() * 10); j++) {
subDataList.add(new SubData(j));
}
bigDataList.add(new BigDataObject(subDataList));
}
// 问题示例:共享状态导致数据不一致
AtomicInteger count = new AtomicInteger(0);
try {
List<SubData> result = bigDataList.parallelStream()
.flatMap(bigData -> bigData.getSubDataList().stream())
.peek(subData -> count.incrementAndGet())
.collect(Collectors.toList());
System.out.println("Expected count: " + result.size());
System.out.println("Actual count: " + count.get());
} catch (Exception e) {
e.printStackTrace();
}
// 解决方案:使用线程安全的统计方式
long correctCount = bigDataList.parallelStream()
.flatMap(bigData -> bigData.getSubDataList().stream())
.count();
System.out.println("Correct count: " + correctCount);
// 负载均衡示例:根据BigDataObject的某个属性进行预分组
List<SubData> balancedResult = bigDataList.stream()
.collect(Collectors.groupingBy(bigData -> bigData.getSubDataList().size() % 10))
.entrySet().parallelStream()
.flatMap(entry -> entry.getValue().stream().flatMap(bigData -> bigData.getSubDataList().stream()))
.collect(Collectors.toList());
}
}
在上述代码中:
- 共享状态问题示例:在
peek
操作中对共享的AtomicInteger
进行自增,由于并行流的并行执行,count
的值可能不准确,与实际收集到的SubData
数量不一致。 - 解决方案:使用
count
方法直接统计流中的元素数量,避免共享状态的修改,确保统计结果正确。 - 负载均衡示例:通过对
BigDataObject
根据getSubDataList().size() % 10
进行预分组,再并行处理,使得各个分区的任务负载更均衡。