我有一个Apache flink用例,其工作原理如下:
我有数据事件通过第一个流传入。每个事件的一部分都是外键,我希望从第二个流中获得数据。例如:我正在第一个流中获取主要城市的数据,该流具有城市代码,我需要通过第二个流流传输的该城市代码随时间的平均温度。不可能对所有可能的城市进行温度流式传输,我们必须请求我们需要数据的城市。
因此,我们需要一些方法来";"通知";我们需要这个城市的数据的第二个流源";"推";当我们在第一条溪流中第一次遇到它时。
如果可以从第一个流中完成此通知,这将很容易。问题是,第二个流是通过一个websocket到达我们这里的,其中的一部分是一个控制通道,我们必须通过它来发出请求,所以必须从第二个流量发出请求。
- 检查第一个流中的事件。读取城市代码x
- 我们看到这个城市代码了吗?如果没有,请通知第二个流,我们需要城市代码x的数据
- 第二个流向x的数据源发送消息
- 数据开始流入城市x,该城市用于连接下游
如果第一个流的通知是可能的,这将很容易——我本可以从第二步开始,这样数据就开始在第二个流中流动。但这是不可能的,因为请求需要在提供第二个流的同一个websocket连接上发送。
我已经探索过使用CoProcessFunction或RichCoMapFunction来实现这一点,但尚不清楚如何实现。我看到了一些广播状态模式的例子,但即使这样似乎也不符合用例。
有人能帮我提出一些可能的解决方案吗?
所以我使用了副输出流的建议。感谢@whatisinthame和@kkrugler的建议。
仍在试图弄清楚细节,但以下是的摘要
- 从通知流(流1(创建一个辅助输出流(流1-1(
- 使用
KeyedProcessFunction
的扩展类(TempRequester
(来处理侧输出流1-1并从中创建流2。KeyedProcessFunction具有websocket连接 - 在
KeyedProcessFunction
的open
方法中,创建到websocket的连接(握手等(。具有ListState
状态以保留城市代码列表 - 在
TempRequester
的processElement
功能中,检查来自侧输出流1-1的城市代码。如果存在于ListState中,则不执行任何操作。否则,通过websocket控制通道发送消息,请求城市数据,并将代码添加到ListState
。创建一个进程计时器(这是一次(,在500毫秒左右后启动。websocket服务器非常频繁地写入临时数据,并将其保存在队列中 - 在
onTimer
方法中,检查队列,读取数据并推出(out.collect...
(。再次创建计时器。因此,从本质上讲,一旦第一个城市代码进入,我们就会创建一个每500毫秒运行一次的计时器,并将接收到的记录转储到第二个流中
现在第一个和第二个流可以在下游连接(我使用了表API(。
不确定这是否是最优雅的解决方案,但它确实奏效了。谢谢你的建议。
这是大致的主代码:
DataStream<Event> notificationStream =
env.addSource(this.notificationSource)
.returns(TypeInformation.of(Event.class));
notificationStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
final OutputTag<String> outputTag = new OutputTag<String>("cities-seen"){};
SingleOutputStreamOperator<Event> mainDataStream = notificationStream.process(new ProcessFunction<Event, Event>() {
@Override
public void processElement(
Event value,
Context ctx,
Collector<Event> out) throws Exception {
// emit data to regular output
out.collect(value);
// emit data to side output
ctx.output(outputTag, event.cityCode);
}
});
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
DataStream<TemperatureData> temperatureStream = sideOutputStream
.keyBy(value -> value)
.process(new TempRequester());
temperatureStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
// set up the Java Table API and rest of SQL based joins ...
以及TempRequester
(ProcessFunction
(的近似代码:
public static class TempRequester extends KeyedProcessFunction<String, String, TemperatureData> {
private ListState<String> allCities;
private volatile boolean running = true;
//This is the queue for requesting city codes
private BlockingQueue<String> messagesToSend = new ArrayBlockingQueue<>(100);
//This is the queue for receiving temperature data
private ConcurrentLinkedQueue<TemperatureData> messages = new ConcurrentLinkedQueue<TemperatureData>();
private static final int TIMEOUT = 500;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
allCities = getRuntimeContext().getListState(new ListStateDescriptor<>("List of cities seen", String.class));
... rest of websocket client setup code ...
}
@Override
public void close() throws Exception {
running = false;
super.close();
}
private boolean initialized = false;
@Override
public void processElement(String cityCode, Context ctx, Collector<TemperatureData> collector) throws Exception {
boolean citycodeFound = StreamSupport.stream(allCities.get().spliterator(), false)
.anyMatch(s -> cityCode.equals(s));
if (!citycodeFound) {
allCities.add(cityCode);
messagesToSend.put(.. add city code ..);
if (!initialized) {
ctx.timerService().registerProcessingTimeTimer(ctx.timestamp()+ TIMEOUT);
initialized = true;
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<TemperatureData> out) throws Exception {
TemperatureData p;
while ((p = messages.poll()) != null) {
out.collect(p);
}
ctx.timerService().registerProcessingTimeTimer(ctx.timestamp() + TIMEOUT);
}
}