跳至主要內容

26.Java Stream2

悟空约 7245 字大约 24 分钟...

source: https://blog.csdn.net/yy339452689/article/details/110956119open in new window

1.前言

Java 8 的另一大亮点 Streamopen in new window,它与 java.ioopen in new window 包里的 InputStream 和 OutputStream 是完全不同的概念。

Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便利、高效的聚合操作(aggregate operation),或者大批量数据操作 (bulk data operation)。

Stream API 借助于同样新出现的 Lambdaopen in new window 表达式,极大的提高编程效率和程序可读性。同时它提供串行和并行两种模式进行汇聚操作,并发模式能够充分利用多核处理器的优势,使用 fork/join 并行方式来拆分任务和加速处理过程。

1.1 为什么要用 Stream

我个人总结有如下几个特点:

  • 有高效率的并行操作
  • 有多中功能性的聚合操作
  • 函数式编程,使代码更加简洁,提高编程效率

1.2 什么是聚合操作

举个例子,例如我们现在有一个模块的列表需要做如下处理:

  • 客户每月平均消费金额
  • 最昂贵的在售商品
  • 本周完成的有效订单(排除了无效的)
  • 取十个数据样本作为首页推荐

以上这些操作,你可以理解为就是对一个列表集合的聚合操作啦,类似于 SQL 里面的(count()、sum()、avg()....)!

有一些操作,有人可能会说,可以在 SQL 语句中完成过滤分类!首先不说 SQL 不能实现的功能,即使 SQL 能够实现,但是数据库毕竟是用来读写数据的,主要功能是用于数据落地存储的。并不是用来做大量的逻辑处理的,所以不能为了图方便,而忽略了性能方面的损耗!所以,相比之下,有一些列表操作我们必须在程序中做逻辑处理!那如果我们用之前的 java 处理方式,得像如下操作一样:

for(int i=0;i<10;i++){
    if(....){
    
    } else{
    
    }
 
}

那如果用 Stream 来处理的话,可能就只有如下简单几行:

List<String> list = Arrays.asList("hello","world","stream");

Stream<String> stream = list.stream();

Stream<String> parallelStream = list.parallelStream();

所以,代码不仅简洁了,阅读起来也会很是方便!

2. 正文

2.1 Stream 操作分类

Stream 的操作可以分为两大类:中间操作、终结操作

中间操作可分为:

  • **无状态(Stateless)操作:**指元素的处理不受之前元素的影响
  • **有状态(Stateful)操作:**指该操作只有拿到所有元素之后才能继续下去

终结操作可分为:

  • **短路(Short-circuiting)操作:**指遇到某些符合条件的元素就可以得到最终结果
  • **非短路(Unshort-circuiting)操作:**指必须处理完所有元素才能得到最终结果

Stream 结合具体操作,大致可分为如下图所示:

2.2 Stream API 使用

接下来,我们将按各种类型的操作,对一些常用的功能 API 进行一一讲解:

2.2.1 Stream 构成与创建

2.2.1.1 流的构成

当我们使用一个流的时候,通常包括三个基本步骤:

获取一个数据源(source)→ 数据转换 → 执行操作获取想要的结果,每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道。

如下图所示:

2.2.1.2 流的创建

  • 通过 java.util.Collection.stream() 方法用集合创建流
String[] array = {"h", "e", "l", "l", "o"};
Stream<String> arrayStream = Arrays.stream(array);
  • 使用java.util.Arrays.stream(T[] array)方法用数组创建流
Stream<Integer> stream1 = Stream.of(1, 2, 3, 4, 5, 6);
 
Stream<Integer> stream2 = Stream.iterate(0, (x) -> x + 2).limit(3);
stream2.forEach(System.out::println);
 
Stream<Double> stream3 = Stream.generate(Math::random).limit(3);
stream3.forEach(System.out::println)
 
0.9620319103852426
0.8303672905658537
0.09203215202737569
  • Stream的静态方法:of()、iterate()、generate()
public static void main(String[] args) {
    List<Integer> list = Arrays.asList(6, 7, 3, 8, 1, 2);
    Stream<Integer> stream = list.stream();
    stream.filter(x -> x > 5).forEach(System.out::println);
}
 
 
 

  • streamparallelStream的简单区分

stream是顺序流,由主线程按顺序对流执行操作,而**parallelStream是并行流**,内部以多线程并行执行的方式对流进行操作,需要注意使用并行流的前提是流中的数据处理没有顺序要求(会乱序,即使用了 forEachOrdered**)**。例如筛选集合中的奇数,两者的处理不同之处:

