如何在三叉戟中映射具有持久状态的元组



我正在学习三叉戟框架。Trident Stream 上有几种方法用于批处理中的聚合元组,包括允许使用Aggregator接口执行元组的状态映射的方法。但不幸的是,不存在一个内置的对应物来额外持久化映射状态,就像其他 9 个重载一样 persistentAggregate() ,仅以 Aggregator 作为参数。

那么,如何通过组合较低级别的三叉戟和风暴抽象和工具来实现所需的功能?探索API非常困难,因为几乎没有Javadoc文档。

换句话说,persistentAggregate()方法允许通过更新一些持久状态来结束流处理:

stream of tuples ---> persistent state

我想更新持久状态并顺便发出不同的元组:

stream of tuples ------> stream of different tuples
                  with
            persistent state

Stream.aggregate(Fields, Aggregator, Fields)不提供容错:

stream of tuples ------> stream of different tuples
                  with
          simple in-memory state

您可以使用方法 TridentState#newValuesStream() 从状态创建新流。这将允许您检索聚合值的流。

为了便于说明,我们可以通过添加此方法和调试过滤器来改进 Trident 文档中的示例:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
    new Values("the cow jumped over the moon"),
    new Values("the man went to the store and bought some candy"),
    new Values("four score and seven years ago"),
    new Values("how many apples can you eat"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();        
topology.newStream("spout1", spout)
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
    .newValuesStream().each(new Fields("count"), new Debug());

运行此拓扑将输出(到控制台)聚合计数。

希望对你有帮助

相关内容

  • 没有找到相关文章

最新更新