面试题答案
一键面试Java并行流的底层工作原理
- 并行流概述:Java 8引入的并行流允许将流操作并行化执行,从而利用多核处理器的优势提高处理效率。它基于
Fork/Join
框架实现并行计算。 - 底层工作原理:
- 数据分割:并行流将数据源(如集合)分割成多个子任务。例如对于一个
ArrayList
,会按照一定规则(如平均分割)分成若干个部分。 - 任务提交:这些子任务被提交到
Fork/Join
线程池中的工作线程。每个工作线程负责处理一个或多个子任务。 - 执行与合并:工作线程独立执行子任务,完成后将结果合并。例如在
reduce
操作中,每个子任务计算局部结果,最终合并这些局部结果得到全局结果。
- 数据分割:并行流将数据源(如集合)分割成多个子任务。例如对于一个
Fork/Join框架的工作机制
- 任务划分:
Fork/Join
框架采用分治算法。一个大任务被递归地分割成小任务,直到小任务可以直接计算出结果。例如计算数组元素总和,大任务是计算整个数组和,小任务可能是计算数组的一个子片段的和。 - Fork操作:当一个任务无法直接计算时,它会将自己分割成多个子任务,并为每个子任务创建一个新的
ForkJoinTask
对象,然后将这些子任务提交到ForkJoinPool
中异步执行。 - Join操作:任务在等待子任务完成时会调用
join
方法,该方法会阻塞当前线程直到所有子任务完成,并收集它们的结果。例如在归并排序中,对两个子数组排序后,通过join
操作将排序后的子数组合并。 - 工作窃取算法:
Fork/Join
框架中的工作线程使用工作窃取算法。当一个工作线程完成自己的任务后,会从其他繁忙的工作线程队列中窃取任务来执行,以充分利用CPU资源。
对并行流进行自定义扩展以支持特定数据处理逻辑
- 设计思路:
- 定义新的数据类型:首先定义需要支持并行操作的新数据类型,例如自定义一个
MyDataType
类。 - 实现
Spliterator
接口:为新数据类型实现Spliterator
接口。Spliterator
负责定义如何分割数据,以及如何遍历和处理这些数据。例如对于MyDataType
组成的集合,实现trySplit
方法来分割数据,tryAdvance
方法来处理单个元素。 - 创建自定义流:通过
StreamSupport
类创建自定义流。使用StreamSupport.stream
方法,传入实现好的Spliterator
对象,并指定是否为并行流。 - 定义操作逻辑:为自定义流定义特定的数据处理逻辑,如自定义的
map
、filter
等操作。这些操作可以基于MyDataType
的特点进行设计。
- 定义新的数据类型:首先定义需要支持并行操作的新数据类型,例如自定义一个
- 关键代码片段:
- 定义新的数据类型:
class MyDataType {
private int value;
public MyDataType(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}
- 实现
Spliterator
接口:
import java.util.Spliterator;
import java.util.function.Consumer;
class MyDataTypeSpliterator implements Spliterator<MyDataType> {
private MyDataType[] data;
private int currentIndex;
private int endIndex;
public MyDataTypeSpliterator(MyDataType[] data) {
this.data = data;
this.currentIndex = 0;
this.endIndex = data.length;
}
@Override
public boolean tryAdvance(Consumer<? super MyDataType> action) {
if (currentIndex < endIndex) {
action.accept(data[currentIndex++]);
return true;
}
return false;
}
@Override
public Spliterator<MyDataType> trySplit() {
int mid = currentIndex + (endIndex - currentIndex) / 2;
if (mid <= currentIndex) {
return null;
}
MyDataType[] splitData = new MyDataType[endIndex - mid];
System.arraycopy(data, mid, splitData, 0, endIndex - mid);
MyDataTypeSpliterator split = new MyDataTypeSpliterator(splitData);
endIndex = mid;
return split;
}
@Override
public long estimateSize() {
return endIndex - currentIndex;
}
@Override
public int characteristics() {
return IMMUTABLE | ORDERED | SIZED;
}
}
- 创建自定义流:
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class CustomParallelStreamExample {
public static void main(String[] args) {
MyDataType[] dataArray = new MyDataType[]{new MyDataType(1), new MyDataType(2), new MyDataType(3)};
MyDataTypeSpliterator spliterator = new MyDataTypeSpliterator(dataArray);
Stream<MyDataType> parallelStream = StreamSupport.stream(spliterator, true);
parallelStream.forEach(data -> System.out.println(data.getValue()));
}
}
- 定义自定义操作逻辑(以自定义
map
为例):
import java.util.function.Function;
class MyStreamOps {
public static <T, R> Stream<R> map(Stream<T> stream, Function<T, R> mapper) {
return stream.flatMap(t -> {
R result = mapper.apply(t);
return Stream.of(result);
});
}
}
使用时:
Stream<MyDataType> parallelStream = StreamSupport.stream(spliterator, true);
Stream<Integer> mappedStream = MyStreamOps.map(parallelStream, MyDataType::getValue);
mappedStream.forEach(System.out::println);