假设我们有以下代码
Stream<Integer> ints = Stream.of(1, 2, 3);
ints.peek(System.out::println);
ints.forEach(System.out::println);
然后运行,会得到一个异常:
Exception in thread "main" java.lang.IllegalStateException:
stream has already been operated upon or closed
但是为什么呢?
peek
是一个中间操作,所以我认为,它不会运行/启动流本身?只有forEach
这样做,那么为什么当到达forEach
时流已经被操作了?
我还认为,peek
方法将被丢弃,因为它返回一个我不考虑的新Stream
。就像
String str = "hello world";
str.toUppercase();
str.charAt(0); // <-- h not H
peek
是中间操作,这意味着它执行一个操作并产生一个新的流。但是您正在重用相同的流,因此获得IllegalStateException
.
这是Stream
的Javadoc的引用:
流应该在(上操作,调用中间或终端流操作)只出现一次. 这就排除了例如"forked"流,其中同一源提供两个或多个管道,或对同一流的多次遍历。如果流实现检测到流正在被重用,则可能抛出
IllegalStateException
。然而,由于一些流操作可能返回它们的接收者而不是一个新的流对象,因此可能无法在所有情况下检测到重用。
<一口>Amphesys添加一口>
注意:as@Holger已经指出文档声明(参见上面的引用),在某些情况下,中间流操作可能会返回接收到的流(代表前一阶段),而不是生成一个新的流。
如果您像这样编写代码,它将按预期工作:
Stream<Integer> ints = Stream.of(1, 2, 3);
Stream<Integer> ints1 = ints.peek(System.out::println);
ints1.forEach(System.out::println);
这基本上是链的冗长等效:
Stream.of(1, 2, 3) // ints
.peek(System.out::println) // ints1
.forEach(System.out::println);
您对Stream只有在具有终端操作时才会被激活的理解是正确的。您在代码中提供的流有一个终端操作。
但是值得注意的是,异常不是在执行流时发生的,而是在准备时发生的。管道。例如,当Stream实例被初始化时。有一个内部类AbstractPipeline
,我们作为API用户不直接交互,它的目的定义如下:
下面是这个类的一段代码(链接到源代码)):管道的抽象基类类,它们是核心
Stream接口及其原语的实现专门化。
管理河流管道的建设和评估
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed) // <- Note
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true; // <- Note
previousStage.nextStage = this;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
在这个实现中,每个操作都表示为一个阶段。各个阶段都是相互联系的。在构建下一个阶段时,应该提供到下一个阶段的链接,并且上一个阶段的boolean
属性linkedOrConsumed
是指向true
的。如果前一阶段已经被链接(在代码中发生),构造函数会抛出异常。
因此,即使下面的代码缺少终端操作,也会触发IllegalStateException
(JDK 17, HotSpot JVM)。它将在Stream
对象的准备过程中发生(not)当执行管道时)。
Stream<Integer> ints = Stream.of(1, 2, 3);
ints.map(i -> i * i);
ints.takeWhile(i -> i % 2 != 0); // exception would be thrown while initializing this stage
注意:此信息是实现细节,并且被证明纯粹是对OP遇到的行为的解释。带有没有终端操作的Stream的代码片段当然不能保证以这种方式运行(例如,某些实现可能会简单地丢弃它)。这里的底线是一个流应该只被消费一次。