给定一个使用Stream API的简单Apache Storm Topology,初始化Stream有两种方法:
版本1-隐式声明
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
.filter(x -> x > 5)
.print();
结果:如预期的那样,它只打印整数>5.
版本2-显式声明
Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
integerStream.filter(x -> x > 5);
integerStream.print();
结果:这不起作用-打印了所有元组,包括整数<5.
问题:为什么这个显式声明不能正常工作,以及如何解决这个问题?
拓扑在本地集群上运行,其中IntSpout
只是一个简单的喷口,它通过以下命令发出随机整数:
StormTopology topo = builder.build();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", new HashMap<>(), topo);
这是因为integerStream.filter(x -> x > 5);
返回了一个您忽略的新流。
这项工作:
Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
Stream<Integer> filteredStream = integerStream.filter(x -> x > 5);
filteredStream.print();
在您的第一个示例中还有一个语法错误。它在第四行的末尾多了一个分号。
StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
.filter(x -> x > 5) // <= there was a semicolon here
.print();