Hazelcast Jet 0.6.1 DAG vs PipeLine API



>我为 DAG 创建了以下示例代码来理解聚合。似乎,slidingWindow顶点不会发出任何记录。

不知道,这里出了什么问题。.

public DAG buildDAG() {
DAG dag = new DAG();
SlidingWindowPolicy winPolicy = slidingWinPolicy(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS);
Vertex source = dag.newVertex("source", SourceProcessors.streamRemoteMapP(getRemoteSourceName(),
getClientConfig(), START_FROM_OLDEST,  WatermarkGenerationParams.noWatermarks()));

Vertex slidingWindow = dag.newVertex("aggregate-to-sliding-win",
aggregateToSlidingWindowP(
singletonList((v) ->  getUserID((Entry<String, CacheEntry<Record>>)v)),
singletonList((v) ->  getTimeStamp((Entry<String, CacheEntry<Record>>)v)),
TimestampKind.EVENT,
winPolicy,
counting(),
TimestampedEntry::new));

Vertex peekOP = dag.newVertex("peekOP", DiagnosticProcessors.writeLoggerP());
Vertex peekOP1 = dag.newVertex("peekOP1", DiagnosticProcessors.writeLoggerP());
Vertex sink = dag.newVertex("sink", SinkProcessors.writeFileP("c:\\data\\op1.txt"));
return dag
.edge(between(source, peekOP))
.edge(between(peekOP, slidingWindow))
.edge(between(slidingWindow,peekOP1))
.edge(between(peekOP1, sink));
}   

同样,我为用于聚合的管道 API 创建了以下示例代码。

这很好用。这将打印文本文件中的记录。

private Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String, CacheEntry<AuditLogRecord>>remoteMapJournal("cache_AuditLog", getClientConfig(), START_FROM_OLDEST))
.addTimestamps((v) ->  getTimeStamp(v), 3000)
.peek()
.groupingKey((v) -> Tuple2.tuple2(getUserID(v),getTranType(v)))
.window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
.aggregate(counting())
.map((v)-> getMapKey(v))
.drainTo(Sinks.files("c:\data\op.txt"));
return p;
}

你能帮我更正DAG定义吗?

存在多个问题:

  1. WatermarkGenerationParams.noWatermarks((:要获得窗口处理器的任何输出,您需要水印。使用wmGenParams((v) -> getTimeStamp(v), limitingLag(3000), emitByFrame(winPolicy), -1)

  2. DiagnosticProcessors.writeLoggerP()是一个水槽。它接收项目但不发出任何项目。要查看顶点,请将处理器供应商包装在peekInputP( /* original supplier */ )peekOutputP

  3. slidingWindow的边缘必须是distributedpartitioned。没有这些,你会得到结果,但不正确的结果。

DAG API 适用于使用管道 API 无法实现的高级用例。随着每个 Jet 版本的发布,使用 DAG API 的需求都会减少。如示例所示,管道 API 更易于编写且更简洁。

最新更新