嗨,我正在尝试运行一个flink作业,它应该处理以下的传入数据。在keyBy()
之后的流程运算符中,应根据数据中的某些属性需要太多时间。即使传入数据具有不同的ID(用于keyBy()
流),过程功能中的较长处理代码会阻止其他传入数据。我的意思是整个流。
SingleOutputStreamOperator<Envelope> processingStream = deviceStream
.map(e -> (Envelope) e)
.keyBy((KeySelector<Envelope, String>) value -> value.eventId) // key by scenarios
.process(new RuleProcessFunction());
在RuleProcessFunction.java
中:
...
@Override
public void processElement(Envelope value, Context ctx, Collector<Envelope> out) throws Exception {
//handleEvent(value, ctx, out);
if (value.getEventId().equals("I")) {
System.out.println("hello i");
for (long i = 0; i < 10000000000L; i++) {
}
}
out.collect(value);
}
我希望长期运行的代码块不应阻止整个流。我知道可以阻止IO情况的异步功能,但我不知道这是正确的解决方案。
由于您没有从卡桑德拉(Cassandra)这样的外部数据库中提取数据,因此我认为您不需要使用asyncfunction。
您正在使用单个并行性运行flink作业。尝试增加并行性,以便一个核心对所有处理以及接收数据不负责。当然,如果您这样做,仍然可能会有背压。因为如果负责从源中摄取数据的核心要比运行流程函数flink的背压处理的核心更快地读取数据的速度将减慢摄入速度。