当然,除了直接创建并行流,还可以通过parallel()把顺序流转换成并行流:

public class People {
    private String name;
    private int age;
    ...省略get,set方法
}
 

Stream.of("小王:18","小杨:20").map(new Function<String, People>() {
     @Override
     public People apply(String s) {
         String[] str = s.split(":");
         People people = new People(str[0],Integer.valueOf(str[1]));
         return people;
     }
 }).forEach(people-> System.out.println("people = " + people));
}

2.2.2 无状态(Stateless)操作

  • filter筛选,是按照一定的规则校验流中的元素,将符合条件的元素提取到新的流中的操作。
List<String> output = wordList.stream().
map(String::toUpperCase).
collect(Collectors.toList());

流程解析图如下:

举个栗子:

public static void main(String[] args) {
    List<String> list1 = Arrays.asList("m,k,l,a", "1,3,5,7");
    List<String> listNew = list1.stream().flatMap(s -> {
        
        String[] split = s.split(",");
        Stream<String> s2 = Arrays.stream(split);
        return s2;
    }).collect(Collectors.toList());
 
    System.out.println("处理前的集合:" + list1);
    System.out.println("处理后的集合:" + listNew);
}
 


处理前的集合:["m,k,l,a", "1,3,5,7"]
处理后的集合:["m", "k", "l", "a", "1", "3", "5", "7"]
  • 映射 (map、flatMap、peek)

①map:一个元素类型为 T 的流转换成元素类型为 R 的流,这个方法传入一个 Function 的函数式接口,接收一个泛型 T,返回泛型 R,map 函数的定义,返回的流,表示的泛型是 R 对象;

简言之:将集合中的元素 A 转换成想要得到的 B

Stream<String> stream = Stream.of("hello", "felord.cn");
stream.peek(System.out::println);

流程解析图如下:

举个栗子:

Stream<String> stream = Stream.of("hello", "felord.cn");
stream.peek(System.out::println).collect(Collectors.toList());
 

hello
felord.cn

或如下(众多姿势,任君选择):

IntStream mapToInt(ToIntFunction<? super T> mapper);
 
LongStream mapToLong(ToLongFunction<? super T> mapper); 
 
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
 
IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper);
 
LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper);
 
DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper);

②flatMap:接收一个函数作为参数,将流中的每个值都换成另一个流,然后把所有流连接成一个流。

简言之:与 Map 功能类似,区别在于将结合 A 的流转换成 B 流

Stream<String> stream = Stream.of("hello", "felord.cn");
stream.mapToInt(s->s.length()).forEach(System.out::println);
 
 

流程解析图如下:

举个栗子:

public static void main(String[] args) {
 Stream.of(5, 1, 2, 6, 3, 7, 4).unordered().forEach(System.out::println);
     Stream.of(5, 1, 2, 6, 3, 7,4).unordered().parallel().forEach(System.out::println);
}
 
 
//两次输出结果对比(方便比较,写在一起)
第一遍:          第二遍:
//第一行代码输出   //第一行代码输出
5                 5
1                 1
2                 2
6                 6
3                 3
7                 7
4                 4
 
//第二行代码输出   //第二行代码输出
3                 3
6                 6
4                 7
7                 5
2                 4
1                 1
5                 2

③peek:peek 操作接收的是一个 Consumer<T> 函数。顾名思义 peek 操作会按照 Consumer<T> 函数提供的逻辑去消费流中的每一个元素,同时有可能改变元素内部的一些属性。

Stream<String> stream = Stream.of("1", "3","4","10","4","6","23","3");
stream.distinct().forEach(System.out::println);

这里我们要提一下这个 Consumer<T> ,以理解什么是消费。

Consumer<T> 是一个函数接口。一个抽象方法 void accept(T t) 意为接受一个 T 类型的参数并将其消费掉。其实消费给我的感觉就是 “用掉” ,自然返回的就是 void 。通常 “用掉” T 的方式为两种:

  • T 本身的 void 方法 比较典型的就是 setter

  • 把 T 交给其它接口(类)的 void 方法进行处理 比如我们经常用的打印一个对象 System.out.println(T)

操作流程解析图如下:

下面我们来看个栗子:

Stream<T> sorted();
 
Stream<T> sorted(Comparator<? super T> comparator);

执行之后,控制台并没有输出任何字符串!纳尼??

这是因为流的生命周期有三个阶段:

  • 起始生成阶段。

  • 中间操作会逐一获取元素并进行处理。可有可无。所有中间操作都是惰性的,因此,流在管道中流动之前,任何操作都不会产生任何影响。

  • 终端操作。通常分为 最终的消费foreach 之类的)和 归纳collect)两类。还有重要的一点就是终端操作启动了流在管道中的流动。

