加入两个流



是否可以使用两者中存在的密钥连接两个单独的pubsubio无界PCollections?我尝试用以下操作来完成任务:

read(fiststream(& read(secondstream( -> flatten->生成以加入 ->使用sessign窗口将它们聚集在一起 ->组合 ->组合键,然后用固定尺寸的windows rewindow使用固定尺寸的windows->窗口。

编辑:

这是我创建的管道代码。我遇到两个问题:

  1. 什么都没有写给磁盘
  2. 管道开始真正不稳定 - 它随机减慢了某些步骤的处理。特别是组。即使我使用10个DataFlow Worker,它也无法跟上摄入速度。

我需要每秒处理〜10 000会议。每个会话包括1或2个事件,然后需要关闭。

    PubsubIO.Read<String> auctionFinishedReader = PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE)
            .fromTopic("projects/authentic-genre-152513/topics/auction_finished");
    PubsubIO.Read<String> auctionAcceptedReader = PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE)
            .fromTopic("projects/authentic-genre-152513/topics/auction_accepted");
    PCollection<String> auctionFinishedStream = p.apply("ReadAuctionFinished", auctionFinishedReader);
    PCollection<String> auctionAcceptedStream = p.apply("ReadAuctionAccepted", auctionAcceptedReader);
    PCollection<String> combinedEvents = PCollectionList.of(auctionFinishedStream)
            .and(auctionAcceptedStream).apply(Flatten.pCollections());
    PCollection<KV<String, String>> keyedAuctionFinishedStream = combinedEvents
            .apply("AddKeysToAuctionFinished", WithKeys.of(new GenerateKeyForEvent()));
    PCollection<KV<String, Iterable<String>>> sessions = keyedAuctionFinishedStream
            .apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardMinutes(1)))
                                            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
            .apply(GroupByKey.create());
    PCollection<SodaSession> values = sessions
            .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, SodaSession> () {
                @ProcessElement
                public void processElement(ProcessContext c, BoundedWindow window) {
                    c.output(new SodaSession("auctionid", "stattedat"));
                }
    }));
    PCollection<SodaSession> windowedEventStream = values
            .apply("ApplyWindowing", Window.<SodaSession>into(FixedWindows.of(Duration.standardMinutes(2)))
                    .triggering(Repeatedly.forever(
                            AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(Duration.standardMinutes(1))
                    ))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes()
            );
    AvroIO.Write<SodaSession> avroWriter = AvroIO
            .write(SodaSession.class)
            .to("gs://storage/")
            .withWindowedWrites()
            .withFilenamePolicy(new EventsToGCS.PerWindowFiles("sessionsoda"))
            .withNumShards(3);
    windowedEventStream.apply("WriteToDisk", avroWriter);

我找到了一个有效的解决方案。由于我的一个集合与另一个集合的大小不成比例,因此我使用侧输入来加快分组操作。这是我解决方案的概述:

  1. 阅读两个事件流。
  2. 将它们弄平为单个pcollection。
  3. 使用滑动窗口大小(可关闭的会话持续时间 会话最大长度,每个可关闭的会话持续时间(。
  4. 再次分区收集。
  5. 从较小的pcollection创建PCOLLECTIONVIEW。
  6. 使用Sideinput加入两个流,并与上一个步骤中创建的视图。
  7. 将会话写入磁盘。

它处理4000个事件/秒流(较大的一个( 60事件/秒流在1-2数据流工作人员与〜15个工人一起使用时,与Groupby一起使用时。

最新更新