我正在扩展Apache Flink的KeyedProcessFunction
来定义工作流。我的工作流程由大约 10-15 个处理器组成。所有其他处理器 collector.collect(T( 在 1 秒内完成。而在最坏的情况下,一个特定的ProcessFuntion需要超过150秒。此进程函数发出与其他进程函数相同类型的有效负载。有效负载的大小也与其他处理器非常相似。我还在每个keyedProcessFunction之后依赖KeyBy((。KeyBy(( 对所有 processfunction 都有相同的定义,并且在整个 workFlow 中依赖于相同的属性。
如何调试/解决收集器占用这么多时间的问题?
我正在使用 Flink 1.8.0。
public class AProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {
@Override
public void processElement(Foo foo, Context ctx, Collector<Foo> out) {
try {
if(Contant.Foo.equals(foo.y)) {
collect(foo, out);
return;
}
work(foo);
collectEventTimePayload(foo, out);
} catch (Exception e) {
log.error("error occurred while processing {} with exception", x, e);
}
}
@Timed(aspect = "ProcessFunctionWork")
private void work(Foo foo) {
//some business logic. In worst casem time taken is 400 ms.
}
@Timed(aspect = "AProcessFunctionCollector")
private void collect(Foo foo, Collector<Foo> out) {
out.collect(foo);
}
@Timed(aspect = "AProcessFunctionEventTimeCollector")
private void collectEventTimePayload(Foo foo, Collector<Foo> out) {
if(CollectionUtils.isNotEmpty(foo.ids())){
collect(foo, out);
}
}
}
public class BProcessFunction extends KeyedProcessFunction<String, Foo, Foo> {
private final ProviderWorker providerWorker;
@Override
public void processElement(Foo foo, Context ctx, Collector<Foo> out) {
try {
if(!handleResourceIdExceptions(foo, out)) {
Optional<Foo> workedFoo = providerWorker.get(foo.getEventType())
.work(foo);
if (workedFoo.isPresent()) {
collectorCollect(workedFoo.get(), out);
return;
}
}
collectorCollect(foo, out);
} catch (Exception e) {
log.error("error occurred while processing {} with exception", foo, e);
}
}
@Timed(aspect = "BProcessFunctionCollector")
private void collectorCollect(Foo foo, Collector<Foo> out) {
out.collect(foo);
}
}
AProcessFunction.collect(( 在最坏的情况下需要 150 秒。而BProcessFunction需要<30ms。 我的工作流程是
dataStream.keyBy(fooKeyByFunction).process(someOtherProcessFunction).keyBy(fooKeyByFunction).process(aProcessFunction).keyBy(fooKeyByFunction).process(bProcessFunction).keyBy(fooKeyByFunction).process(cProcessFunction).keyBy(fooKeyByFunction).sink(sink);
收集器方法究竟是做什么的?它是否包括消息写入缓冲区之前的时间,或者它包括下一个任务的输入缓冲区被填充之前的时间?
Collector.collect
以阻塞方式将数据写入缓冲区,这些缓冲区通过网络异步发送到相应的任务。因此,所需的时间取决于序列化时间 + 如果所有缓冲区都用完了,则等待空闲缓冲区的时间。缓冲区只有在通过网络发送到下游任务后才可用。如果该任务遇到瓶颈,则意味着缓冲区无法立即发送并背压。
在您的情况下,我怀疑您确实有背压(在 Web UI 中很容易看到(,并且缓冲区需要很长时间才能可用。背压有两种常见情况:
- 从接收器:如果写入数据所需的时间比生成数据花费的时间长,则最终整个 DAG 将背压,并且整个处理速度将变为接收器的写入速度。解决方案可能是使用不同的水槽或加强目标系统。
- 从数据倾斜:其中一个键使用几个值占主导地位的键。这意味着您的整个并行性实际上归结为这几个值。每个键只能由一个子任务处理(一致性保证(。然后,此子任务将重载,而该特定流程函数的其他子任务或多或少处于空闲状态。此处的解决方案是使用不同的键或使用一些支持预聚合的聚合。
在这两种情况下,起点都是缩小 Web UI 的问题范围。很乐意为您提供更多信息。
注意:从您的消息来源来看,我认为根本不需要keyBy
。如果没有keyBy
,您可能会获得更好的并行性,并且应该更快。