所以,上面的代码是因为缺少了终端操作,因此,我们改成如下即可:

Stream<Integer> stream = Stream.of(3,1,10,16,8,4,9);
stream.sorted().forEach(System.out::println);

**重点:**peek VS map

他们最大的区别是:

peek 操作 一般用于不想改变流中元素本身的类型或者只想元素的内部状态时;

map 则用于改变流中元素本身类型,即从元素中派生出另一种类型的操作。

④mapToInt、mapToLong、mapToDouble、flatMapToDouble、flatMapToInt、flatMapToLong

以上这些操作是 map 和 flatMap 的特例版,也就是针对特定的数据类型进行映射处理。其对应的方法接口如下:

Stream<Integer> stream = Stream.of(3,1,10,16,8,4,9);
stream.limit(3).forEach(System.out::println);

此处就不全部单独说明了,取一个操作举例说明一下其用法:

Stream<Integer> stream = Stream.of(3,1,10,16,8,4,9);
stream.skip(3).forEach(System.out::println);

并且这些指定类型的流,还有另外一些常用的方法,也是很好用的,可以参考:IntStreamopen in new windowLongStreamopen in new windowDoubleStreamopen in new window

  • 无序化(unordered)

unordered()操作不会执行任何操作来显式地对流进行排序。它的作用是消除了流必须保持有序的约束,从而允许后续操作使用不必考虑排序的优化。

举个栗子:

Stream<Integer> stream = Stream.of(3,1,10,16,8,4,9);
System.out.println("result="+stream.anyMatch(s->s==2));
 

result=false

以上结果,可以看到,虽然用了**unordered()**,但是第一个循环里的数据顺序并没有被打乱;是不是很好奇?

您可以在 **Java 8 文档open in new window**中有一下一段内容:

对于顺序流,顺序的存在与否不会影响性能,只影响确定性。如果流是顺序的,则在相同的源上重复执行相同的流管道将产生相同的结果;

如果是非顺序流,重复执行可能会产生不同的结果。 对于并行流,放宽排序约束有时可以实现更高效的执行

在流有序时, 但用户不特别关心该顺序的情况下,使用 unordered 明确地对流进行去除有序约束可以改善某些有状态或终端操作的并行性能。

2.2.3 有状态(Stateful)操作

  • **distinct:返回由该流的不同元素组成的流(根据 Object.equals(Object));distinct()使用 hashCode()和 equals()方法来获取不同的元素。**因此,我们的类必须实现 hashCode()和 equals()方法。
Stream<Integer> stream = Stream.of(3,1,10,16,8,4,9);
System.out.println("result="+stream.allMatch(s->s>=1));
 
 

result=true

**简言之:就是去重;**下面看下流程解析图:

举个栗子:

Stream<Integer> stream = Stream.of(3,1,10,16,8,4,9);
System.out.println("result="+stream.noneMatch(s -> s>=17 ));
 
 

result=true

可以发现,重复的数字会被剔除掉!那么如果需要对自定义的对象进行过滤,则需要重写对象的 equals 方法即可 !

另外有一个细节可以看到,去重之后还是按照原流中的排序顺序输出的,所以是有序的!

  • sorted:返回由该流的元素组成的流,并根据自然顺序排序

该接口有两种形式:无参和有参数,如:

Stream<Integer> stream = Stream.of(3,1,10,16,8,4,9);
System.out.println("result="+stream.findFirst().get());
 

result=3
 
 

System.out.println("result="+stream.filter(s-> s > 3).findFirst().get());
 

result=10

那区别其实就在于:传入比较器的参数,可以自定义这个比较器,即自定义比较规则**。**

举个栗子:

List<String> strAry = Arrays.asList( "Jhonny", "David", "Jack", "Duke", "Jill","Dany","Julia","Jenish","Divya");
 
String result = strAry.parallelStream().filter(s->s.startsWith("J")).findAny().get();
System.out.println("result = " + result);
 

result = Jill
  • limit:获取流中 n 个元素返回的流

这个很好理解,和 mysql 的中的 limit 函数一样的效果,返回指定个数的元素流。

List<String> strAry = Arrays.asList( "Jhonny", "David", "Jack", "Duke", "Jill","Dany","Julia","Jenish","Divya");
 
String result = strAry.parallelStream().filter(s->s.startsWith("J")).findAny().get();
System.out.println("result = " + result);
 
//输出
result = Jill
或
result = Julia

流程解析图如下:

举个栗子:

