独立的Kafka Spark Sinks(多个制作人和经纪人)



所以我在将 JSON 发送到多个主题和不可靠的 kafka 代理时遇到了 Spark Streaming 中的 Kafka Sinks 问题。以下是代码的某些部分:

val kS = KafkaUtils.createDirectStream[String, TMapRecord]
(ssc,
PreferConsistent,
Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT))

然后我迭代RDD的

kSMapped.foreachRDD {
rdd: RDD[TMsg] => {
rdd.foreachPartition {
part => {
part.foreach { ........... 

在里面我做

kafkaSink.value.send(kafkaTopic, strJSON)
kafkaSinkMirror.value.send(kafkaTopicMirrorBroker, strJSON)

镜像代理关闭时,整个流应用程序都在等待它,我们不会向主代理发送任何内容。

你会如何处理它?

对于您提出的最简单的解决方案,想象一下我只是跳过了要发送给已关闭的代理的消息(例如,这是案例 1(

对于 CASE 2,我们会做一些缓冲。

附言稍后我将使用 Kafka Mirror,但目前我没有这样的选项,所以我需要在代码中制作一些解决方案。

我发现了这个问题的几个决定:

  1. 您可以在工作线程和检查点上使用引发任何超时异常。Spark 尝试多次重新启动错误任务,spark.task.maxFailures属性中所述。可以增加重试次数。如果流式处理作业在最大重试次数后失败,则只需在代理可用时从检查点重新启动作业。或者,您可以在作业失败时手动停止作业。
  2. 您可以配置背压spark.streaming.backpressure.enabled=true,仅允许接收数据的速度与处理数据的速度一样快。
  3. 您可以将两个结果发送回您的技术 Kafka 主题,稍后使用另一个流式处理作业来处理它。
  4. 您可以为这种情况创建 Hive 或 Hbase 缓冲区,并在以后以批处理模式发送未经处理的数据。

最新更新