Apache Flink async I/O inside an iterative stream does not work well



I am using Flink (1.11.1) to request information from an external source. I have two different pipelines that share 80% of the code. The first job is an ETL that starts and ends; the second processes webhooks in real time and is always running. In the second pipeline I use Kafka as the source to consume and process changes from the external source, and I have a step where I use an iterate transformation together with an async I/O operator, which is not working well.

After consuming a large number of messages from Kafka for a while, the iteration starts having problems: it no longer iterates, but it does not close the iterator either. The Kafka consumer keeps consuming messages, and the elements keep flowing through the pipeline until they reach the iteration.

Here is the code:

DataStream<DataContainer<ConnectionWebhook>> connections = env.addSource(getKafkaConsumer(properties)).setParallelism(1)
        .map(new StringKafkaMessageMap()).name("StringKafkaMessageMap")
        .map(new KafkaMessageConnectionMap()).name("KafkaMessageConnectionMap");
DataStream<DataContainer<ConnectionWebhook>> verifyConnection = AsyncDataStream.unorderedWait(connections, new VerifyConnection(), 30000, TimeUnit.MILLISECONDS, 1).name("VerifyConnection");
DataStream<DataContainer<ConnectionWebhook>> connectionSuccessfully = verifyConnection.filter(new FilterConnectionWithoutError()).name("FilterConnectionWithoutError");
DataStream<DataContainer<ConnectionWebhook>> connectionUnsuccessfully = verifyConnection.filter(new FilterConnectionWithError()).name("FilterConnectionWithError");
DataStream<DataContainer<Tuple2<ConnectionWebhook, Map<String, Object>>>> connectionUnsuccessfullyError = connectionUnsuccessfully.map(new connectionUnsuccessfullyMap()).name("connectionUnsuccessfullyMap");

DataStream<DataContainer<Tuple4<Integer, Integer, ConnectionWebhook, Map<String, Object>>>> initialCustomFieldRequest = connectionSuccessfully.map(new InitialCustomFieldMap()).name("InitialCustomFieldMap");
IterativeStream<DataContainer<Tuple4<Integer, Integer, ConnectionWebhook, Map<String, Object>>>> iterativeCustomField = initialCustomFieldRequest.iterate();
DataStream<DataContainer<Tuple4<Integer, Integer, ConnectionWebhook, Map<String, Object>>>> customField = AsyncDataStream.unorderedWait(iterativeCustomField, new AsyncCustomField(), 30000, TimeUnit.MILLISECONDS, 1).name("AsyncCustomField");
DataStream<DataContainer<Tuple4<Integer, Integer, ConnectionWebhook, Map<String, Object>>>> withPendingCustomFields = customField.filter(new WithPendingCustomFields()).name("WithPendingCustomFields");
DataStream<DataContainer<Tuple4<Integer, Integer, ConnectionWebhook, Map<String, Object>>>> withoutPendingCustomFields = customField.filter(new WithoutPendingCustomFields()).name("WithoutPendingCustomFields");
iterativeCustomField.closeWith(withPendingCustomFields);
DataStream<DataContainer<Tuple2<ConnectionWebhook, Map<String, Object>>>> initialIssueRetrieval = initialCustomFieldRequest.map(new InitialIssueRetrieval()).name("InitialIssueRetrieval");

One possible problem with iterations is that once there is backpressure inside the iteration loop, it can lead to a deadlock: records coming from the tail of the iteration cannot be sent back to the head of the iteration, while the head of the iteration cannot accept new records until the in-flight records have been processed.

Typically you only run into this if (a) your iteration produces multiple records for each input record, or (b) records loop around multiple times, so that the combined volume of N earlier records exceeds the network buffer capacity.
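To make the failure mode concrete, here is a toy model (plain Java, not Flink code) of a bounded "network buffer" sitting on the feedback edge between the iteration tail and head. Each looped record re-emits several records; once the buffer is full, the tail can no longer emit and the head can no longer drain, which is exactly the stuck-but-not-closed state described above. All names and numbers here are illustrative:

```java
import java.util.ArrayDeque;

// Toy model of an iteration's feedback edge with a bounded buffer.
// Not Flink code: it only illustrates why record amplification inside
// a loop can exhaust the buffer and wedge the iteration.
public class IterationDeadlockSketch {

    // Returns the number of fully completed loop steps before the
    // bounded feedback buffer would overflow (the point of deadlock).
    static int stepsUntilFull(int capacity, int amplification) {
        ArrayDeque<Integer> feedback = new ArrayDeque<>();
        feedback.add(0);            // one record enters the loop
        int steps = 0;
        while (!feedback.isEmpty() && steps < 10_000) {
            int record = feedback.poll();
            // the loop body re-emits `amplification` records per input
            for (int i = 0; i < amplification; i++) {
                if (feedback.size() >= capacity) {
                    return steps;   // tail blocked, head starved: stuck
                }
                feedback.add(record + 1);
            }
            steps++;
        }
        return steps;               // loop drained without filling up
    }

    public static void main(String[] args) {
        // With amplification > 1 the backlog grows every step until the
        // buffer fills, no matter how large the buffer is.
        System.out.println("stuck after " + stepsUntilFull(8, 2) + " steps");
    }
}
```

With an amplification factor of 1 the backlog stays constant and the loop never wedges; any factor above 1 only delays the overflow, which is why enlarging the buffers is a stopgap rather than a fix.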

A short-term workaround is to increase the size of your network buffers, but that does not solve the underlying problem. We did some hacks (in the DataSet-based iterations) to throttle incoming records (creating backpressure towards the sources before the iteration), but it is quite ugly.
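For reference, in Flink 1.10+ (including 1.11.1) the network buffer pool is sized through the `taskmanager.memory.network.*` options in `flink-conf.yaml`. A sketch of the workaround above; the values are illustrative, not a recommendation:

```yaml
# Fraction of total Flink memory reserved for network buffers (default 0.1)
taskmanager.memory.network.fraction: 0.2
# Lower and upper bounds for the network memory pool (defaults: 64mb / 1gb)
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 2gb
```

Raising these limits buys headroom on the feedback edge, but as noted, a loop that amplifies records will eventually fill any finite buffer.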