List<String> strAry = Arrays.asList( "Jhonny", "David", "Jack", "Duke", "Jill","Dany","Julia","Jenish","Divya");
 
strAry.stream().forEach(s-> {
if("Jack".equalsIgnoreCase(s)) System.out.println(s);
});
 

Jack
  • skip:在丢弃流的第一个n元素之后,返回由该流的其余元素组成的流。

简言之:跳过第 n 个元素,返回其后面的元素流;

String name = "Jack";
strAry.stream().forEach(s-> {
if(name.equalsIgnoreCase(s)) name = "Jackson";
});

流程解析图:

举个栗子:

Stream.of("AAA,","BBB,","CCC,","DDD,").parallel().forEach(System.out::print);
System.out.println("\n______________________________________________");
Stream.of("AAA,","BBB,","CCC,","DDD").parallel().forEachOrdered(System.out::print);
System.out.println("\n______________________________________________");
Stream.of("DDD,","AAA,","BBB,","CCC").parallel().forEachOrdered(System.out::print);
 
//输出为:
 
CCC,DDD,BBB,AAA,
______________________________________________
AAA,BBB,CCC,DDD
______________________________________________
DDD,AAA,BBB,CCC

2.2.4 短路(Short-circuiting)操作

  • anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true;
Object [] toArray();
 
<A> A[] toArray(IntFunction<A[]> generator);

举个栗子:

List<String> strList = Arrays.asList( "Jhonny", "David", "Jack", "Duke", "Jill","Dany","Julia","Jenish","Divya");
 
Object [] strAryNoArg = strList.stream().toArray();
String [] strAry = strList.stream().toArray(String[]::new);
  • allMatch:Stream 中全部元素符合传入的 predicate,返回 true;
Optional<T> reduce(BinaryOperator<T> accumulator);
 
T reduce(T identity, BinaryOperator<T> accumulator);
 
<U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);
 
 

@FunctionalInterface
public interface BinaryOperator<T> extends BiFunction<T,T,T> {

}
 
@FunctionalInterface
public interface BiFunction<T, U, R> {
R apply(T t, U u);

}

举个栗子:

boolean foundAny = false;
T result = null;
for (T element : this stream) {
    if (!foundAny) {
        foundAny = true;
        result = element;
    }
    else
        result = accumulator.apply(result, element);
}
return foundAny ? Optional.of(result) : Optional.empty();
  • noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true.
List<Integer> num = Arrays.asList(1, 2, 4, 5, 6, 7);
 
*原接口一比一原汁原味写法*
Integer integer = num.stream().reduce(new BinaryOperator<Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) {
    System.out.println("x:"+a);
        return a + b;
    }
}).get();
System.out.println("resutl:"+integer);
 
 
*等效写法一*
Integer result = num.stream().reduce((x, y) -> {
    System.out.println("x:"+x);
    return x + y;
}).get();
System.out.println("resutl:"+result);
 
 
*等效的普通写法*
boolean flag = false;
int temp = 0;
for (Integer integer : num) {
if(!flag){
temp = integer;
flag = true;
}else {
System.out.println("x:"+temp);
temp += integer;
}
}
 
System.out.println("resutl:"+temp);

举个栗子:

x:1
x:3
x:7
x:12
x:18
resutl:25
  • findFirst:用于返回满足条件的第一个元素(但是该元素是封装在 Optional 类中)

关于 Optional 可以点这里**:【Java 8 系列】Java 开发者的判空利器 -- Optionalopen in new window**

T result = identity;
for (T element : this stream){
    result = accumulator.apply(result, element)
}
return result;

举个栗子:

List<Integer> num = Arrays.asList(1, 2, 4, 5, 6, 7);
 
*一比一原汁原味写法*
Integer integer = num.stream().reduce(1,new BinaryOperator<Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) {
    System.out.println("a="+a);
        return a + b;
    }
});
System.out.println("resutl:"+integer);
 
 
*普通for循环写法*
int temp = 1;
for (Integer integer : num) {
System.out.println("a="+temp);
temp += integer;
}
 
System.out.println("resutl:"+temp);
  • findAny:返回流中的任意元素(但是该元素也是封装在 Optional 类中)
a=1
a=2
a=4
a=8
a=13
a=19
resutl:26

举个栗子:

List<Integer> num = Arrays.asList(1, 2, 3, 4, 5, 6);
List<Integer> other = new ArrayList<>();
other.addAll(Arrays.asList(7,8,9,10));
 
num.stream().reduce(other,
(x, y) -> { 
System.out.println(JSON.toJSONString(x));
        x.add(y);
        return x;
    },
(x, y) -> { 
        System.out.println("并行才会出现:"+JSON.toJSONString(x));
        return x;
});
 
