Gatherers 的方法
- windowFixed(windowSize)
- windowSliding(windowSize)
- fold(initial, folder)
- scan(initial, scanner)
- mapConcurrent(mapConcurrent, mapper)
windowFixed(windowSize)
将 Stream 的元素分组到固定大小的窗口中。返回的每个窗口 List
是不可修改的。
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;public class G1 {public static void main(String[] args) {List<List<Integer>> windows =Stream.of(1, 2, 3, 4, 5, 6, 7, 8).gather(Gatherers.windowFixed(3)).toList();System.out.println(windows);// 窗口不能修改,会抛异常 UnsupportedOperationExceptionwindows.get(0).add(1);}
}
输出:
[[1, 2, 3], [4, 5, 6], [7, 8]]
Exception in thread "main" java.lang.UnsupportedOperationExceptionat java.base/java.util.ImmutableCollections.uoe(ImmutableCollections.java:142)at java.base/java.util.ImmutableCollections$AbstractImmutableCollection.add(ImmutableCollections.java:147)at org.example.stream.G1.main(G1.java:13)
windowSliding(windowSize)
滑动窗口
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;public class G2 {public static void main(String[] args) {List<List<Integer>> windows2 =Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(2)).toList();System.out.println(windows2);List<List<Integer>> windows6 =Stream.of(1,2,3,4,5,6,7,8).gather(Gatherers.windowSliding(6)).toList();System.out.println(windows6);}
}
输出
[[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
[[1, 2, 3, 4, 5, 6], [2, 3, 4, 5, 6, 7], [3, 4, 5, 6, 7, 8]]
fold(initial, folder)
public static <T, R> Gatherer<T, ?, R> fold(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> folder);
类似 reduce 的功能,只会将最后的结果发送到 Stream 的下一个阶段,伪代码如下:
public static <T, R> void fold(Supplier<R> initial, BiFunction<? super R, ? super T, ? extends R> folder) {List<T> elements = null;R value = initial.get();for (T e : elements) {value = folder.apply(value, e);}// 将 value 发送到 Stream 的下一个阶段
}
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherers;
import java.util.stream.Stream;public class G3 {public static void main(String[] args) {Optional<String> numberString =Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).gather(Gatherers.fold(() -> "", (string, number) -> string + number)).findFirst();System.out.println(numberString);}
}// 输出
Optional[123456789]
scan(initial, scanner)
public static <T, R> Gatherer<T, ?, R> scan(Supplier<R> initial,BiFunction<? super R, ? super T, ? extends R> scanner);
使用提供的 scanner
执行 Prefix Scan – 增量累积。从 initial
获得的初始值开始,通过将 scanner
应用于当前值和下一个输入元素来获取每个后续值,并将每个后续值发送到 Stream 的下一个阶段。
fold
、scan
的参数是相同的,区别在于 fold
只会将最后的结果发送到下一个阶段,而 scan
会将每个中间结果都发送到下一个阶段。伪代码:
public static <T, R> void scan(Supplier<R> initial, BiFunction<? super R, ? super T, ? extends R> folder) {List<T> elements = null;R value = initial.get();for (T e : elements) {value = folder.apply(value, e);// 将 value 发送到 Stream 的下一个阶段}
}
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherers;
import java.util.stream.Stream;public class G4 {public static void main(String[] args) {List<String> numberStrings =Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).gather(Gatherers.scan(() -> "", (string, number) -> string + number)).toList();System.out.println(numberStrings);}
}
// 输出
[1, 12, 123, 1234, 12345, 123456, 1234567, 12345678, 123456789]
mapConcurrent(mapConcurrent, mapper)
使用虚拟线程以配置的最大并发级数执行与 Stream#map
方法类似的功能。此操作将保留流的顺序。
public static <T, R> Gatherer<T,?,R> mapConcurrent(final int maxConcurrency,final Function<? super T, ? extends R> mapper)
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;public class G5 {public static void main(String[] args) {List<Integer> numberStrings =Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).gather(Gatherers.mapConcurrent(3, (element) -> {System.out.println(Thread.currentThread().isVirtual());return element * element;})).toList();System.out.println(numberStrings);}
}
输出:可以看出确实使用了虚拟线程
true
true
true
true
true
true
true
true
true
[1, 4, 9, 16, 25, 36, 49, 64, 81]