是否可以使用两者中存在的密钥连接两个单独的pubsubio无界PCollections?我尝试用以下操作来完成任务:
read(fiststream(& read(secondstream( -> flatten->生成以加入 ->使用sessign窗口将它们聚集在一起 ->组合 ->组合键,然后用固定尺寸的windows rewindow使用固定尺寸的windows->窗口。
编辑:
这是我创建的管道代码。我遇到两个问题:
- 什么都没有写给磁盘
- 管道开始真正不稳定 - 它随机减慢了某些步骤的处理。特别是组。即使我使用10个DataFlow Worker,它也无法跟上摄入速度。
我需要每秒处理〜10 000会议。每个会话包括1或2个事件,然后需要关闭。
PubsubIO.Read<String> auctionFinishedReader = PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE)
.fromTopic("projects/authentic-genre-152513/topics/auction_finished");
PubsubIO.Read<String> auctionAcceptedReader = PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE)
.fromTopic("projects/authentic-genre-152513/topics/auction_accepted");
PCollection<String> auctionFinishedStream = p.apply("ReadAuctionFinished", auctionFinishedReader);
PCollection<String> auctionAcceptedStream = p.apply("ReadAuctionAccepted", auctionAcceptedReader);
PCollection<String> combinedEvents = PCollectionList.of(auctionFinishedStream)
.and(auctionAcceptedStream).apply(Flatten.pCollections());
PCollection<KV<String, String>> keyedAuctionFinishedStream = combinedEvents
.apply("AddKeysToAuctionFinished", WithKeys.of(new GenerateKeyForEvent()));
PCollection<KV<String, Iterable<String>>> sessions = keyedAuctionFinishedStream
.apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardMinutes(1)))
.withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
.apply(GroupByKey.create());
PCollection<SodaSession> values = sessions
.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, SodaSession> () {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
c.output(new SodaSession("auctionid", "stattedat"));
}
}));
PCollection<SodaSession> windowedEventStream = values
.apply("ApplyWindowing", Window.<SodaSession>into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))
))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
);
AvroIO.Write<SodaSession> avroWriter = AvroIO
.write(SodaSession.class)
.to("gs://storage/")
.withWindowedWrites()
.withFilenamePolicy(new EventsToGCS.PerWindowFiles("sessionsoda"))
.withNumShards(3);
windowedEventStream.apply("WriteToDisk", avroWriter);
我找到了一个有效的解决方案。由于我的一个集合与另一个集合的大小不成比例,因此我使用侧输入来加快分组操作。这是我解决方案的概述:
- 阅读两个事件流。
- 将它们弄平为单个pcollection。
- 使用滑动窗口大小(可关闭的会话持续时间 会话最大长度,每个可关闭的会话持续时间(。
- 再次分区收集。
- 从较小的pcollection创建PCOLLECTIONVIEW。
- 使用Sideinput加入两个流,并与上一个步骤中创建的视图。
- 将会话写入磁盘。
它处理4000个事件/秒流(较大的一个( 60事件/秒流在1-2数据流工作人员与〜15个工人一起使用时,与Groupby一起使用时。