[7,8,9,10,1]
[7,8,9,10,1,2]
[7,8,9,10,1,2,3]
[7,8,9,10,1,2,3,4]
[7,8,9,10,1,2,3,4,5]
[7,8,9,10,1,2,3,4,5,6]

通过多次执行,我们会发现,其实 findAny 会每次按顺序返回第一个元素。那这个时候,可能会认为 findAny 与 findFirst 方法是一样的效果。其实不然,findAny() 操作,返回的元素是不确定的,对于同一个列表多次调用 findAny() 有可能会返回不同的值。使用 findAny() 是为了更高效的性能。如果是数据较少,串行地情况下,一般会返回第一个结果,如果是并行的情况,那就不能确保是第一个。

我们接着看下面这个例子:

List<Integer> num = Arrays.asList( 4, 5, 6);
List<Integer> other = new ArrayList<>();
other.addAll(Arrays.asList(1,2,3));
 
num.parallelStream().reduce(other,
(x, y) -> { 
        x.add(y);
        System.out.println(JSON.toJSONString(x));
        return x;
    },
(x, y) -> { 
x.addAll(y);
System.out.println("结合:"+JSON.toJSONString(x));
return x;
});
 

[1,2,3,4,5,6]
[1,2,3,4,5,6]
[1,2,3,4,5,6]
结合:[1,2,3,4,5,6,1,2,3,4,5,6]
结合:[1,2,3,4,5,6,1,2,3,4,5,6,1,2,3,4,5,6,1,2,3,4,5,6]
 
[1,2,3,4,6]
[1,2,3,4,6]
[1,2,3,4,6]
结合:[1,2,3,4,6,1,2,3,4,6]
结合:[1,2,3,4,6,1,2,3,4,6,1,2,3,4,6,1,2,3,4,6]
 

[1,2,3,5,4,6]
[1,2,3,5,4,6]
[1,2,3,5,4,6]
结合:[1,2,3,5,4,6,1,2,3,5,4,6]
结合:[1,2,3,5,4,6,1,2,3,5,4,6,1,2,3,5,4,6,1,2,3,5,4,6]

如此可见,在并行流里,findAny 可就不是只返回第一个元素啦!

2.2.5 非短路(Unshort-circuiting)操作

  • forEach:该方法接收一个 Lambda 表达式,然后在 Stream 的每一个元素上执行该表达式

可以理解为我们平时使用的 for 循环,但是较于 for 循环,又略有不同!咱们待会再讲。

<R, A> R collect(Collector<? super T, A, R> collector);
 
<R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner);

举个栗子:

List<Integer> numList = Arrays.asList(1,2,3);

numList.stream()
        .collect(()->{ 
        
            System.out.println("构造器,返回一个你想用到的任意起始对象!此处返回一个空List为例");
            System.out.println();
            return new ArrayList();
        }, (a, b) -> {  
            synchronized (Java8DemoServiceImpl.class) {  
                System.out.println("累加器");
                a.forEach(item -> System.out.println("a:" + item));
                System.out.println("b:" + b);
                
                a.add(b);
                
                System.out.println();
            }
        }, (a, b) -> {  
            synchronized (Java8DemoServiceImpl.class) {  
                System.out.println("合并器");
                System.out.println("a:" + JSON.toJSONString(a) + " , " + "b:" + JSON.toJSONString(b));
                
                a.addAll(b);
                System.out.println();  
            }
        })
        .forEach(item -> System.out.println("最终结果项:" + item));

那如果我们把 "Jack" 用在循环外部用一个变量接收,如下操作:

构造器,返回一个你想用到的任意起始对象!此处返回一个空List为例
 
累加器
b:1
 
累加器
a:1
b:2
 
累加器
a:1
a:2
b:3
 
最终结果项:1
最终结果项:2
最终结果项:3

那么此时编辑器则会爆红,

因为 lambdaopen in new window 中,使用的外部变量必须是最终的,不可变的,所以如果我们想要对其进行修改,那是不可能的!如果必须这么使用,可以将外部变量,移至表达式之中使用才行!

  • forEachOrdered:该方法接收一个 Lambda 表达式,然后按顺序在 Stream 的每一个元素上执行该表达式
构造器,返回一个你想用到的任意起始对象!此处返回一个空List为例
构造器,返回一个你想用到的任意起始对象!此处返回一个空List为例
 
构造器,返回一个你想用到的任意起始对象!此处返回一个空List为例
 
累加器
 
b:2
 
累加器
b:1
 
累加器
b:3
 
合并器
a:[2] , b:[3]
 
