MST

星途 面试题库

面试题:Java并行流底层原理与自定义扩展

深入阐述Java并行流的底层工作原理,包括Fork/Join框架的工作机制。如果需要对并行流进行自定义扩展,以支持特定的数据处理逻辑(例如一种新的数据类型的并行操作),你会如何设计和实现?请详细说明设计思路和关键代码片段。
25.3万 热度难度
编程语言Java

知识考点

AI 面试

面试题答案

一键面试

Java并行流的底层工作原理

  1. 并行流概述:Java 8引入的并行流允许将流操作并行化执行,从而利用多核处理器的优势提高处理效率。它基于Fork/Join框架实现并行计算。
  2. 底层工作原理
    • 数据分割:并行流将数据源(如集合)分割成多个子任务。例如对于一个ArrayList,会按照一定规则(如平均分割)分成若干个部分。
    • 任务提交:这些子任务被提交到Fork/Join线程池中的工作线程。每个工作线程负责处理一个或多个子任务。
    • 执行与合并:工作线程独立执行子任务,完成后将结果合并。例如在reduce操作中,每个子任务计算局部结果,最终合并这些局部结果得到全局结果。

Fork/Join框架的工作机制

  1. 任务划分Fork/Join框架采用分治算法。一个大任务被递归地分割成小任务,直到小任务可以直接计算出结果。例如计算数组元素总和,大任务是计算整个数组和,小任务可能是计算数组的一个子片段的和。
  2. Fork操作:当一个任务无法直接计算时,它会将自己分割成多个子任务,并为每个子任务创建一个新的ForkJoinTask对象,然后将这些子任务提交到ForkJoinPool中异步执行。
  3. Join操作:任务在等待子任务完成时会调用join方法,该方法会阻塞当前线程直到所有子任务完成,并收集它们的结果。例如在归并排序中,对两个子数组排序后,通过join操作将排序后的子数组合并。
  4. 工作窃取算法Fork/Join框架中的工作线程使用工作窃取算法。当一个工作线程完成自己的任务后,会从其他繁忙的工作线程队列中窃取任务来执行,以充分利用CPU资源。

对并行流进行自定义扩展以支持特定数据处理逻辑

  1. 设计思路
    • 定义新的数据类型:首先定义需要支持并行操作的新数据类型,例如自定义一个MyDataType类。
    • 实现Spliterator接口:为新数据类型实现Spliterator接口。Spliterator负责定义如何分割数据,以及如何遍历和处理这些数据。例如对于MyDataType组成的集合,实现trySplit方法来分割数据,tryAdvance方法来处理单个元素。
    • 创建自定义流:通过StreamSupport类创建自定义流。使用StreamSupport.stream方法,传入实现好的Spliterator对象,并指定是否为并行流。
    • 定义操作逻辑:为自定义流定义特定的数据处理逻辑,如自定义的mapfilter等操作。这些操作可以基于MyDataType的特点进行设计。
  2. 关键代码片段
    • 定义新的数据类型
class MyDataType {
    private int value;
    public MyDataType(int value) {
        this.value = value;
    }
    public int getValue() {
        return value;
    }
}
  • 实现Spliterator接口
import java.util.Spliterator;
import java.util.function.Consumer;

class MyDataTypeSpliterator implements Spliterator<MyDataType> {
    private MyDataType[] data;
    private int currentIndex;
    private int endIndex;

    public MyDataTypeSpliterator(MyDataType[] data) {
        this.data = data;
        this.currentIndex = 0;
        this.endIndex = data.length;
    }

    @Override
    public boolean tryAdvance(Consumer<? super MyDataType> action) {
        if (currentIndex < endIndex) {
            action.accept(data[currentIndex++]);
            return true;
        }
        return false;
    }

    @Override
    public Spliterator<MyDataType> trySplit() {
        int mid = currentIndex + (endIndex - currentIndex) / 2;
        if (mid <= currentIndex) {
            return null;
        }
        MyDataType[] splitData = new MyDataType[endIndex - mid];
        System.arraycopy(data, mid, splitData, 0, endIndex - mid);
        MyDataTypeSpliterator split = new MyDataTypeSpliterator(splitData);
        endIndex = mid;
        return split;
    }

    @Override
    public long estimateSize() {
        return endIndex - currentIndex;
    }

    @Override
    public int characteristics() {
        return IMMUTABLE | ORDERED | SIZED;
    }
}
  • 创建自定义流
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class CustomParallelStreamExample {
    public static void main(String[] args) {
        MyDataType[] dataArray = new MyDataType[]{new MyDataType(1), new MyDataType(2), new MyDataType(3)};
        MyDataTypeSpliterator spliterator = new MyDataTypeSpliterator(dataArray);
        Stream<MyDataType> parallelStream = StreamSupport.stream(spliterator, true);
        parallelStream.forEach(data -> System.out.println(data.getValue()));
    }
}
  • 定义自定义操作逻辑(以自定义map为例)
import java.util.function.Function;

class MyStreamOps {
    public static <T, R> Stream<R> map(Stream<T> stream, Function<T, R> mapper) {
        return stream.flatMap(t -> {
            R result = mapper.apply(t);
            return Stream.of(result);
        });
    }
}

使用时:

Stream<MyDataType> parallelStream = StreamSupport.stream(spliterator, true);
Stream<Integer> mappedStream = MyStreamOps.map(parallelStream, MyDataType::getValue);
mappedStream.forEach(System.out::println);