如何连接流和数据集?我有一个流,我在一个文件中有一个静态数据。我想使用文件中的数据来丰富流的数据。
示例:在流中,我得到机场代码,在文件中,我有机场的名称和文件中的代码。现在我想将流数据加入到文件中,以形成一个具有机场名称的新流。请提供如何实现这一目标的步骤。
根据具体需求,有很多方法可以使用Flink来实现流富集。https://www.youtube.com/watch?v=cJS18iKLUIY是Konstantin Knauf的一篇精彩演讲,涵盖了许多不同的方法,以及它们之间的权衡。
在富集数据不可变且相当小的简单情况下,我只需要使用RichFlatMap
并在open()
方法中加载整个文件。看起来像这样:
public class EnrichmentWithPreloading extends RichFlatMapFunction<Event, EnrichedEvent> {
private Map<Long, SensorReferenceData> referenceData;
@Override
public void open(final Configuration parameters) throws Exception {
super.open(parameters);
referenceData = loadReferenceData();
}
@Override
public void flatMap(
final Event event,
final Collector<EnrichedEvent> collector) throws Exception {
SensorReferenceData sensorReferenceData =
referenceData.get(sensorMeasurement.getSensorId());
collector.collect(new EnrichedEvent(event, sensorReferenceData));
}
}
您可以在中找到更多其他方法的代码示例https://github.com/knaufk/enrichments-with-flink.
更新:
如果你更愿意预加载一些更大的、分区的引用数据来加入流,那么有几种方法可以实现这一点,其中一些方法在我上面分享的视频和repo中有所介绍。对于那些特定的需求,我建议使用自定义的partitioner;在同一个github回购中有一个例子。其思想是对丰富数据进行分片,并将每个流式事件导向具有相关引用数据的实例。
在我看来,这比试图让Table API作为联接来进行这种特定的丰富更简单。