为什么风暴流的这个明确定义不起作用,而隐含的定义起作用



给定一个使用Stream API的简单Apache Storm Topology,初始化Stream有两种方法:


版本1-隐式声明

StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
.filter(x -> x > 5)
.print();

结果:如预期的那样,它只打印整数>5.


版本2-显式声明

Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
integerStream.filter(x -> x > 5);
integerStream.print();

结果:这不起作用-打印了所有元组,包括整数<5.


问题:为什么这个显式声明不能正常工作,以及如何解决这个问题?


拓扑在本地集群上运行,其中IntSpout只是一个简单的喷口,它通过以下命令发出随机整数:

StormTopology topo = builder.build();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", new HashMap<>(), topo);

这是因为integerStream.filter(x -> x > 5);返回了一个您忽略的新流。

这项工作:

Stream<Integer> integerStream = builder.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1);
Stream<Integer> filteredStream = integerStream.filter(x -> x > 5);
filteredStream.print();

在您的第一个示例中还有一个语法错误。它在第四行的末尾多了一个分号。

StreamBuilder builder = new StreamBuilder();
builder
.newStream(new IntSpout(), new ValueMapper<Integer>(0), 1)
.filter(x -> x > 5) // <= there was a semicolon here 
.print();

相关内容

  • 没有找到相关文章

最新更新