当我执行这段代码时谈论Stream
s。
public class Main {
public static void main(String[] args) {
Stream.of(1,2,3,4,5,6,7,8,9)
.peek(x->System.out.print("nA"+x))
.limit(3)
.peek(x->System.out.print("B"+x))
.forEach(x->System.out.print("C"+x));
}
}
我得到这个输出
A1B1C1
A2B2C2
A3B3C3
因为将我的流限制为前三个组件会强制操作 A、B 和 C 仅执行三次。
尝试使用skip()
方法对最后三个元素执行类似的计算,显示出不同的行为:这
public class Main {
public static void main(String[] args) {
Stream.of(1,2,3,4,5,6,7,8,9)
.peek(x->System.out.print("nA"+x))
.skip(6)
.peek(x->System.out.print("B"+x))
.forEach(x->System.out.print("C"+x));
}
}
输出此
A1
A2
A3
A4
A5
A6
A7B7C7
A8B8C8
A9B9C9
在这种情况下,为什么要执行操作 A1 到 A6?它一定与 limit 是一个短路的有状态中间操作有关,而 skip 不是,但我不明白这个属性的实际含义。难道只是">跳过之前的每一个动作都被执行,而限制前的不是每个人都被执行"吗?
这里有两个流管道。
这些流管道分别包含一个源、多个中间操作和一个终端操作。
但是中间操作是懒惰的。这意味着除非下游操作需要某个项目,否则不会发生任何操作。当它这样做时,中间操作将执行生成所需项所需的所有操作,然后再次等待,直到请求另一个项,依此类推。
终端操作通常是"急切的"。也就是说,他们要求流中完成所需的所有项目。
因此,您应该真正将管道视为forEach
向它后面的流询问下一个项目,该流询问它后面的流,依此类推,一直到源。
考虑到这一点,让我们看看我们的第一个管道有什么:
Stream.of(1,2,3,4,5,6,7,8,9)
.peek(x->System.out.print("nA"+x))
.limit(3)
.peek(x->System.out.print("B"+x))
.forEach(x->System.out.print("C"+x));
因此,forEach
要求第一项。这意味着"B"peek
需要一个项目,并要求limit
输出流提供它,这意味着limit
需要询问"A"peek
,它转到源。给出一个项目,一直到forEach
,你得到第一行:
A1B1C1
forEach
要求另一件物品,然后是另一件。每次,请求都会沿流向上传播并执行。但是当forEach
请求第四项时,当请求到达limit
时,它知道它已经给出了允许它提供的所有项目。
因此,它不会要求"A"窥视另一个项目。它立即指示其项目已用尽,因此不再执行任何操作并终止forEach
。
第二个管道中会发生什么?
Stream.of(1,2,3,4,5,6,7,8,9)
.peek(x->System.out.print("nA"+x))
.skip(6)
.peek(x->System.out.print("B"+x))
.forEach(x->System.out.print("C"+x));
同样,forEach
要求第一项。这是传播回来的。但是当它到达skip
时,它知道它必须从上游要求6个项目,然后才能通过下游。因此,它从"A"peek
上游发出请求,在不向下游传递的情况下使用它,发出另一个请求,依此类推。因此,"A"窥视获得 6 个项目请求并生成 6 个打印,但这些项目不会传递下来。
A1
A2
A3
A4
A5
A6
在 skip
提出的第 7 个请求中,该项目被传递到"B"窥视,并从它传递到forEach
,因此完成完整打印:
A7B7C7
然后就像以前一样。现在,每当skip
收到请求时,都会向上游请求一个项目并将其传递给下游,因为它"知道"它已经完成了跳过工作。因此,其余的打印件正在穿过整个管道,直到源耗尽。
流管道的流畅表示法是导致这种混淆的原因。这样想:
limit(3)
所有流水线操作都被惰性地计算,除了 forEach()
,这是一个终端操作,它会触发"管道的执行"。
执行管道时,中间流定义不会对"之前"或"之后">发生的情况做出任何假设。他们所做的只是获取输入流并将其转换为输出流:
Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9);
Stream<Integer> s2 = s1.peek(x->System.out.print("nA"+x));
Stream<Integer> s3 = s2.limit(3);
Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x));
s4.forEach(x->System.out.print("C"+x));
-
s1
包含 9 个不同的Integer
值。 -
s2
查看传递它的所有值并打印它们。 -
s3
将前 3 个值传递给s4
,并在第三个值之后中止管道。s3
不会生成其他值。这并不意味着管道中没有更多值。s2
仍然会生成(和打印(更多值,但没有人请求这些值,因此执行停止。 -
s4
再次查看传递它的所有值并打印它们。 -
forEach
消耗和打印s4
传递给它的任何内容。
这样想一想。整个流完全懒惰。只有终端操作主动从管道中提取新值。从 s4 <- s3 <- s2 <- s1
中提取 3 个值后,s3
将不再产生新值,并且不再从s2 <- s1
中提取任何值。虽然s1 -> s2
仍然能够产生4-9
,但这些值永远不会从管道中提取,因此永远不会被s2
打印。
skip(6)
有了skip()
,同样的事情就会发生:
Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9);
Stream<Integer> s2 = s1.peek(x->System.out.print("nA"+x));
Stream<Integer> s3 = s2.skip(6);
Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x));
s4.forEach(x->System.out.print("C"+x));
-
s1
包含 9 个不同的Integer
值。 -
s2
查看传递它的所有值并打印它们。 -
s3
使用前 6 个值,"跳过它们">,这意味着前 6 个值不会传递给s4
,只有后续值是。 -
s4
再次查看传递它的所有值并打印它们。 -
forEach
使用和打印传递给它的任何s4
。
这里重要的是,s2
不知道剩余的管道跳过了任何值。 s2
独立于之后发生的事情窥视所有值。
再比如:
请考虑此博客文章中列出的此管道
IntStream.iterate(0, i -> ( i + 1 ) % 2)
.distinct()
.limit(10)
.forEach(System.out::println);
当您执行上述操作时,程序将永远不会停止。为什么?因为:
IntStream i1 = IntStream.iterate(0, i -> ( i + 1 ) % 2);
IntStream i2 = i1.distinct();
IntStream i3 = i2.limit(10);
i3.forEach(System.out::println);
这意味着:
-
i1
生成无限数量的交替值:0
、1
、0
、1
、0
、1
、... -
i2
消耗以前遇到过的所有值,只传递"新">值,即总共有 2 个值来自i2
。 -
i3
传递 10 个值,然后停止。
该算法永远不会停止,因为i3
等待i2
在0
和1
后再产生 8 个值,但这些值永远不会出现,而i1
永远不会停止向i2
馈送值。
管道中的某个时刻,生成了 10 多个值并不重要。重要的是,i3
从未见过这10个价值观。
要回答您的问题:
难道只是"跳过之前的每一个动作都被执行,而限制前的不是每个人都被执行"吗?
不。执行skip()
或limit()
之前的所有操作。在两次执行中,您都会得到A1
- A3
.但是limit()
可能会使管道短路,一旦发生感兴趣的事件(达到限制(,就会中止价值消耗。
单独查看蒸汽操作完全是亵渎神明,因为这不是评估流的方式。
谈到 limit(3(,它是一个短路操作,这是有道理的,因为考虑一下,无论limit
之前和之后是什么操作,在流中设置限制都会在获得 n 个元素后停止迭代,直到极限操作,但这并不意味着只会处理 n 个流元素。以这个不同的流操作为例
public class App
{
public static void main(String[] args) {
Stream.of(1,2,3,4,5,6,7,8,9)
.peek(x->System.out.print("nA"+x))
.filter(x -> x%2==0)
.limit(3)
.peek(x->System.out.print("B"+x))
.forEach(x->System.out.print("C"+x));
}
}
将输出
A1
A2B2C2
A3
A4B4C4
A5
A6B6C6
这似乎是正确的,因为 limit 正在等待 3 个流元素通过操作链,尽管处理了 6 个流元素。
所有流都基于拆分器,基本上有两种操作:前进(向前移动一个元素,类似于迭代器(和拆分(将自己划分在任意位置,适合并行处理(。您可以随时停止获取输入元素(由limit
完成(,但您不能只是跳转到任意位置(Spliterator
界面中没有此类操作(。因此skip
操作需要实际从源中读取第一个元素才能忽略它们。请注意,在某些情况下,您可以执行实际跳转:
List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9);
list.stream().skip(3)... // will read 1,2,3, but ignore them
list.subList(3, list.size()).stream()... // will actually jump over the first three elements
也许这个小图有助于获得一些关于如何处理流的自然"感觉"。
第一行=>8=>=7=
...===
描绘了溪流。元素 1..8 从左向右流动。有三个"窗口":
- 在第一个窗口(
peek A
(中,您可以看到所有内容 - 在第二个窗口(
skip 6
或limit 3
(中,一种过滤被完成。第一个或最后一个元素被"消除" - 表示不传递以进行进一步处理。 - 在第三个窗口中,您只能看到那些传递的项目
┌────────────────────────────────────────────────────────────────────────────┐
│ │
│▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸ ▸▸▸▸▸▸▸▸▸▸▸ ▸▸▸▸▸▸▸▸▸▸ ▸▸▸▸▸▸▸▸▸ │
│ 8 7 6 5 4 3 2 1 │
│▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸ │
│ │ │ │ │
│ │ skip 6 │ │
│ peek A limit 3 peek B │
└────────────────────────────────────────────────────────────────────────────┘
可能不是这个解释中的所有内容(甚至可能不是任何东西(在技术上都是完全正确的。但是当我这样看到它时,我很清楚哪些项目到达了哪个级联指令。