通过读取 kafka 的详细信息来创建动态 flink 窗口



假设 Kafka 消息包含 flink 窗口大小配置。

我想阅读来自 Kafka 的消息并在 flink 中创建一个全局窗口。

问题陈述:

我们可以通过使用广播流来处理上述情况吗?

还有其他方法可以支持上述情况吗?

Flink 的窗口 API 不支持动态更改窗口大小。

您需要做的是使用进程函数实现自己的窗口。在本例中为 KeyedBroadcastProcessFunction,其中窗口配置是广播的。

你可以检查 Flink 训练,了解如何使用 KeyedProcessFunction 实现时间窗口的示例(复制如下(:

public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {
// Keyed, managed state, with an entry for each window.
// There is a separate MapState object for each sensor.
private MapState<Long, Integer> countInWindow;
boolean eventTimeProcessing;
int durationMsec;
/**
* Create the KeyedProcessFunction.
* @param eventTime whether or not to use event time processing
* @param durationMsec window length
*/
public PseudoWindow(boolean eventTime, int durationMsec) {
this.eventTimeProcessing = eventTime;
this.durationMsec = durationMsec;
}
@Override
public void open(Configuration config) {
MapStateDescriptor<Long, Integer> countDesc =
new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
countInWindow = getRuntimeContext().getMapState(countDesc);
}
@Override
public void processElement(
KeyedDataPoint<Double> dataPoint,
Context ctx,
Collector<KeyedDataPoint<Integer>> out) throws Exception {
long endOfWindow = setTimer(dataPoint, ctx.timerService());
Integer count = countInWindow.get(endOfWindow);
if (count == null) {
count = 0;
}
count += 1;
countInWindow.put(endOfWindow, count);
}
public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
long time;
if (eventTimeProcessing) {
time = dataPoint.getTimeStampMs();
} else {
time = System.currentTimeMillis();
}
long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);
if (eventTimeProcessing) {
timerService.registerEventTimeTimer(endOfWindow);
} else {
timerService.registerProcessingTimeTimer(endOfWindow);
}
return endOfWindow;
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
// Get the timestamp for this timer and use it to look up the count for that window
long ts = context.timestamp();
KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), ts, countInWindow.get(ts));
out.collect(result);
countInWindow.remove(timestamp);
}
} 

最新更新