03-StreamAPI流式数据处理

StreamAPI是Java8新引入的一个特性,它能够构建一个数据的“处理管线”,让数据沿着“起始管线 -> 中间管线 -> 终止管线”一步步处理成我们需要的结构。这种写法相比大量命令式的forif组成的代码逻辑更加简洁和易于理解,因此使用StreamAPI能够更加方便的处理集合中的数据。不过需要注意的是StreamAPI需要Java8,可能许多线上系统还没过渡到这个“新”版本的JDK。

注:我们知道C#有LINQ,它处理集合是相当好用的。虽然LINQ和StreamAPI写法很不一样(C#是用类SQL的指令,Java是函数式的代码),但本质和编写思路其实是一种。

StreamAPI基本特性

StreamAPI有3个基本特性:

  1. Stream自己不会存储元素
  2. Stream不会改变源对象,它们会返回一个持有结果的新Stream
  3. Stream操作是延迟执行的,也就是说当需要结果时才会执行

StreamAPI使用实例

下面我们实现一个例子,从一个装有若干Student学生对象的List中,筛选出student.age值为16的学生,生成一个新的List

如果不使用StreamAPI,我们的代码将是如下这样的:

List<Student> students = Arrays.asList(
        new Student("Tom", 1, 16),
        new Student("Jerry", 1, 16),
        new Student("Lucy", 1, 17),
        new Student("Bob", 2, 17),
        new Student("Kate", 2, 18)
);

List<Student> result = new ArrayList<>();
for (Student s : students) {
    if (s.getAge() == 16) {
        result.add(s);
    }
}

使用StreamAPI后,我们代码逻辑的可读性大大提高了:

List<Student> students = Arrays.asList(
        new Student("Tom", 1, 16),
        new Student("Jerry", 1, 16),
        new Student("Lucy", 1, 17),
        new Student("Bob", 2, 17),
        new Student("Kate", 2, 18)
);

List<Student> result = students.stream()
        .filter((s) -> s.getAge() == 16)
        .collect(Collectors.toList());

前面代码中,其实可以看出,我们使用StreamAPI分三步:

  1. 起始管线:调用stream()得到流对象
  2. 中间管线:调用filter()等中间操作,进行集合对象的过滤、去重、排序等变换
  3. 终止管线:调用collect()终止操作并生成结果

实际开发工作中,Java代码的数据库增删改查、调用接口和集合处理这些业务逻辑几乎占九成以上,然而业务逻辑其实是十分复杂的!使用上面第一种写法,数据结构嵌套N多层,基本写着写着人就傻了,使用StreamAPI则能够较大的提高我们工程代码的可维护性。

常用流操作

这里列举一些常用的流操作,具体可以参考JDK文档。

起始管线

起始管线操作中包括生成流的方法。

方法 说明
stream() 创建串行流对象
parallelStream() 创建并行流对象
Stream.of() 通过给定的一系列对象创建串行流对象

中间管线

中间管线操作中包括对数据流进行处理的方法,如排序、筛选、转换等。

方法 说明
filter(Predicate p) 过滤操作
distinct() 去重操作
limit(long maxSize) 按数量限制
map(Function f) 对每个元素执行映射函数
flatMap(Function f) 对每个元素执行映射函数(和map的区别是flatMap用于将一个元素映射为多个元素)
sorted() 排序操作
sorted(Comparator comp) 自定义排序操作

终止管线

终止管线操作用于将管线中的数据最终转化输出为一个结果。

方法 说明
forEach(Consumer c) 对元素进行迭代,执行指定的逻辑
collect(Collector c) 将流封装成List、Set等形式的结果
max(Comparator c) 取出流中最大值
min(Comparator c) 取出流中最小值
count() 返回流中元素个数

并行流

如果需要处理大量数据,这种计算密集型程序就需要充分考虑是否可以进行多核心的优化,以节约数据的处理时间。使用StreamAPI能够很容易实现分片和并行流,相比我们手动分隔数据和编写多线程代码,写法更简洁而且不容易出错。

下面例子代码和之前筛选学生的逻辑相同,只不过将流的实现换成了并行流。

List<Student> result = students.parallelStream()
        .filter((s) -> {
                System.out.println("Thread ID: " + Thread.currentThread().getId());
                return s.getAge() == 16;
        })
        .collect(Collectors.toList());

并行流内部使用了ForkJoinPool,默认创建的线程数是CPU核心数,因此并行流适用于计算密集型程序,阻塞式IO类型的操作则不应该放在并行流中。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap