我目前正在通过从GCS存储桶中读取过滤信息并将其作为侧面输入传递到管道的不同阶段,以便过滤输出来创建PCOLLECTIONVIEW。如果GCS存储桶中的文件更改,我希望当前运行的管道使用此新的过滤器信息。如果我的过滤器更改,是否可以在每个新的数据窗口上更新此PCOLLECTIONVIEW?我以为我可以在开始围栏中做到这一点,但我无法弄清楚如何或是否可能。如果可能的话,你能举个例子。
PCollectionView<Map<String, TagObject>>
tagMapView =
pipeline.apply(TextIO.Read.named("TagListTextRead")
.from("gs://tag-list-bucket/tag-list.json"))
.apply(ParDo.named("TagsToTagMap").of(new Tags.BuildTagListMapFn()))
.apply("MakeTagMapView", View.asSingleton());
PCollection<String>
windowedData =
pipeline.apply(PubsubIO.Read.topic("myTopic"))
.apply(Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(15))
.every(Duration.standardSeconds(31))));
PCollection<MY_DATA>
lineData = windowedData
.apply(ParDo.named("ExtractJsonObject")
.withSideInputs(tagMapView)
.of(new ExtractJsonObjectFn()));
您可能想要"最多使用1分钟的过滤器"作为侧面输入的东西(因为在理论上,文件可以经常,不可预测,并且独立地更改从您的管道中 - 因此,实际上无法将文件的更改与管道的行为完全同步)。
这是我能够提出的(授予,相当笨拙的)解决方案。它依赖于一个事实,即侧面输入也被隐式由窗口键入。在此解决方案中,我们将创建一个侧面输入窗口到1分钟的固定窗口中,每个窗口将包含标签映射的单个值,该窗口是从过滤器文件派生的,作为该窗口中的某个瞬间。
PCollection<Long> ticks = p
// Produce 1 "tick" per second
.apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(1)))
// Window the ticks into 1-minute windows
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
// Use an arbitrary per-window combiner to reduce to 1 element per window
.apply(Count.globally());
// Produce a collection of tag maps, 1 per each 1-minute window
PCollectionView<TagMap> tagMapView = ticks
.apply(MapElements.via((Long ignored) -> {
... manually read the json file as a TagMap ...
}))
.apply(View.asSingleton());
这种模式(反对缓慢更改的外部数据作为侧面输入)正在反复出现,我在这里提出的解决方案远非完美,我希望我们在编程模型中对此有更好的支持。我已经提出了一个梁jira问题来跟踪此问题。