合并器
a:[1] , b:[2,3]
 
最终结果项:1
最终结果项:2
最终结果项:3

该功能其实和 forEach 是很相似的,也是循环操作!那唯一的区别,就在于 forEachOrdered 是可以保证循环时元素是按原来的顺序逐个循环的!

**但是,也不尽其然!因为有的时候,forEachOrdered 也是不能百分百保证有序!**例如下面这个例子:

List<Integer> num = Arrays.asList( 4, 5, 6);
num.stream().max(Integer::compareTo).ifPresent(System.out::println);

可以看到,在并行流时,由于是多线程处理,其实还是无法保证有序操作的!

  • toArray:返回包含此流元素的数组;当有参数时,则使用提供的generator函数分配返回的数组,以及分区执行或调整大小可能需要的任何其他数组
List<Integer> num = Arrays.asList( 4, 5, 6);
num.stream().min(Integer::compareTo).ifPresent(System.out::println);
 

举个栗子:

List<Integer> num = Arrays.asList( 4, 5, 6);
System.out.println(num.stream().count());
 

  • reduce:方法接收一个函数作为累加器,数组中的每个值(从左到右)开始缩减,最终计算为一个值

通过字面意思,可能比较难理解是个什么意思?下面我们先看一个图,熟悉一下这个接口的操作流程是怎样的:

该接口含有 3 种调用方式:

Optional<T> reduce(BinaryOperator<T> accumulator);
 
T reduce(T identity, BinaryOperator<T> accumulator);
 
<U> U reduce(U identity,
                 BiFunction<U, ? super T, U> accumulator,
                 BinaryOperator<U> combiner);
 
 

@FunctionalInterface
public interface BinaryOperator<T> extends BiFunction<T,T,T> {

}
 
@FunctionalInterface
public interface BiFunction<T, U, R> {
R apply(T t, U u);

}

下面举几个栗子,看看具体效果:

(一). 先以 1 个参数的接口为例

为了方便理解,先看下内部的执行效果代码:

boolean foundAny = false;
T result = null;
for (T element : this stream) {
    if (!foundAny) {
        foundAny = true;
        result = element;
    }
    else
        result = accumulator.apply(result, element);
}
return foundAny ? Optional.of(result) : Optional.empty();

再看下具体栗子:

List<Integer> num = Arrays.asList(1, 2, 4, 5, 6, 7);
 
*原接口一比一原汁原味写法*
Integer integer = num.stream().reduce(new BinaryOperator<Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) {
    System.out.println("x:"+a);
        return a + b;
    }
}).get();
System.out.println("resutl:"+integer);
 
 
*等效写法一*
Integer result = num.stream().reduce((x, y) -> {
    System.out.println("x:"+x);
    return x + y;
}).get();
System.out.println("resutl:"+result);
 
 
*等效的普通写法*
boolean flag = false;
int temp = 0;
for (Integer integer : num) {
if(!flag){
temp = integer;
flag = true;
}else {
System.out.println("x:"+temp);
temp += integer;
}
}
 
System.out.println("resutl:"+temp);

执行结果都是:

x:1
x:3
x:7
x:12
x:18
resutl:25

(二)再以 2 个参数的接口为例

先看下内部的执行效果代码:

T result = identity;
for (T element : this stream){
    result = accumulator.apply(result, element)
}
return result;

在看具体栗子:

List<Integer> num = Arrays.asList(1, 2, 4, 5, 6, 7);
 
*一比一原汁原味写法*
Integer integer = num.stream().reduce(1,new BinaryOperator<Integer>() {
    @Override
    public Integer apply(Integer a, Integer b) {
    System.out.println("a="+a);
        return a + b;
    }
});
System.out.println("resutl:"+integer);
 
 
*普通for循环写法*
int temp = 1;
for (Integer integer : num) {
System.out.println("a="+temp);
temp += integer;
}
 
System.out.println("resutl:"+temp);

输出结果都是:

a=1
a=2
a=4
a=8
a=13
a=19
resutl:26

(三)最后 3 个参数的接口为例

这个接口的内部执行效果,其实和 2 个参数的几乎一致。那么第三个参数是啥呢?这是一个 combiner 组合器;

组合器需要和累加器的返回类型需要进行兼容,combiner 组合器的方法主要用在并行操作中

在看具体栗子:

List<Integer> num = Arrays.asList(1, 2, 3, 4, 5, 6);
List<Integer> other = new ArrayList<>();
other.addAll(Arrays.asList(7,8,9,10));
 
