Apache Flink使用Java -性能问题



我们有一个用Java编写的flink应用程序,运行在AWS Kinesis Data Analytics上。应用程序从AWS托管服务Kafka (Kafka主题1)读取输入流,然后应用业务逻辑(一些计算),最后将输出写到另一个Kafka主题(Kafka主题2)。

并行度为10,主题有15个分区。期望在5分钟内处理~20K并发数据。但是经过所有的优化之后,我们可以在25分钟内将它的速度提高到~20K并发数据。

你能让我知道是否有任何其他的性能优化可以实现的目标。

Flink异步I/O是否将成为进一步优化的选项?

示例代码:

StreamExecutionEnvironment streamenv =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> initialStreamData = streamenv
.addSource(new FlinkKafkaConsumer<>(
TOPIC_NAME, 
new ObjectNodeJsonDeSerializerSchema(),
kafkaConnectProperties);
initialStreamData.print();
DataStream<POJO> rawDataProcess = initialStreamData
.rebalance()
.flatMap(new ReProcessingDataProvider())
.keyBy(value -> value.getPersonId());
rawDataProcess.print();
DataStream<POJO> cgmStream = rawDataProcess
.keyBy(new ReProcessorKeySelector())
.rebalance()
.flatMap(new SgStreamTask());
cgmStream.print();
DataStream<POJO> artfctOverlapStream = null;
artfctOverlapStream = cgmStreamData
.keyBy(new CGMKeySelector())
.countWindow(2, 1)
.apply(new ArtifactOverlapProvider()); //the same person_id key
cgmStreamData.print();
DataStream<POJO> streamWithSgRoc = null;
streamWithSgRoc = artfctOverlapStream
.keyBy(new CGMKeySelector())
.countWindow(7, 1)
.apply(new SgRocProvider()); // the same person_id key 
streamWithSgRoc.print();
DataStream<POJO> cgmExcursionStream = null;
cgmExcursionStream = streamWithSgRoc
.keyBy(new CGMKeySelector())
.countWindow(Common.THREE, Common.ONE)
.apply(new CGMExcursionProviderStream()); //the same person_id key
cgmExcursionStream.print();
cgmExcursionStream
.addSink(new FlinkKafkaProducer<CGMDataCollector>(
topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
new CGMDataCollectorSchema(),
kafkaConnectProperties));

在您所分享的内容中,我没有看到任何可以解释低吞吐量的内容。但是既然你问过异步i/o,我想知道flatmap是否正在做一些外部i/o。如果是这样,那就解释得通了。如果是这种情况,那么使用异步i/o应该会有很大帮助,前提是外部服务可以处理增加的负载。

我也想知道为什么并行度是10,以及这10个插槽有什么资源可用。有足够的地核来维持一切的运转吗?对于15个分区,有5个插槽分别处理两个分区,另外5个插槽分别处理一个分区。5、8和15是比较明显的并行度选择。(当然,如果每个槽也在平面映射中击中外部服务,也需要考虑到这一点。)

查看代码后更新:

你可以做一些简单的事情来加快速度。

要做的一件事是给集群更多的资源。您可以保持并行性不变,但通过将任务管理器放在具有更多内核的机器上,为每个插槽提供更多的内核。

但在此之前,看看优化管道。rebalance和keyBy都非常昂贵,而且您使用它们的次数超出了必要的范围。一个keyBy紧跟着一个rebalance是没有意义的,两个keyBy紧接一个也是没有意义的。

Rebalance对流进行往返重新分区。通常在更改并行性或需要克服数据倾斜时执行。Rebalance几乎从不使用,除非在需要更改并行性时隐式使用。

KeyBy执行基于键的重分区。如果一个键by紧跟着另一个键by,第二个键by将撤消第一个键by所做的一切。

keyBy和rebalance都需要序列化和反序列化每个事件,并通过网络堆栈发送它们。只有在绝对必要的情况下,你才需要这样做。

修复这些rebalance/keyBy问题将减少集群上的工作负载。如果这还不足以达到期望的吞吐量,那么给每个插槽更多的内核(以便管道的各个阶段可以并行运行)应该可以达到目的。

最新更新