管道简单代码是研究员:
source = env.addSource(kafkaConsumer)
.map(func).setParallelism(2).sink()
如何确定出局的顺序?
首先,假设示例中的其他所有内容都具有 1 的并行度,并且只有 map 函数将并行运行。(尽管要实际实现这一点,它必须在某个地方进行配置;默认并行度高于 1。
我们还假设您的 Kafka 使用者正在读取具有一个分区的单个主题,并且您正在询问如何实现保留输入中存在的顺序的并行转换。
有了这些假设,答案是你无能为力。映射运算符的两个实例之间存在争用,非并行接收器将以任意方式交错这两个传入流。
如果流记录以某种方式标记,例如使用升序时间戳或 ID,那么您可以假设引入一些缓冲并重新建立原始排序,无论是在自定义接收器中还是在映射和接收器运算符之间的非并行 RichCoMap 函数中。
另一方面,如果您的源以某种方式分区或键控,并且您只需要在每个键的基础上维护或建立排序,那么有更好的答案。