我正在尝试评估apache flink是否可用于分布式事件驱动系统(仅一次)。用例是用户注册了订阅,并希望更改为其他订阅。
当用户单击提交按钮时,有两个单独的进程异步运行。一个进程取消现有订阅,而另一个进程注册新订阅。触发这两个事件后,将发送电子邮件通知。
我已经设法使用 RabbitMQ 连接器在 apache flink 中创建两个流。当我尝试使用滑动窗口将这些流连接在一起时,窗口中的每张幻灯片都会复制事件。我尝试在加入的流上设置值状态描述符,但这似乎在窗口过后不会过期。
此外,我需要检测流中未配对的事件,并将此事件发送到不同的 RabbitMQ 接收器,以应对由于进程未成功完成而未触发事件的情况。
您对如何实现上述功能有任何提示/想法吗?
final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
final RMQConnectionConfig rmqConnectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setVirtualHost("/")
.setUserName("admin")
.setPassword("password")
.build();
final DataStream<String> cancellation = environment
.addSource(new RMQSource<>(rmqConnectionConfig, "scratchpad-cancellation", true, new SimpleStringSchema()))
.setParallelism(1);
final DataStream<String> subscription = environment
.addSource(new RMQSource<>(rmqConnectionConfig, "scratchpad-subscription", true, new SimpleStringSchema()))
.setParallelism(1);
cancellation
.join(subscription)
.where(value -> value).equalTo(value -> value)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.seconds(15)))
.apply((left, right) -> left)
.keyBy(value -> value)
.process(new ProcessFunction<String, String>() {
private ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class);
private ValueState<Boolean> state;
@Override
public void open(Configuration parameters) {
state = this.getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
if (BooleanUtils.isNotTrue(state.value())) {
state.update(true);
out.collect(value);
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES));
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
state.clear();
}
})
.print()
.setParallelism(1);
environment.execute();
如果窗口输出上有重复的值,则可以在已定义的滑动窗口之后将reduce函数添加到另一个窗口中,在大多数情况下应该是engough。但我认为这应该是一个比这更好的解决方案,但我们需要一个示例来改进您的代码。
另一方面,如果您需要检测非配对事件,我认为您需要使用 CoGroup 运算符,而不是使用连接。