num.stream().reduce(other,
(x, y) -> { 
System.out.println(JSON.toJSONString(x));
        x.add(y);
        return x;
    },
(x, y) -> { 
        System.out.println("并行才会出现:"+JSON.toJSONString(x));
        return x;
});
 
 
 
 

[7,8,9,10,1]
[7,8,9,10,1,2]
[7,8,9,10,1,2,3]
[7,8,9,10,1,2,3,4]
[7,8,9,10,1,2,3,4,5]
[7,8,9,10,1,2,3,4,5,6]

我们再讲串行流改成并行流,看下会出现什么结果:

List<Integer> num = Arrays.asList( 4, 5, 6);
List<Integer> other = new ArrayList<>();
other.addAll(Arrays.asList(1,2,3));
 
num.parallelStream().reduce(other,
(x, y) -> { 
        x.add(y);
        System.out.println(JSON.toJSONString(x));
        return x;
    },
(x, y) -> { 
x.addAll(y);
System.out.println("结合:"+JSON.toJSONString(x));
return x;
});
 
 


[1,2,3,4,5,6]
[1,2,3,4,5,6]
[1,2,3,4,5,6]
结合:[1,2,3,4,5,6,1,2,3,4,5,6]
结合:[1,2,3,4,5,6,1,2,3,4,5,6,1,2,3,4,5,6,1,2,3,4,5,6]
 

[1,2,3,4,6]
[1,2,3,4,6]
[1,2,3,4,6]
结合:[1,2,3,4,6,1,2,3,4,6]
结合:[1,2,3,4,6,1,2,3,4,6,1,2,3,4,6,1,2,3,4,6]
 

[1,2,3,5,4,6]
[1,2,3,5,4,6]
[1,2,3,5,4,6]
结合:[1,2,3,5,4,6,1,2,3,5,4,6]
结合:[1,2,3,5,4,6,1,2,3,5,4,6,1,2,3,5,4,6,1,2,3,5,4,6]

我们会发现每个结果都是乱序的,并且多执行几次,都会出现不同的结果。并且第三个参数组合器内的代码也得到了执行!!

这就是因为并行时,使用多线程时顺序性没有保障所产生的结果。通过实践,可以看到:组合器的作用,其实是对参数 2 中的各个线程,产生的结果进行了再一遍的归约操作!

并且仔细看第二遍的执行结果**:每一组都少了一 1 个值!!!**

所以,对于并行流 parallelStream 操作,必须慎用!!

  • collect:称为收集器,是一个终端操作, 它接收的参数是将流中的元素累积到汇总结果的各种方式
<R, A> R collect(Collector<? super T, A, R> collector);
 
<R> R collect(Supplier<R> supplier,
                  BiConsumer<R, ? super T> accumulator,
                  BiConsumer<R, R> combiner);

第一种方式会比较经常使用到,也比较方便使用,现在先看一看里面常用的一些方法:

|
工厂方法

|

返回类型

|

用于

|
| --- | --- | --- |
|

toList

|

List<T>

|

把流中所有元素收集到 List 中

|
|

示例: List<Menu> menus=Menu.getMenus.stream().collect(Collectors.toList());

|
|

toSet

|

Set<T>

|

把流中所有元素收集到 Set 中, 删除重复项

|
|

示例: Set<Menu> menus=Menu.getMenus.stream().collect(Collectors.toSet());

|
|

toCollection

|

Collection<T>

|

把流中所有元素收集到给定的供应源创建的集合中

|
|

示例: ArrayList<Menu> menus=Menu.getMenus.stream().collect(Collectors.toCollection(ArrayList::new));

|
|

Counting

|

Long

|

计算流中元素个数

|
|

示例: Long count=Menu.getMenus.stream().collect(counting);

|
|

SummingInt

|

Integer

|

对流中元素的一个整数属性求和

|
|

示例: Integer count=Menu.getMenus.stream().collect(summingInt(Menu::getCalories));

|
|

averagingInt

|

Double

|

计算流中元素 integer 属性的平均值

|
|

示例: Double averaging=Menu.getMenus.stream().collect(averagingInt(Menu::getCalories));

|
|

Joining

|

String

|

连接流中每个元素的 toString 方法生成的字符串

|
|

示例: String name=Menu.getMenus.stream().map(Menu::getName).collect(joining(“, ”));

|
|

maxBy

|

Optional<T>

|

一个包裹了流中按照给定比较器选出的最大元素的 optional
如果为空返回的是 Optional.empty()

|
|

示例: Optional<Menu> fattest=Menu.getMenus.stream().collect(maxBy(Menu::getCalories));

|
|

minBy

|

Optional<T>

|

一个包裹了流中按照给定比较器选出的最小元素的 optional
如果为空返回的是 Optional.empty()

