使用Spark Streaming的FIFO处理



我有一个用例,其中我必须以FIFO方式处理事件。这些是机器生成的事件。每台机器每30秒生成一个事件。对于特定的机器,我们需要根据FIFO方式处理事件。

我们每天需要处理大约2.4亿个事件。对于如此大规模的应用,我们需要使用Kafka+Spark Streaming

从Kafka文档中,我了解到我们可以使用消息的关键字段将消息路由到特定的主题分区。这确保了我可以使用机器id作为密钥,并确保来自特定机器的所有消息都进入同一主题分区。

50%的问题已解决。

处理端的问题来了。

KafkaDirect方法的sparkDocumentation说RDD分区相当于Kafka分区。

所以当我执行rdd.foreachPartition时,任务是否按顺序迭代?

是否确保RDD的分区始终位于一个执行器中?

是否确保foreachPartition任务只由一个线程对整个分区执行?

请帮忙。

假设您不使用任何重新划分数据的运算符(例如,repartitionreduceByKeyreduceByKeyAndWindow…)。

所以当我执行rdd.foreachPartition时,任务是否以有序的方式迭代?

是。它按照Kafka分区中的顺序处理数据。

是否确保RDD的分区始终位于一个执行器中?

是。如果不启用speculation,则只有一个执行器(任务)在处理分区。如果速度太慢,speculation可能会启动另一个任务来运行同一个分区。

是否确保foreachPartition任务只由一个线程对整个分区执行?

是。它逐个处理一个分区中的数据。

从Kafka文档中,我了解到我们可以使用消息的关键字段将消息路由到特定的主题分区。这确保了我可以使用机器id作为密钥,并确保来自特定机器的所有消息都进入同一主题分区。

在向Kafka发布数据时,您不需要使用机器id。使用null作为密钥,Kafka将在内部使用哈希分区方案将数据适当地发送到不同的Kafka主机。

这是处理端的问题。

Gotcha:当你在spark中处理时,它不会有全局订单。示例:有5个事件(按时间排序):e0(最早)、e1、e2、e3、e4(最新)

这些被路由到不同的kafka分区:

Kakfa Partition P0: e0, e3 Kafka Partition P1: e1, e2, e4

因此,当你在你的spark工作中阅读时,你会在一个RDD中得到e0, e3,在另一个RDD中得到e1, e2, e4,按这个顺序。

如果你想要全局排序,(e0,e1,e2,e3,e4),你需要在kafka中写入单个分区。但随后您将失去分区容忍度,并遇到一些性能问题(需要调整生产者和消费者)。3000个事件/秒应该可以,但这也取决于你的kafka集群。

@zsxwing已经回答了您的其他问题(请参阅)

最新更新