我的要求是将数据发送到不同的ES接收器(基于数据(。例如:如果数据包含特定信息,则将其发送到sink1,否则将其发送至sink2等(基本上是根据数据将其动态发送到任何一个sink(。我还想单独设置ES sink1,ES sink2,ES sink3等的平行度。
-> Es sink1 (parallelism 4)
Kafka -> Map(Transformations) -> ES sink2 (parallelism 2)
-> Es sink3 (parallelism 2)
在flink中有什么简单的方法可以实现以上目的吗?
我的解决方案:(但不满意(
我可以想出一个解决方案,但有一些中间的kafka主题是我写的(topic1、topic2、topic3(,然后有单独的Essink1、Essink2和ESsink3管道。我想避免写这些中间的卡夫卡主题。
kafka -> Map(Transformations) -> Kafka topics (Insert into topic1,topic2,topic3 based on the data)
Kafka topic1 -> Essink1(parallelism 4)
Kafka topic2 -> Essink2(parallelism 2)
Kafka topic3 -> Essink3(parallelism 2)
您可以使用带有侧输出[2]的ProcessFunction[1]以n方式拆分流,然后将每个侧输出流连接到适当的接收器。然后在每个信宿上调用setParallelism()
[3]。
[1]https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html#the-processfunction
[2]https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
[3]https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html#operator-水平