|
|

示例: Optional<Menu> lessest=Menu.getMenus.stream().collect(minBy(Menu::getCalories));

|
|

Reducing

|

归约操作产生的类型

|

从一个作为累加器的初始值开始, 利用 binaryOperator 与流中的元素逐个结合, 从而将流归约为单个值

|
|

示例: int count=Menu.getMenus.stream().collect(reducing(0,Menu::getCalories,Integer::sum));

|
|

collectingAndThen

|

转换函数返回的类型

|

包裹另一个转换器, 对其结果应用转换函数

|
|

示例: Int count=Menu.getMenus.stream().collect(collectingAndThen(toList(),List::size));

|
|

groupingBy

|

Map<K,List<T>>

|

根据流中元素的某个值对流中的元素进行分组, 并将属性值做为结果 map 的键

|
|

示例: Map<Type,List<Menu>> menuType=Menu.getMenus.stream().collect(groupingby(Menu::getType));

|
|

partitioningBy

|

Map<Boolean,List<T>>

|

根据流中每个元素应用谓语的结果来对项目进行分区

|
|

示例: Map<Boolean,List<Menu>> menuType=Menu.getMenus.stream().collect(partitioningBy(Menu::isType)

|

第二种方式看起来跟 reduce 的三个入参的方法有点类似,也可以用来实现 filter、map 等操作!

流程解析图如下:

举个栗子:

List<Integer> numList = Arrays.asList(1,2,3);

numList.stream()
        .collect(()->{ 
        
            System.out.println("构造器,返回一个你想用到的任意起始对象!此处返回一个空List为例");
            System.out.println();
            return new ArrayList();
        }, (a, b) -> {  
            synchronized (Java8DemoServiceImpl.class) {  
                System.out.println("累加器");
                a.forEach(item -> System.out.println("a:" + item));
                System.out.println("b:" + b);
                
                a.add(b);
                
                System.out.println();
            }
        }, (a, b) -> {  
            synchronized (Java8DemoServiceImpl.class) {  
                System.out.println("合并器");
                System.out.println("a:" + JSON.toJSONString(a) + " , " + "b:" + JSON.toJSONString(b));
                
                a.addAll(b);
                System.out.println();  
            }
        })
        .forEach(item -> System.out.println("最终结果项:" + item));

运行结果:

构造器,返回一个你想用到的任意起始对象!此处返回一个空List为例
 
累加器
b:1
 
累加器
a:1
b:2
 
累加器
a:1
a:2
b:3
 
最终结果项:1
最终结果项:2
最终结果项:3

如果把上述流换成并行流,会得到如下一种结果:

构造器,返回一个你想用到的任意起始对象!此处返回一个空List为例
构造器,返回一个你想用到的任意起始对象!此处返回一个空List为例
 
构造器,返回一个你想用到的任意起始对象!此处返回一个空List为例
 
累加器
 
b:2
 
累加器
b:1
 
累加器
b:3
 
合并器
a:[2] , b:[3]
 
合并器
a:[1] , b:[2,3]
 
最终结果项:1
最终结果项:2
最终结果项:3

可以看到,根据流内的元素个数 n,起了 n 个线程,同时分别执行了构造器、累加器、合并器内代码!与 reduce 的行为方式基本一致!

  • max:根据提供的 Comparator 返回此流的最大元素
Optional<T> max(Comparator<? super T> comparator);

举个栗子:

List<Integer> num = Arrays.asList( 4, 5, 6);
num.stream().max(Integer::compareTo).ifPresent(System.out::println);
 

  • min:根据提供的 Comparator 返回此流的最小元素
Optional<T> min(Comparator<? super T> comparator);

举个栗子:

List<Integer> num = Arrays.asList( 4, 5, 6);
num.stream().min(Integer::compareTo).ifPresent(System.out::println);
 

  • count:返回此流中的元素计数
long count();

举个栗子:

List<Integer> num = Arrays.asList( 4, 5, 6);
System.out.println(num.stream().count());
 

3. 总结

此处给正在学习的朋友两个小提示:

1、对于流的各种操作所属分类,还不够熟悉的,可以直接进入方法的源码接口内,如下,是可以查看到类型说明的:

2、对于并行流 stream().parallel()、parallelStream() 的使用,须慎重使用!使用前须考虑其不确定因素和无序性,考虑多线程所带来的复杂性!!

2020 年已近年末,再过几天就要步入新年啦!工作之余,耗时几天,终于写完了这篇博文!分享不易,希望感兴趣的朋友,可以留言讨论,点赞收藏!

评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.3.0