弗林克上的非阻止流媒体



嗨,我正在尝试运行一个flink作业,它应该处理以下的传入数据。在keyBy()之后的流程运算符中,应根据数据中的某些属性需要太多时间。即使传入数据具有不同的ID(用于keyBy()流),过程功能中的较长处理代码会阻止其他传入数据。我的意思是整个流。

SingleOutputStreamOperator<Envelope> processingStream = deviceStream
    .map(e -> (Envelope) e)
    .keyBy((KeySelector<Envelope, String>) value -> value.eventId) // key by scenarios
    .process(new RuleProcessFunction());

RuleProcessFunction.java中:

...
@Override
public void processElement(Envelope value, Context ctx, Collector<Envelope> out) throws Exception {
    //handleEvent(value, ctx, out);
    if (value.getEventId().equals("I")) {
        System.out.println("hello i");
        for (long i = 0; i < 10000000000L; i++) {
        }
    }
    out.collect(value);
}

我希望长期运行的代码块不应阻止整个流。我知道可以阻止IO情况的异步功能,但我不知道这是正确的解决方案。

由于您没有从卡桑德拉(Cassandra)这样的外部数据库中提取数据,因此我认为您不需要使用asyncfunction。

您正在使用单个并行性运行flink作业。尝试增加并行性,以便一个核心对所有处理以及接收数据不负责。当然,如果您这样做,仍然可能会有背压。因为如果负责从源中摄取数据的核心要比运行流程函数flink的背压处理的核心更快地读取数据的速度将减慢摄入速度。

相关内容

  • 没有找到相关文章

最新更新