Java8 Stream<T>
接口:流和并发计算实例
现在的将来发展越来越偏向于函数式了。主要是因为我们在开发Java的时候经常会有很多业务逻辑。很多类型,很多数据需要处理。通过数学严格定义的函数式编程,加上方便的管道运算,再加上各种各样在操作,构成了我们今天丰富多彩的Stream<T>
生态。
Java8的Stream<T>
对一个元素序列提供顺序和并行运算。最重要的4个能力是:
- 支持函数式编程
- 提供管道运算能力
- 提供并发计算能力
- 提供大量的操作
关联面试题:
- 什么是流?
- ::运算符的作用?
- Java 8的Stream价值是什么?
- 创建Stream有几种方法?
- coding:利用parallel并发执行任务?
管道运算
**流是随着时间产生的数据序列。**我们写程序的时候还有很多种流。比如文件流、字符流、网络流、信息流、订阅流……
Stream<T>
也是流,它为序列提供顺序、并发运算。
管道是组织流计算的一种手段,Linux的管道连接一个进程的输出到另一个进程的输入。Stream底层也是管道在支持。每一步操作,都是一个管道。这个管道。连接上游的输出,和下游的输入。Stream的管道设计。是从数据源开始连接上很多中间操作(intermediate operation),对数据进行变换。最后接上一个终止操作(terminal operation)对结果进行运算。和转换。
例子1: Map + Filter
@Test
public void test_mapfilter() {
Stream.of(1,2,3,4,5,6)
.map(x -> x.toString())
.map(x -> x + x)
.map(x -> x + x + x)
// .map(x -> Integer.parseInt(x))
.map(Integer::parseInt)
.forEach(x -> {
System.out.println(x);
});
}
Stream.of是一个数据源的工厂,用来构造流。除此之外容器(collection)类型都可以用stream()
方法直接构造流。
函数式编程最关心的就是类型。map将一个类型映射成另一个类型。forEach是终止操作。
例子2: Map + Filter + Reduce
上面例子中的result是OptionalInt类型。Optional<T>
类型代表一个可能有值,可能没有值的箱子(Monad)。isPresent
可以判断这个箱子是否有值。因为T必须是对象,因此OptionalInt是针对int封装的Optional类型。
下面构造了一个空流,返回的OptionalInt的isPresent值为false
。可以用orElseGet
设置默认值。
@Test
public void test_mapfilterreduce(){
//var result = Stream.of(1,2,3,4,5,)
var result = IntStream.of()
.map(x -> x * x)
.filter(x -> x < 20)
.reduce( Math::max);
// .orElse(0);
//.reduce(0, Integer::min);
System.out.println(result.isPresent());
System.out.println(result.orElseGet(() -> 0));
}
关于更多 Monad,我们在下一节讨论。
Stateful vs Stateless
状态是一个很宽泛的概念。比如内存中的值、寄存器中的值发生变化我们就称之为状态的变化。比如说变量的数据、类型的成员都可以称之为状态。
下面的程序中,sorted是一个中间操作。他对原始数据进行排序,内部的实现中sorted
会改变原始数据的顺序。这种我们称之为stateful
,也就是有状态操作。
@Test
public void test_mutation() {
var stream = Stream.of(1,3,5,2,3,4,5,6).sorted();
stream.forEach(System.out::println);
}
相比之下,map操作,不改变任何状态。只是将一个类型映射成另一个类型,我们称之为无状态操作(stateless)。
flatMap
@Test
public void test_flatMap(){
// String -> Stream<R>
var set = Stream.of("My", "Mine")
.flatMap(str -> str.chars().mapToObj(i -> (char)i))
.collect(Collectors.toSet());
System.out.println(set.stream().collect(Collectors.toList()));
}
并行计算
并行、并发等概念会在并发编程部分详细介绍。
流经过parallel或者parallelStream运算之后,内部会创建ForkJoinPool组织并行计算。通常是以CPU核数-1数量创建线程。也可以自己提供ForkJoinPool。
下面程序中我们自己提供了ForkJoinPool,约定线程数为2。更多细节,请参考视频讲解。
@Test
public void test_parallel() throws ExecutionException, InterruptedException {
var r = new Random();
var list = IntStream.range(0, 1_000_000)
.map(x -> r.nextInt(10_000_000))
.boxed()
.collect(Collectors.toList());
var t0 = System.currentTimeMillis();
System.out.println(list.stream().max((a, b) -> a - b));
System.out.println("time:" + (System.currentTimeMillis() - t0));
// 1000
var pool = new ForkJoinPool(2);
var t1 = System.currentTimeMillis();
var max = pool.submit(() -> list.parallelStream().max((a , b) -> a -b)).get();
// list.stream().parallel().max((a , b) -> a -b);
// 15
// Spliter 1024 -> Thread0 1024 -> Thread1
System.out.println("time:" + (System.currentTimeMillis() - t1) + ",max:" + max);
}