Apache Beam/ Google Cloud Dataflow - 在管道中定期加载参考表的任何解决方案?



我需要使用apache beam API实现一个管道,并将在Google Cloud Dataflow上运行管道。管道逻辑如下所示:

  1. 读取来自 Kafka 的实时摄取的无限事件(称之为"RawEvent")
  2. 从 Google BigQuery 加载一个引用表(这个表每天更新,所以每天在某个时间点管道都需要加载它,称之为"RefTable")
  3. 对于每个 RawEvent,
  4. 如果 id 显示在 RefTable 中,则丢弃 RawEvent,否则将其添加到最终输出中。

所以基本上我需要让 RefTable 作为管道中的"静态参考"停留一天,然后每天重新加载它。

我想知道加载 RefTable 作为侧输入是否是正确的解决方案。有几个问题我无法回答,这真的让我感到困惑。我想知道是否有人能指出我正确的方向,特别是关于管道引擎盖下发生的事情。:

  1. 侧输入是否会在管道生命周期内保持并可用?特别是在我的情况下,如果我通过BigQueryIO加载我的 RefTable,使其成为侧面输入,它是否可用于所有无界的 RawEvent?或者就像如果窗口过去了,那么 RefTable 就消失了? (另外,我什至不知道我是否需要在 RefTable 上应用窗口/触发,因为它是有界数据)

  2. 输入的另一侧是无界的 RawEvent,需要窗口/触发。那么,是否也需要对 RefTable 进行窗口化/触发?

  3. 如何在管道中指定 RefTable 需要每天重新加载一次?

更新:根据@jkff的黑客方式,我想出了以下工作版本。它可以做的是:每 1 分钟加载一次 RefTable,这非常接近我的目标,但还没有。

我无法在我的"Load Ref Table"ParDo中使用BigQueryIO的方法。因此,我必须以某种方式通过使用 BigQuery 客户端库重新发明轮子。然后它增加了更多麻烦,担心异常处理等。

通过使用固定窗口生成"即时报价",我的"过滤器"转换只会在窗口结束时触发。实际上,我想要的是每 1 小时(或更长时间)重新加载 RefTable,并且每次完全加载 RefTable 时(通常只需不到 1 分钟即可完成加载),过滤器应该启动并应用于任何具有较晚时间戳的原始数据。现在,使用我的代码,如果我将刻度窗口从 1 分钟更改为 1 小时,我需要等待 1 小时才能触发"过滤器"转换,即使实际加载 RefTable 只需要不到一分钟的时间。

假设有一种方法可以在"ticks"窗口结束之前提前触发"过滤器",我如何根据 RefTable 加载的补充来定义触发器?(我的意思是,每当 RefTable 全部加载时触发"过滤器",并且还)

4,最后但并非最不重要的导入是:当下一次重新加载即将开始时,我需要保留新的原始数据,直到重新加载完成。 因为所有新数据都在下一个小时内出现,以便根据刷新的 RefTable 进行过滤。

您能否对此提供任何见解。真的很感激!

PCollection<Long> ticks = p
// Produce 1 "tick" per 1 second
.apply(GenerateSequence.from(0).withRate(1L,Duration.standardSeconds(1)))
// Window the ticks into 1-minute windows
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1)))
)
// Use an arbitrary per-window combiner to reduce to 1 element per window
.apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
// Produce a collection of  maps, 1 per each 1-minute window
PCollectionView<Map<String,String>> banedDeviceMapView = ticks
.apply("Load Ref Table"
,ParDo.of(new DoFn<Long,KV<String,String>>(){
@ProcessElement
public void processElement(ProcessContext c)
{
TableId table = TableId.of("project","dataset","RefTable");
TableResult tableData =
BIGQUERY_CLIENT.listTableData(table,GetSchema());
Map<String,String> resultMap = new HashMap();
for (FieldValueList row : tableData.iterateAll()) {
Object key = row.get("HardwareId").getValue();
if(key!=null)
{
String hardwareId = (String)key;
resultMap.putIfAbsent(hardwareId,hardwareId);
}
}
int num = 0;
for (Map.Entry<String, String> entry : resultMap.entrySet()) {
c.output(KV.of(entry.getKey(),entry.getValue()));
num++;
}
System.out.println(num + " banded devices have been loaded.");
}
})
)
.apply(View.asMap());

PCollection<KafkaRecord<String, GPS>> rawLoad = p.apply("Read Events from Kafka"
, KafkaIO.<String, GPS>read()
.withBootstrapServers("localhost:9092")
.withTopic(SOURCE_GPS_TOPIC_NAME)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(GPSEventAvroDeserializer.class)
.updateConsumerProperties(ImmutableMap.of(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
);
PCollection<KV<String, GPS>> validGps = rawLoad.apply("Extract Gps from Kafka", ParDo.of(
new DoFn<KafkaRecord<String, GPS>, KV<String, GPS>>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.print("KafkaRecord: KV.of("+ c.element().getKV().getKey());
System.out.println("," + c.element().getKV().getValue().getSpeed() + ")");
c.output(c.element().getKV());
}
}))
.apply("Windowing Raw Events",Window.into(FixedWindows.of(Duration.standardSeconds(1)))
)
.apply("Filter",ParDo.of(
new DoFn<KV<String,GPS>,KV<String,GPS>>(){
@ProcessElement
public void processElement(ProcessContext c){
Map<String,String> bandedDevices = c.sideInput(banedDeviceMapView);
String deviceId = c.element().getKey();
System.out.print("Checking device: "+ deviceId);
System.out.println(" - in bandedDevices? " + bandedDevices.containsKey(deviceId));
if(!bandedDevices.containsKey(deviceId)){
c.output(c.element());
}else{
System.out.println("Device " + deviceId + " is removed from results");
}
}
}).withSideInputs(banedDeviceMapView)
);

请参阅此JIRA问题,其中讨论了此问题,并链接到提出了临时解决方案的StackOverflow问题。

最新更新