我正在学习三叉戟框架。Trident Stream
上有几种方法用于批处理中的聚合元组,包括允许使用Aggregator
接口执行元组的状态映射的方法。但不幸的是,不存在一个内置的对应物来额外持久化映射状态,就像其他 9 个重载一样 persistentAggregate()
,仅以 Aggregator
作为参数。
那么,如何通过组合较低级别的三叉戟和风暴抽象和工具来实现所需的功能?探索API非常困难,因为几乎没有Javadoc文档。
换句话说,persistentAggregate()
方法允许通过更新一些持久状态来结束流处理:
stream of tuples ---> persistent state
我想更新持久状态并顺便发出不同的元组:
stream of tuples ------> stream of different tuples
with
persistent state
Stream.aggregate(Fields, Aggregator, Fields)
不提供容错:
stream of tuples ------> stream of different tuples
with
simple in-memory state
您可以使用方法 TridentState#newValuesStream() 从状态创建新流。这将允许您检索聚合值的流。
为了便于说明,我们可以通过添加此方法和调试过滤器来改进 Trident 文档中的示例:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.newValuesStream().each(new Fields("count"), new Debug());
运行此拓扑将输出(到控制台)聚合计数。
希望对你有帮助