Skip to content

Java8 Stream<T> 接口:流和并发计算实例

现在的将来发展越来越偏向于函数式了。主要是因为我们在开发Java的时候经常会有很多业务逻辑。很多类型,很多数据需要处理。通过数学严格定义的函数式编程,加上方便的管道运算,再加上各种各样在操作,构成了我们今天丰富多彩的Stream<T>生态。

Java8的Stream<T>对一个元素序列提供顺序和并行运算。最重要的4个能力是:

  1. 支持函数式编程
  2. 提供管道运算能力
  3. 提供并发计算能力
  4. 提供大量的操作

关联面试题:

  1. 什么是流?
  2. ::运算符的作用?
  3. Java 8的Stream价值是什么?
  4. 创建Stream有几种方法?
  5. coding:利用parallel并发执行任务?

管道运算

**流是随着时间产生的数据序列。**我们写程序的时候还有很多种流。比如文件流、字符流、网络流、信息流、订阅流……

Stream<T>也是流,它为序列提供顺序、并发运算。

image-20210126000627892

管道是组织流计算的一种手段,Linux的管道连接一个进程的输出到另一个进程的输入。Stream底层也是管道在支持。每一步操作,都是一个管道。这个管道。连接上游的输出,和下游的输入。Stream的管道设计。是从数据源开始连接上很多中间操作(intermediate operation),对数据进行变换。最后接上一个终止操作(terminal operation)对结果进行运算。和转换。

例子1: Map + Filter

@Test
public void test_mapfilter() {
    Stream.of(1,2,3,4,5,6)
        .map(x -> x.toString())
        .map(x -> x + x)
        .map(x -> x + x + x)
        //                .map(x -> Integer.parseInt(x))
        .map(Integer::parseInt)
        .forEach(x -> {
            System.out.println(x);
        });
}

Stream.of是一个数据源的工厂,用来构造流。除此之外容器(collection)类型都可以用stream() 方法直接构造流。

函数式编程最关心的就是类型。map将一个类型映射成另一个类型。forEach是终止操作。

例子2: Map + Filter + Reduce

image-20210130103159117

上面例子中的result是OptionalInt类型。Optional<T> 类型代表一个可能有值,可能没有值的箱子(Monad)。isPresent 可以判断这个箱子是否有值。因为T必须是对象,因此OptionalInt是针对int封装的Optional类型。

下面构造了一个空流,返回的OptionalInt的isPresent值为false 。可以用orElseGet 设置默认值。

@Test
public void test_mapfilterreduce(){
    //var result = Stream.of(1,2,3,4,5,)
    var result = IntStream.of()
        .map(x -> x * x)
        .filter(x -> x < 20)
        .reduce( Math::max);
    //                    .orElse(0);
    //.reduce(0, Integer::min);
    System.out.println(result.isPresent());
    System.out.println(result.orElseGet(() -> 0));
}

关于更多 Monad,我们在下一节讨论。

Stateful vs Stateless

状态是一个很宽泛的概念。比如内存中的值、寄存器中的值发生变化我们就称之为状态的变化。比如说变量的数据、类型的成员都可以称之为状态。

下面的程序中,sorted是一个中间操作。他对原始数据进行排序,内部的实现中sorted 会改变原始数据的顺序。这种我们称之为stateful ,也就是有状态操作。

@Test
public void test_mutation() {
    var stream = Stream.of(1,3,5,2,3,4,5,6).sorted();
    stream.forEach(System.out::println);
}

相比之下,map操作,不改变任何状态。只是将一个类型映射成另一个类型,我们称之为无状态操作(stateless)。

flatMap

@Test
public void test_flatMap(){
    // String -> Stream<R>
    var set = Stream.of("My", "Mine")
        .flatMap(str -> str.chars().mapToObj(i -> (char)i))
        .collect(Collectors.toSet());
    System.out.println(set.stream().collect(Collectors.toList()));
}

并行计算

并行、并发等概念会在并发编程部分详细介绍。

流经过parallel或者parallelStream运算之后,内部会创建ForkJoinPool组织并行计算。通常是以CPU核数-1数量创建线程。也可以自己提供ForkJoinPool。

下面程序中我们自己提供了ForkJoinPool,约定线程数为2。更多细节,请参考视频讲解。

@Test
public void test_parallel() throws ExecutionException, InterruptedException {

    var r = new Random();
    var list = IntStream.range(0, 1_000_000)
        .map(x -> r.nextInt(10_000_000))
        .boxed()
        .collect(Collectors.toList());

    var t0 = System.currentTimeMillis();
    System.out.println(list.stream().max((a, b) -> a - b));
    System.out.println("time:" + (System.currentTimeMillis() - t0));

    // 1000
    var pool = new ForkJoinPool(2);
    var t1 = System.currentTimeMillis();
    var max = pool.submit(() -> list.parallelStream().max((a , b) -> a -b)).get();

    //        list.stream().parallel().max((a , b) -> a -b);

    // 15
    // Spliter 1024 -> Thread0 1024 -> Thread1

    System.out.println("time:" + (System.currentTimeMillis() - t1) + ",max:" + max);
}

文章来源于自己总结和网络转载,内容如有任何问题,请大佬斧正!联系我