Akka流动滑动窗口以控制降低发射源以通过SourceQueue下沉



update :我将问题放在测试项目中,以详细解释我的意思

==============================================================================

我有Akka源可以从数据库表中读取,然后将一些密钥组降低。但是,似乎在我应用减少功能后,数据永远不会发送到接收器,它将限制降低,因为上游总是有数据来源。

我阅读了一些帖子,并尝试了分组和滑动,但是它不如我想的那样起作用,它只是将消息分组到更大的部分,但永远不会暂停上游并发出散落。以下是Akka流中的代码2.5.2

源降低代码:

source = source
  .groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS))
  .sliding(3, 1)
  .mapConcat(i -> i)
  .mapConcat(i -> i)
  .groupBy(2000000, i -> i.getEntityName())
  .map(i -> new Pair<>(i.getEntityName(), i))
  .reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;})
  .map(i -> i.second())
  .mergeSubstreams();

水槽并运行:

Sink<Object, CompletionStage<Done>> sink = 
        Sink.foreach(i -> System.out.println(i))
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left());
run.run(materIalizer);

我也尝试了.take(predicated(;我使用计时器切换谓词值True和false,但似乎只有第一个切换到False,当我切换回到True时,它不是在上游重新启动。

请提前帮助我!

=============================================================

更新

有关元素类型的信息

添加我想要的东西:我有类调用SystemCodeTracking包含2个属性(id, entityName)

我将拥有对象列表: (1, "table1"), (2, "table2"), (3, "table3"),(4, "table1"),(5, "table3")

我想将EntityName分组,然后总结ID,因此,我希望看到的结果是遵循

("table1" 1+4),("table3", 3+5),("table2", 2)

我现在正在做的代码遵循

source
.groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName)
.map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName, SystemCodeTracking.getId()))
.scan(....)

我现在的问题更多地是关于如何构建扫描唯一状态我应该做吗?

scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId()))

所以您想要什么,如果我理解一切都很好:

  • 首先,iD组
  • 然后按时间窗口和在此时间窗口中进行分组,总结所有systemCodeTracking.getId()

在第一部分中,您需要groupBy。对于第二部分groupedWithin。但是,它们的工作不相同:第一个会给您子花,而第二个会给您带来列表的流程。

因此,我们必须以不同的方式处理它们。

首先,让我们为您的列表编写一个简化器:

private SystemCodeTracking reduceList(List<SystemCodeTracking> list) throws Exception {
    if (list.isEmpty()) {
        throw new Exception();
    } else {
        SystemCodeTracking building = list.get(0);
        building.setId(0L);
        list.forEach(next -> building.setId(building.getId() + next.getId()));
        return building;
    }
}

因此,对于列表中的每个元素,我们将building.id递增以获取我们想要的值。

现在您只需要做

Source<SystemCodeTracking, SourceQueueWithComplete<SystemCodeTracking>> loggedSource = source
    .groupBy(20000, SystemCodeTracking::getEntityName) // group by name
    .groupedWithin(100, FiniteDuration.create(10, TimeUnit.SECONDS)   // for a given name, group by time window (or by packs of 100)
    .filterNot(List::isEmpty)                          // remove empty elements from the flow (if no element has passed in the last second, to avoid error in reducer)
    .map(this::reduceList)                             // reduce each list to sum the ids
    .log("====== doing reduceing ")                    // log each passing element using akka logger, rather than `System.out.println`
    .mergeSubstreams()                                 // merge back all elements with different names

最新更新