Java8的Stream和Optional
简述
Java 8引入了全新的Stream API。这里的Stream和I/O流不同,它更像具有Iterable的集合类,但行为和集合类又有所不同。
最新添加的Stream API(java.util.stream) 把真正的函数式编程风格引入到Java中。这是目前为止对Java类库最好的补充,因为Stream API可以极大提供Java程序员的生产力,让程序员写出高效率、干净、简洁的代码.
什么是流
Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。
原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。
你可以将 Streams 视为 Java 中第一个充分利用了lambda表达式的强大功能的库,但它没有什么特别奇妙的地方(尽管它被紧密集成到核心 JDK 库中)。Streams 不是该语言的一部分 — 它是一个精心设计的库,充分利用了一些较新的语言特性。
Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。
而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。
- 当使用串行方式去遍历时,每个 item 读完后再读下一个 item。
- 而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。
- Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。
Java 的并行 API 演变历程基本如下:
- 1.0-1.4 中的 java.lang.Thread
- 5.0 中的 java.util.concurrent
- 6.0 中的 Phasers 等
- 7.0 中的 Fork/Join 框架
- 8.0 中的 Lambda
Stream 的另外一大特点是,数据源本身可以是无限的。
集合和流的差异
- 流并不存储其元素
- 流的操作不会修改其数据源
- 流的操作是尽可能惰性执行的
流的创建
Collection接口下的两个方法可以获取流
1
2
3
4
5
6
7
8
9
10
11public interface Collection<E> extends Iterable<E> {
//将集合转换为一个流
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
//产生当前集合中所有元素的顺序流或并行流
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
}数组的静态Stream.of方法
1
2
3
4
5
6
7
8
9
10
11
12
13public interface Stream<T> extends BaseStream<T, Stream<T>> {
//产生一个元素为给定值的流
public static<T> Stream<T> of(T t) {
return StreamSupport.stream(new Streams.StreamBuilderImpl<>(t), false);
}
//产生一个元素为给定值的流,这里是不定长参数
public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
}Array.stream(array, from, to)方法可以从数组中位于from(包括)和to(不包括)的元素中创建一个流
1
2
3
4
5
6
7
8
9
10public class Arrays {
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}
public static <T> Stream<T> stream(T[] array) {
return stream(array, 0, array.length);
}
}创建不包含任何元素的流
1
2
3
4
5
6public interface Stream<T> extends BaseStream<T, Stream<T>> {
public static<T> Stream<T> empty() {
return StreamSupport.stream(Spliterators.<T>emptySpliterator(), false);
}
}创建无限流的静态方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public interface Stream<T> extends BaseStream<T, Stream<T>> {
//接受一个不包含任何引元的函数(从技术上将,是一个Supplier<T>接口的对象),创建无限流
public static<T> Stream<T> generate(Supplier<T> s) {
Objects.requireNonNull(s);
return StreamSupport.stream(
new StreamSpliterators.InfiniteSupplyingSpliterator.OfRef<>(Long.MAX_VALUE, s), false);
}
//接受一个“种子”值,以及一个函数(UnaryOperation<T>),产生无限序列
public static<T> Stream<T> iterate(final T seed, final UnaryOperator<T> f) {
Objects.requireNonNull(f);
final Iterator<T> iterator = new Iterator<T>() {
T t = (T) Streams.NONE;
public boolean hasNext() {
return true;
}
public T next() {
return t = (t == Streams.NONE) ? seed : f.apply(t);
}
};
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(
iterator,
Spliterator.ORDERED | Spliterator.IMMUTABLE), false);
}
}Java API中还有大量可以创建流的方法,这里就不一一列举了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16//Collection接口的stream()方法
ArrayList<Integer> arrayList = new ArrayList<>();
arrayList.add(1);
Stream<Integer> stream = arrayList.stream();
//并行流
Stream<Integer> parallelStream = arrayList.parallelStream();
//静态Stream.of()方法
Stream<Integer> stream1 = Stream.of(1,2,3,4);
Stream stream3 = Stream.empty();
//Arrays.stream()方法,把数组中的元素创建一个流
int[] sum = {1,2,3,4,5};
Stream<Integer> stream2 = (Stream<Integer>) Arrays.stream(sum);
流的操作
流的操作类型分为两种:
- Intermediate(转换操作):一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
- Terminal(终止操作):一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。
filter、map、flatMap方法
方法定义如下:
1
2
3
4
5
6
7
8
9
10
11
12public interface Stream<T> extends BaseStream<T, Stream<T>> {
//返回一个流,该流包含与给定谓词匹配的该流的元素。
Stream<T> filter(Predicate<? super T> predicate);
//返回一个流,该流包括将给定函数应用于此流的元素的结果。
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
//返回一个流,该流包括将流中的每个元素替换为通过将提供的映射函数应用于每个元素
//而生成的映射流的内容而得到的结果。
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
}filter解释
filter转换会产生一个流,它的元素与某种条件相匹配。它的引元是
Predicate<T>
,即从T
到boolean
的函数。Predicate<T>
就是一个函数式接口,是一个布尔值函数,即里面的test方法返回的是一个布尔值。1
2
3//转换为只包含长单词的另一个流
List<String> wordList = ...;
Stream<String> longWords = wordList.stream().filter(w -> w.length() > 12);
map方法可以按照某种方式来转换流中的值,传递执行该转换的函数。
示例
1
2
3
4
5
6//将所有单词都转换为小写
Stream<String> lowerCaseWords = words.stream().map(String::toLowerCase);
//也可以使用java8的新特性lambda表达式
//产生的流中包含了所有单词的首字母
Stream<String> firstLetters = words.stream().map(s -> s.substring(0,1));如果我们在一个字符串流上执行映射操作
1
2
3Stream<Stream<String>> result = words.parallelStream().map((s -> s.substring(0,1));
//result: 得到一个流的流
// 比如` [... ["b","o","a","t"], ["a","a","a"], ...]`,
并行map处理的并不是我们想要的结果,如果我们想要摊平它变回一个流
[... "b","o","a","t", "a","a","a", ...]
,我们可以使用flatMap方法。1
Stream<String> flatResult = words.stream().flatMap(s -> s.substring(0,1));
抽取子流和连接流
调用limit方法会返回一个新的流,它在n个元素之后结束(如果原来的流更短,那么就会在流结束时结束)。
- 这个方法对于裁剪无限流的尺寸会显得特别有用
调用skip方法正好相反,它会丢弃前n个元素。
- 例如在将文本分隔为单词时,按照split方法的工作方式,第一个元素是没什么用的空字符串,这时候可以用skip跳过它
concat方法可以将两个流连接起来。当然第一流不应该是无限的,否则第二个流永远都不会得到处理的机会。
方法定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public interface Stream<T> extends BaseStream<T, Stream<T>> {
//返回由该流的元素组成的流,其maxSize长度被截断为不超过长度。
Stream<T> limit(long maxSize);
//在丢弃流的第n个元素之后,返回由该流的其余元素组成的流。
Stream<T> skip(long n);
//创建一个延迟串联的流,其元素是第一个流的所有元素,然后是第二个流的所有元素。
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) {
Objects.requireNonNull(a);
Objects.requireNonNull(b);
Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>(
(Spliterator<T>) a.spliterator(), (Spliterator<T>) b.spliterator());
Stream<T> stream = StreamSupport.stream(split, a.isParallel() || b.isParallel());
return stream.onClose(Streams.composedClose(a, b));
}
}示例
1
2
3Stream<Double> randoms = Stream.generate(Math::random).limit(100);
Stream<String> words = Stream.of(contents.split("\\PL+")).skip(1);
Stream<String> combined = Stream.concat(stream1, stream2);
其他的流转换
distinct方法会返回一个流,它的元素是原有流中经过剔除重复元素后产生的。这个流能够记住它已经看到过的元素。
sorted方法会产生一个新的流,并会对元素进行排序。其中一种用于操作Comparable元素的流,而另一种可以接受一个Comparator。
peek方法会产生另一个流,它的元素与原来的流中的元素相同,但是在每次获取一个元素时,都会调用一个函数。
方法定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14public interface Stream<T> extends BaseStream<T, Stream<T>> {
//返回由该流的不同元素组成的流(根据 Object.equals(Object))。
Stream<T> distinct();
//返回由该流的元素组成的流,并根据自然顺序排序。
Stream<T> sorted();
//返回由该流的元素组成的流,并根据提供的进行排序Comparator。
Stream<T> sorted(Comparator<? super T> comparator);
//返回由该流的元素组成的流,并在从结果流中消耗元素时对每个元素另外执行提供的操作。
Stream<T> peek(Consumer<? super T> action);
}示例:
1
2
3
4
5//只有一个merrily
Stream<String> uniqueWords = Stream.of("merrily", "merrily", "merrily", "merrily", "gently").distinct();
Stream<String> longestFirst = words.stream().sorted(Compartor.comparing(String::length).reversed());
//实际访问一个元素时,就会打印出来一条消息
Object[] powers = Stream.iterate(1.0, p -> p*2).peek(e -> System.out.println(e).limit(20).toArray());
约简操作
约简是一种终结操作,它们会将流约简为可以在程序中使用的非法值。
count方法就是一种简单约简,它会返回流中元素的数量
max和min方法会返回流中元素的最大值和最小值。
- 另外,这些方法返回的是一个类型
Optional<T>
的值,它要么在其中包装了答案,要么表示没有任何值(因为流碰巧为空)。
- 另外,这些方法返回的是一个类型
findFirst返回的是非空集合中的第一个值。它通常会在与filter组合使用时显得很有用。例如,下面展示了如何找到第一个以字母A开头的单词。如果没有则Optional为空
如果不强调使用第一个匹配,而是使用任意的匹配都可以,那么就可以使用findAny方法。
如果只想知道是否存在匹配,那么可以使用anyMatch。这个方法会接受一个断言引元,因此不需要使用filter。
同样的allMatch和noneMatch方法,它们分别会在所有元素和没有任何元素匹配断言的情况下返回true。
方法定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38public interface Stream<T> extends BaseStream<T, Stream<T>> {
//返回此流中的元素计数。
long count();
//根据提供的返回此流的最小元素 Comparator。
Optional<T> min(Comparator<? super T> comparator);
//根据提供的返回此流的最大元素 Comparator。
Optional<T> max(Comparator<? super T> comparator);
//返回Optional描述此流的第一个元素的描述;Optional如果流为空,则返回null 。
Optional<T> findFirst();
//返回Optional描述流中某些元素的描述;Optional如果流为空,则返回空。
Optional<T> findAny();
//返回此流的任何元素是否与提供的谓词匹配。
boolean anyMatch(Predicate<? super T> predicate);
//返回此流的所有元素是否与提供的谓词匹配。
boolean allMatch(Predicate<? super T> predicate);
//返回此流中是否没有元素与提供的谓词匹配。
boolean noneMatch(Predicate<? super T> predicate);
//使用提供的标识值和关联 累加函数对此流的元素 执行归约,然后返回归约后的值。
T reduce(T identity, BinaryOperator<T> accumulator);
//执行减少有关此流的元件,使用 缔合累积功能,并返回一个Optional描述该减小值,如果有的话。
Optional<T> reduce(BinaryOperator<T> accumulator);
//执行减少有关此流的元件,使用所提供的身份,积累和组合功能。
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
}示例
1
2
3
4
5
6
7
8
9
10
11
12
13Stream<String> words = Stream.of("A","B","C");
System.out.println(words.count());
//输出 :3
Stream<String> words = Stream.of("A","B","C");
System.out.println(words.max(String::compareToIgnoreCase));
//输出 Optional[C]
Stream<String> words = Stream.of("ADD","BDD","CDD");
System.out.println(words.filter(s -> s.startsWith("A")).findFirst());
//输出 Optional[ADD]
boolean aWordStartsWithQ = words.parallel().anyMatch(s -> s.startsWith("A"));
reduce的说明
概述
reduce方法是一种用于从流中计算某个值的通用机制
其最简单的形式将接受一个二元函数,并从前两个元素开始持续应用它。
在并行处理情况下,传入给Reduce等方法的集合类,需要是线程安全的,否则执行结果会与预期结果不一样。
比如求和函数:
1
2
3
4List<Integer> values =...;
Optional<Integer> sum = values.stream().reduce((x,y) -> x+y);
//reduce会计算v0+v1+v2+...,如果流为空,会返回空Optional
//上面也可写成reduce(Integer::sum);如果reduce有一项约简操作op,那么该约简就会产生
v0 op v1 op v2 op ...
, 其中我们调用函数op(vi,vi+1)
写作vi op vi+1
。这项操作应该是可结合的。通常会有一个幺元值e,使得 e op x = x,可以使用这个元素作为计算的起点。然后可以调用第二种形式的reduce,如果流为空,则会返回幺元值,就不再需要处理Optional类了
1
2List<Integer> values =...;
Optional<Integer> sum = values.stream().reduce(0, (x,y) -> x+y);假设你有一个对象流,并且想对某些属性求和,例如字符串流中的所有字符串的长度,那么可以提供一个“累积器”函数
(total, word) -> total + word.length()
。而且需要合并结果,因此需要提供第二个函数执行处理。例如1
int result = words.reduce(0, (total, word) -> total + word.length(), (total1, total2) -> total1 + total2);
参数设置
方法定义
1
2
3
4<U> U reduce(
U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);BiFunction 三个参数可以是一样的也可以不一样;
它是一个函数式接口,包含的函数式方法定义如下:1
2
3
4
5R apply(T t, U u);
BiFunction它接收两个输入返回一个输出;
而Function接收一个输入返回一个输出。BinaryOperator继承自BiFunction的一个接口
BinaryOperator就直接限定了其三个参数必须是一样的;
1
2
3public interface BinaryOperator<T> extends BiFunction<T,T,T>
//它表示的就是两个相同类型的输入经过计算后产生一个同类型的输出。BiConsumer :
也是一个函数式接口,它的定义如下:1
2
3
4
5
6
7
8
9
10//可见它就是一个两个输入参数的Consumer的变种。计算没有返回值。
public interface BiConsumer<T, U> {
/**
* Performs this operation on the given arguments.
*
* @param t the first input argument
* @param u the second input argument
*/
void accept(T t, U u);
}
一个参数的Reduce
一系列数中的正数求和、将序列中满足某个条件的数一起做某些计算等
1
2
3
4
5
6
7
8Optional<T> reduce(BinaryOperator<T> accumulator)
// 表示的含义
T result = a[0];
for (int i = 1; i < n; i++) {
result = accumulator.apply(result, a[i]);
}
return result;求和
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22Stream<Integer> s = Stream.of(1, 2, 3, 4, 5, 6);
/**
* 求和,也可以写成Lambda语法:
* Integer sum = s.reduce((a, b) -> a + b).get();
*/
Integer sum = s.reduce(new BinaryOperator<Integer>() {
public Integer apply(Integer integer, Integer integer2) {
return integer + integer2;
}
}).get();
/**
* 求最大值,也可以写成Lambda语法:
* Integer max = s.reduce((a, b) -> a >= b ? a : b).get();
*/
Integer max = s.reduce(new BinaryOperator<Integer>() {
public Integer apply(Integer integer, Integer integer2) {
return integer >= integer2 ? integer : integer2;
}
}).get();
两个参数的Reduce
相对于一个参数的方法来说,它多了一个T类型的参数;
实际上就相当于需要计算的值在Stream的基础上多了一个初始化的值。
计算的顺序是:identity与a[0]进行二合运算,结果与a[1]再进行二合运算,最终与a[n-1]进行二合运算。
1
2
3
4
5
6
7
8T reduce(T identity, BinaryOperator<T> accumulator)
//n个元素的数组进行运算时,其表达的含义如下:
T result = identity;
for (int i = 0; i < n; i++) {
result = accumulator.apply(result, a[i]);
}
return result;将一个String类型的Stream中的所有元素连接到一起并在最前面添加[value]后返回:
1
2
3
4
5
6
7
8
9
10
11
12
13
14Stream<String> s = Stream.of("test", "t1", "t2", "teeeee", "aaaa", "taaa");
System.out.println(
s.reduce("[value]",new BinaryOperator<String>(){
public String apply(String s, String s1){
return s.conncat(s2);
}
}
)
//以下结果将会是:[value]testt1t2teeeeeaaaataaa
//也可以使用Lambda语法:
System.out.println(s.reduce("[value]", (s1, s2) -> s1.concat(s2)));
三个参数的Reduce
方法定义
1
2
3
4
5<U> U reduce(
U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner
)identity: 一个初始化的值;这个初始化的值其类型是泛型U,与Reduce方法返回的类型一致;
注意此时Stream中元素的类型是T,与U可以不一样也可以一样,这样的话操作空间就大了;
不管Stream中存储的元素是什么类型,U都可以是任何类型,如U可以是一些基本数据类型的包装类型Integer、Long等;或者是String,又或者是一些集合类型ArrayList等;后面会说到这些用法。
accumulator: 其类型是BiFunction,输入是U与T两个类型的数据,而返回的是U类型;
- 也就是说返回的类型与输入的第一个参数类型是一样的,
- 而输入的第二个参数类型与Stream中元素类型是一样的。
combiner: 其类型是BinaryOperator,支持的是对U类型的对象进行操作;
- combiner主要是使用在并行计算的场景;如果Stream是非并行时,第三个参数实际上是不生效的。
非并行:其计算过程与两个参数时的Reduce基本是一致的。
1
2
3
4
5
6//Result的类型是U,而Element的类型是T!如果U与T一样,那么与1.2就是完全一样的;第三个指定null为返回值。
U result = identity;
for (T element:a) {
result = accumulator.apply(result, element);
}
return result;设U的类型是ArrayList,那么可以将Stream中所有元素添加到ArrayList中再返回了,如下示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19/**
* 以下reduce生成的List将会是[aa, ab, c, ad]
* Lambda语法:
* System.out.println(s1.reduce(new ArrayList<String>(), (r, t) -> {r.add(t); return r; }, (r1, r2) -> r1));
*/
Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
System.out.println(s1.reduce(new ArrayList<String>(),
new BiFunction<ArrayList<String>, String, ArrayList<String>>() {
public ArrayList<String> apply(ArrayList<String> u, String s) {
u.add(s);
return u;
}
}, new BinaryOperator<ArrayList<String>>() {
public ArrayList<String> apply(ArrayList<String> strings, ArrayList<String> strings2) {
return strings;
}
}));并行:当Stream是并行时,第三个参数就有意义了,它会将不同线程计算的结果调用combiner做汇总后返回。
注意由于采用了并行计算,前两个参数与非并行时也有了差异!
示例
计算4+1+2+3的结果,其中4是初始值:
并行时的计算结果是18,而非并行时的计算结果是10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/**
* lambda语法:
* System.out.println(Stream.of(1, 2, 3).parallel().reduce(4, (s1, s2) -> s1 + s2
, (s1, s2) -> s1 + s2));
**/
System.out.println(Stream.of(1, 2, 3).parallel().reduce(4, new BiFunction<Integer, Integer, Integer>() {
public Integer apply(Integer integer, Integer integer2) {
return integer + integer2;
}
}
, new BinaryOperator<Integer>() {
public Integer apply(Integer integer, Integer integer2) {
return integer + integer2;
}
}));先分析下非并行时的计算过程;每一步都要依赖前一步的运算结果!
- 第一步计算4 + 1 = 5,
- 第二步是5 + 2 = 7,
- 第三步是7 + 3 = 10。
并行时的计算过程:初始值4是存储在一个变量result中的;并行计算时,线程之间没有影响,因此每个线程在调用第二个参数BiFunction进行计算时,直接都是使用result值当其第一个参数(由于Stream计算的延迟性,在调用最终方法前,都不会进行实际的运算,因此每个线程取到的result值都是原始的4),
- 线程1:1 + 4 = 5;
- 线程2:2 + 4 = 6;
- 线程3:3 + 4 = 7;
- Combiner函数: 5 + 6 + 7 = 18!
示例输出的结果是210!
1
2
3
4
5
6
7
8
9Stream.of(1, 2, 3).parallel().reduce(4, (s1, s2) -> s1 + s2 , (s1, s2) -> s1 * s2);
线程1:1 + 4 = 5;
线程2:2 + 4 = 6;
线程3:3 + 4 = 7;
ombiner函数5 * 6 * 7 = 210。
// 类似于:
Stream.of(1, 2, 3).map(n -> n + 4).reduce((s1, s2) -> s1 * s2);
三个参数误解
三个参数时:
- 第一个参数的类型是ArrayList等对象而非基本数据类型的包装类或者String,
- 第三个函数的处理上可能容易引起误解。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35/**
* 模拟Filter查找其中含有字母a的所有元素,打印结果将是aa ab ad
* lambda语法:
* s1.parallel().reduce(new ArrayList<String>(),
(r, t) -> {if (predicate.test(t)) r.add(t); return r; },
(r1, r2) -> {System.out.println(r1==r2); return r2; }).stream().forEach(System.out::println);
*/
Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
Predicate<String> predicate = t -> t.contains("a");
s1.parallel().reduce(
new ArrayList<String>(),
new BiFunction<ArrayList<String>, String, ArrayList<String>>() {
public ArrayList<String> apply(
ArrayList<String> strings,
String s) {
if (predicate.test(s)) {
strings.add(s);
}
return strings;
}
},
new BinaryOperator<ArrayList<String>>() {
public ArrayList<String> apply(
ArrayList<String> strings,
ArrayList<String> strings2) {
//经过运行后发现是True!
//这是因为每次第二个参数也就是accumulator返回的都是第一个参数中New的ArrayList对象!
//因此combiner中传入的永远都会是这个对象,这样r1与r2就必然是同一样对象!
System.out.println(strings == strings2);
return strings;
}
}).stream().forEach(System.out::println);因此如果按理解的,combiner是将不同线程操作的结果汇总起来,那么一般情况下上述代码就会这样写(lambda):
1
2
3
4
5
6
7Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
//模拟Filter查找其中含有字母a的所有元素,由于使用了r1.addAll(r2),其打印结果将不会是预期的aa ab ad
Predicate<String> predicate = t -> t.contains("a");
s1.parallel().reduce(new ArrayList<String>(),
(r, t) -> {if (predicate.test(t)) r.add(t); return r; },
(r1, r2) -> {r1.addAll(r2); return r1; }).stream().forEach(System.out::println);
collect
collect含义与Reduce有点相似;
1
2
3
4<R> R collect(
Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);supplier:动态的提供初始化的值;创建一个可变的结果容器(JAVADOC);对于并行计算,这个方法可能被调用多次,每次返回一个新的对象;
accumulator:类型为BiConsumer,注意这个接口是没有返回值的;它必须将一个元素放入结果容器中(JAVADOC)。
combiner:类型也是BiConsumer,因此也没有返回值。它与三参数的Reduce类型,只是在并行计算时汇总不同线程计算的结果。它的输入是两个结果容器,必须将第二个结果容器中的值全部放入第一个结果容器中(JAVADOC)。
并行示例
代码
1
2
3
4
5
6
7
8
9
10
11/**
* 模拟Filter查找其中含有字母a的所有元素,打印结果将是aa ab ad
*/
Stream<String> s1 = Stream.of("aa", "ab", "c", "ad");
Predicate<String> predicate = t -> t.contains("a");
System.out.println(
s1.parallel().collect(
() -> new ArrayList<String>(),
(array, s) -> {if (predicate.test(s)) array.add(s); },
(array1, array2) -> array1.addAll(array2))
);理解起来就很容易了:
每个线程都创建了一个结果容器ArrayList,假设每个线程处理一个元素,那么处理的结果将会是[aa],[ab],[],[ad]四个结果容器(ArrayList);最终再调用第三个BiConsumer参数将结果全部Put到第一个List中,因此返回结果就是打印的结果了。
AVADOC中也在强调结果容器(result container)这个,那是否除集合类型,其结果R也可以是其它类型呢?
先看基本类型,由于BiConsumer不会有返回值,如果是基本数据类型或者String,在BiConsumer中加工后的结果都无法在这个函数外体现,因此是没有意义的。
那其它非集合类型的Java对象呢?如果对象中包含有集合类型的属性,也是可以处理的;否则,处理上也没有任何意义,
combiner对象使用一个Java对象来更新另外一个对象?至少目前我没有想到这个有哪些应用场景。
它不同Reduce,Reduce在Java对象上是有应用场景的,就因为Reduce即使是并行情况下,也不会创建多个初始化对象,combiner接收的两个参数永远是同一个对象,如假设有很多人参加会议的记录条,这些记录没有在人本身对象里面存储而在另外一个对象中;人本身对象中只有一个属性是最早参加会议时间,那就可以使用reduce来对这个属性进行更新。当然这个示例不够完美,它能使用其它更快的方式实现,但至少通过Reduce是能够实现这一类型的功能的。
数据收集遍历
iterator
当处理完流,通常会想要查看其元素。此时可以调用iterator方法,它会产生可以用来访问元素的旧式风格的迭代器。
1
2
3
4
5
6
7public interface BaseStream<T, S extends BaseStream<T, S>>
extends AutoCloseable {
//产生一个用于获取当前流中各个元素的迭代器。这是一种终结操作
Iterator<T> iterator();
}
forEach
- 或者可以调用forEach方法,将某个函数应用于每个元素。在并行流上,forEach方法会以任意顺序遍历各个元素。
- 也可以用forEachOrdered方法按照流中的顺序来处理。不过这个方法会丧失并行处理的部分甚至全部优势。
1 | public interface Stream<T> extends BaseStream<T, Stream<T>> { |
遍历输出流中的元素
1
stream.forEach(System.out::println);
toArray方法
可以获得由流的元素构成的数组,但是由于无法在运行时创建泛型数组,所以会返回一个Object[]数组。也可以通过传递构造器引用来获取正确的数组类型。
1
2
3
4//将流转换为数组,获得Object[]数组
Object[] result = stream.toArray();
//传入String的构造器引用,获得String[]数组
String[] result = stream.toArray(String::new);
collect元素收集到集合
针对将流中的元素收集到另一个目标中,有一个便捷方法collect可用,它会接受一个Collector接口的实例。Collectors类提供了大量用于生成公共收集器的工厂方法。调用collect方法可以进行多种操作:
1
2
3
4
5
6
7
8
9
10
11
12
13//将流收集到列表中
List<String> result = stream.collect(Collectors.toList());
//将流收集到集合中
Set<String> result = stream.collect(Collectors.toSet());
//如果想要获取集的具体实现类,可以
TreeSet<String> result = stream.collect(Collectors.toCollection(TreeSet::new));
//如果想要通过连接操作收集流中的所有字符串
String result = stream.collect(Collectors.joining());
//还可以在元素之间添加分隔符
String result = stream.collect(Collectors.joining(","));
基本类型流
之前我们都是将整数收集到
Stream<Integer>
中,尽管很明显,将每个整数都包装到包装器对象中是很低效的。对其他基本类型来说,情况也是一样的,这些基本类型是:double、float、short、char、byte和boolean
。流库中有专门的类型IntStream、LongStream和DoubleStream
,用来直接存储基本类型值。如果想要存储short,char,byte和boolean,可以使用IntStream;
对于float,可以使用DoubleStream。
仅列举一个为例,其余两个API都拥有相似的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53public interface IntStream extends BaseStream<Integer, IntStream> {
//计算当前流的总和
int sum();
//计算当前流的最小值
OptionalInt min();
//计算当前流的最大值
OptionalInt max();
//计算当前流的平均值
OptionalDouble average();
//获取这些结果的所有四种值的对象
IntSummaryStatistics summaryStatistics();
//产生一个由给定元素构成的IntStream
public static IntStream of(int t) {
return StreamSupport.intStream(new Streams.IntStreamBuilderImpl(t), false);
}
//产生一个由给定元素构成的IntStream
public static IntStream of(int... values) {
return Arrays.stream(values);
}
//产生一个由给定范围内的整数构成的IntStream
public static IntStream range(int startInclusive, int endExclusive) {
if (startInclusive >= endExclusive) {
return empty();
} else {
return StreamSupport.intStream(
new Streams.RangeIntSpliterator(startInclusive, endExclusive, false), false);
}
}
//产生一个由给定范围内的整数构成的IntStream
public static IntStream rangeClosed(int startInclusive, int endInclusive) {
if (startInclusive > endInclusive) {
return empty();
} else {
return StreamSupport.intStream(
new Streams.RangeIntSpliterator(startInclusive, endInclusive, true), false);
}
}
//产生用于当前流中的元素的包装器对象流
Stream<Integer> boxed();
//产生一个由当前流中的元素构成的数组
int[] toArray();
}当你拥有一个对象流时,可以用mapToInt、mapToLong和mapToDouble将其转换为基本类型流。
1
2Stream<String> words = ... ;
IntStream lengths = words.mapToInt(String::length);将基本类型流转换为对象流则需要使用boxed方法
1
2
3Stream<Integer> integers = IntStream.range(0,100).boxed();
//range方法可以生成步长为1的整数范围的流
//boxed方法将基本类型流包装为包装类
并行流
在上面流的介绍里,我们谈到的流大多数是串行流,而上文也提到过并行流。在流的创建里,
stream()
方法就是创建串行流,parallelStream()
方法创建的是并行流。那么他们的区别是什么呢?串行流,即单线程执行的;并行流,即多线程执行操作。
在Java中,并行流使用默认的fork-join池(
ForkJoinPool
)来操作流的各个部分,并且该池是所有并行流共享的。1
2
3
4
5
6public interface Collection<E> extends Iterable<E> {
//用当前集合中的元素产生一个并行流
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
}1
2
3
4
5
6
7
8
9public interface BaseStream<T, S extends BaseStream<T, S>>
extends AutoCloseable {
//产生一个与当前流中元素相同的并行流
S parallel();
//产生一个与当前流中元素相同的无序流
S unordered();
}流使得并行处理块操作变得更容易。这个过程几乎是自动的,但是需要遵守一些规则,首先,必须有一个并行流。可以用
Collection.parallelStream()
方法从任何集合中获取一个并行流:而且,parallel方法可以将任意的顺序流转换为并行流。
1
2Stream<String> parallelWords = words.parallelStream();
Stream<String> parallelWords = Stream.of(wordArray).parallel();只要在终结方法执行时,流处于并行模式,那么所有的中间流操作都将被并行化。
并行流的作用
并行流就是支持多线程操作的流,它使得并行处理变得简单。
Stream具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作。
举一个简单的栗子看效果:
1
2
3
4
5
6
7
8
9
10
11
12
13//创建一个串行流,并且遍历输出全部元素
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.stream()
.forEach(System.out::println);
// 结果是 1 2 3 4 5 6 7 8 9
//创建一个并行流,并且遍历输出全部元素
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEach(System.out::println);
//结果是 6 5 7 9 1 8 4 3 2 (但其实结果每次都不一样)上面的栗子可以看到,并行流在进行操作时,将一个大操作分成了多个小操作并行进行,再将结果组合起来,于是输出的结果顺序是任意顺序。
倘若你想要结果按照原来元素的顺序,就上面的例子,你可以这样做
1
2
3
4List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEachOrdered(System.out::println);
//输出 1 2 3 4 5 6 7 8 9**注意:**如果
forEachOrdered()
中间有其他如filter()
的中介操作,会试着平行化处理,然后最终forEachOrdered()会以原数据顺序处理,因此,使用forEachOrdered()
这类的有序处理,可能会(或完全失去)失去平行化的一些优势,实际上中介操作亦有可能如此,例如sorted()方法。
并行流的性能问题
- 要想深入的研究parallelStream,我们必须先了解ForkJoin框架和ForkJoinPool。
简单了解Fork/Join 框架
Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
我们再通过 Fork 和 Join 这两个单词来理解下 Fork/Join 框架,Fork 就是把一个大任务切分为若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算 1+2+。。+10000,可以分割成 10 个子任务,每个子任务分别对 1000 个数进行求和,最终汇总这 10 个子任务的结果。
ForkJoinPool
Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。
它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。
当调用Arrays类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。
自动并行化也被运用在Java 8新添加的Stream API中。
1
2
3List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEach(System.out::println);对于列表中的元素的操作都会以并行的方式执行。forEach方法会为每个元素的计算操作创建一个任务,该任务会被前文中提到的ForkJoinPool中的通用线程池处理。以上的并行计算逻辑当然也可以使用ThreadPoolExecutor完成,但是就代码的可读性和代码量而言,使用ForkJoinPool明显更胜一筹。
并行流的陷阱
从java8开始,并行编程变得很容易,通过并行流(parallelStream),可以很轻松的实现多线程并行处理。但是,这里面有个性能“陷阱”,如果不注意,使用并行流的效果反而更差,这个陷阱是什么呢?
这个陷阱就是,并行流默认都是用同一个默认的ForkJoinPool,这个ForkJoinPool的线程数和CPU的核心数相同。如果是计算密集型的操作,直接使用是没有问题的,因为这个ForkJoinPool会将所有的CPU打满,系统资源是没有浪费的。但是,如果其中还有IO操作或等待操作,这个默认的ForkJoinPool只能消耗一部分CPU,而另外的并行流因为获取不到该ForkJoinPool的使用权,性能将大大降低。可见,默认的ForkJoinPool必须只能处理计算密集型的任务。
Collectors类型
Collector
是一个接口,其中Collectors类有用于多种收集器的工厂方法。
在流里,collect()
方法大量运用到了Collectors里的工厂方法
进行收集操作等,该方法里具体能做到什么取决于Collectiors类。
由于该类型的收集器工厂方法太多了,下面列举了部分方法,不一一举例了,需要用到的时候再详细研究。
收集结果List|Set
下面这些方法在上面流里有提到过,就不再具体举例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49public final class Collectors {
//将元素收集到列表中的收集器
public static <T>
Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}
//将元素收集到集中的收集器
public static <T>
Collector<T, ?, Set<T>> toSet() {
return new CollectorImpl<>((Supplier<Set<T>>) HashSet::new, Set::add,
(left, right) -> { left.addAll(right); return left; },
CH_UNORDERED_ID);
}
//将元素收集到任意集合中的收集器
public static <T, C extends Collection<T>>
Collector<T, ?, C> toCollection(Supplier<C> collectionFactory) {
return new CollectorImpl<>(collectionFactory, Collection<T>::add,
(r1, r2) -> { r1.addAll(r2); return r1; },
CH_ID);
}
//连接字符串的收集器
public static Collector<CharSequence, ?, String> joining() {
return new CollectorImpl<CharSequence, StringBuilder, String>(
StringBuilder::new, StringBuilder::append,
(r1, r2) -> { r1.append(r2); return r1; },
StringBuilder::toString, CH_NOID);
}
//连接字符串,并以指定分隔符分隔的收集器
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter) {
return joining(delimiter, "", "");
}
//连接字符串,并以指定分隔符分隔,第一个字符串之前可以有前缀,最后一个字符串有后缀的收集器
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,
CharSequence prefix,
CharSequence suffix) {
return new CollectorImpl<>(
() -> new StringJoiner(delimiter, prefix, suffix),
StringJoiner::add, StringJoiner::merge,
StringJoiner::toString, CH_NOID);
}
}
收集到映射表Map
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58public final class Collectors {
/**
*产生一个收集器,它会产生一个映射表或并发映射表。keyMapper和valueMapper函数
*会应用于每个收集到的元素上,从而在所产生的映射表中生存一个键值项。默认情况下,当
*两个元素产生相同的键时,会抛出一个IllegalStateException异常。你可以提供一个
*mergeFunction来合并具有相同键的值。默认情况下,其结果是一个HashMap或ConcurrentHashMap。
*你可以提供一个mapSupplier,它会产生所期望的映射表实例
*/
public static <T, K, U>
Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper) {
return toMap(keyMapper, valueMapper, throwingMerger(), HashMap::new);
}
public static <T, K, U>
Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction) {
return toMap(keyMapper, valueMapper, mergeFunction, HashMap::new);
}
public static <T, K, U, M extends Map<K, U>>
Collector<T, ?, M> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction,
Supplier<M> mapSupplier) {
BiConsumer<M, T> accumulator
= (map, element) -> map.merge(keyMapper.apply(element),
valueMapper.apply(element), mergeFunction);
return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_ID);
}
public static <T, K, U>
Collector<T, ?, ConcurrentMap<K,U>> toConcurrentMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper) {
return toConcurrentMap(keyMapper, valueMapper, throwingMerger(), ConcurrentHashMap::new);
}
public static <T, K, U>
Collector<T, ?, ConcurrentMap<K,U>>
toConcurrentMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction) {
return toConcurrentMap(keyMapper, valueMapper, mergeFunction, ConcurrentHashMap::new);
}
public static <T, K, U, M extends ConcurrentMap<K, U>>
Collector<T, ?, M> toConcurrentMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper,
BinaryOperator<U> mergeFunction,
Supplier<M> mapSupplier) {
BiConsumer<M, T> accumulator
= (map, element) -> map.merge(keyMapper.apply(element),
valueMapper.apply(element), mergeFunction);
return new CollectorImpl<>(mapSupplier, accumulator, mapMerger(mergeFunction), CH_CONCURRENT_ID);
}
}
基础数据类型收集器
1 | public final class Collectors { |
群组和分区
1 | public final class Collectors { |
下游收集器
1 | public final class Collectors { |