可以通过阅读GCS存储桶来更新每个窗口的数据流程副业



我目前正在通过从GCS存储桶中读取过滤信息并将其作为侧面输入传递到管道的不同阶段,以便过滤输出来创建PCOLLECTIONVIEW。如果GCS存储桶中的文件更改,我希望当前运行的管道使用此新的过滤器信息。如果我的过滤器更改,是否可以在每个新的数据窗口上更新此PCOLLECTIONVIEW?我以为我可以在开始围栏中做到这一点,但我无法弄清楚如何或是否可能。如果可能的话,你能举个例子。

PCollectionView<Map<String, TagObject>> 
    tagMapView =
        pipeline.apply(TextIO.Read.named("TagListTextRead")
                                  .from("gs://tag-list-bucket/tag-list.json"))
                .apply(ParDo.named("TagsToTagMap").of(new Tags.BuildTagListMapFn()))
                .apply("MakeTagMapView", View.asSingleton());
PCollection<String> 
    windowedData =
        pipeline.apply(PubsubIO.Read.topic("myTopic"))
                .apply(Window.<String>into(
                              SlidingWindows.of(Duration.standardMinutes(15))
                                            .every(Duration.standardSeconds(31))));
PCollection<MY_DATA> 
    lineData = windowedData
        .apply(ParDo.named("ExtractJsonObject")
            .withSideInputs(tagMapView)
            .of(new ExtractJsonObjectFn()));

您可能想要"最多使用1分钟的过滤器"作为侧面输入的东西(因为在理论上,文件可以经常,不可预测,并且独立地更改从您的管道中 - 因此,实际上无法将文件的更改与管道的行为完全同步)。

这是我能够提出的(授予,相当笨拙的)解决方案。它依赖于一个事实,即侧面输入也被隐式由窗口键入。在此解决方案中,我们将创建一个侧面输入窗口到1分钟的固定窗口中,每个窗口将包含标签映射的单个值,该窗口是从过滤器文件派生的,作为该窗口中的某个瞬间。

PCollection<Long> ticks = p
  // Produce 1 "tick" per second
  .apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(1)))
  // Window the ticks into 1-minute windows
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  // Use an arbitrary per-window combiner to reduce to 1 element per window
  .apply(Count.globally());
// Produce a collection of tag maps, 1 per each 1-minute window
PCollectionView<TagMap> tagMapView = ticks
  .apply(MapElements.via((Long ignored) -> {
    ... manually read the json file as a TagMap ...
  }))
  .apply(View.asSingleton());

这种模式(反对缓慢更改的外部数据作为侧面输入)正在反复出现,我在这里提出的解决方案远非完美,我希望我们在编程模型中对此有更好的支持。我已经提出了一个梁jira问题来跟踪此问题。

最新更新