Spark DStream from Kafka always starts at the beginning



See my last comment on the accepted answer for the solution.

I have configured a DStream like this:

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka1.example.com:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "group.id" -> "mygroup",
  "specific.avro.reader" -> true,
  "schema.registry.url" -> "http://schema.example.com:8081"
)
val stream = KafkaUtils.createDirectStream(
  ssc,
  PreferConsistent,
  Subscribe[String, DataFile](topics, kafkaParams)
)

This works, and I receive DataFiles as expected. However, when I stop the job and run it again, it always starts at the beginning of the topic. How can I make it continue from where it left off?

Follow-up 1

Following Bhima Rao Gogineni's answer, I changed my configuration like this:

val consumerParams =
  Map("bootstrap.servers" -> bootstrapServerString,
      "schema.registry.url" -> schemaRegistryUri.toString,
      "specific.avro.reader" -> "true",
      "group.id" -> "measuring-data-files",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[KafkaAvroDeserializer],
      "enable.auto.commit" -> (false: JavaBool),
      "auto.offset.reset" -> "earliest")

I set up the stream:

val stream = KafkaUtils.
  createDirectStream(ssc,
                     LocationStrategies.PreferConsistent,
                     ConsumerStrategies.Subscribe[String, DataFile](List(inTopic), consumerParams))

And then I process it:

stream.
  foreachRDD { rdd =>
    ... // Do stuff with the RDD - transform, produce to other topic etc.

    // Commit the offsets
    log.info("Committing the offsets")
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

But it still always starts from the beginning of the topic when the job is re-run.

Here are excerpts from my Kafka log:

First run:

[2018-07-04 07:47:31,593] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 22 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:47:31,594] INFO [GroupCoordinator 0]: Stabilized group measuring-data-files generation 23 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:47:31,599] INFO [GroupCoordinator 0]: Assignment received from leader for group measuring-data-files for generation 23 (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:06,690] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 131488999 (kafka.log.ProducerStateManager)
[2018-07-04 07:48:06,690] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 131488999 in 1 ms. (kafka.log.Log)
[2018-07-04 07:48:10,788] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Member consumer-1-262ece09-93c4-483e-b488-87057578dabc in group measuring-data-files has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 23 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Group measuring-data-files with generation 24 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:45,761] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 153680971 (kafka.log.ProducerStateManager)
[2018-07-04 07:48:45,763] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 153680971 in 3 ms. (kafka.log.Log)
[2018-07-04 07:49:24,819] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 175872864 (kafka.log.ProducerStateManager)
[2018-07-04 07:49:24,820] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 175872864 in 1 ms. (kafka.log.Log)

Next run:

[2018-07-04 07:50:13,748] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 24 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:13,749] INFO [GroupCoordinator 0]: Stabilized group measuring-data-files generation 25 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:13,754] INFO [GroupCoordinator 0]: Assignment received from leader for group measuring-data-files for generation 25 (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Member consumer-1-906c2eaa-f012-4283-96fc-c34582de33fb in group measuring-data-files has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 25 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Group measuring-data-files with generation 26 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)

Follow-up 2

I made the offset committing more verbose, as follows:

// Commit the offsets
log.info("Committing the offsets")
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
if(offsetRanges.isEmpty) {
  log.info("Offset ranges is empty...")
} else {
  log.info("# offset ranges: %d" format offsetRanges.length)
}
object cb extends OffsetCommitCallback {
  def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata],
                 exception: Exception): Unit =
    if(exception != null) {
      log.info("Commit FAILED")
      log.error(exception.getMessage, exception)
    } else {
      log.info("Commit SUCCEEDED - count: %d" format offsets.size())
      offsets.
        asScala.
        foreach {
          case (p, omd) =>
            log.info("partition = %d; topic = %s; offset = %d; metadata = %s".
              format(p.partition(), p.topic(), omd.offset(), omd.metadata()))
        }
    }
}
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, cb)

And I get this exception:

2018-07-04 10:14:00 ERROR DumpTask$:136 - Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:163)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:182)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:209)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
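
The exception message itself points at two remedies: give the consumer more time between polls (session.timeout.ms, or max.poll.interval.ms on newer client versions) or make each poll return fewer records (max.poll.records). A minimal sketch of how those properties could be folded into the consumerParams from above; the values are illustrative, not tuned:

// Illustrative only: the two knobs the CommitFailedException message mentions,
// merged into the consumer params from Follow-up 1.
val tunedConsumerParams = consumerParams ++ Map[String, Object](
  "session.timeout.ms" -> "30000",   // allow more time between polls before the coordinator evicts the member
  "max.poll.records"   -> "250")     // smaller batches per poll() so each poll completes sooner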

How should I solve this?

With the new Spark Kafka consumer API we can try committing the offsets asynchronously.

Read the offsets and commit them once the processing is done.

The Kafka configuration stays the same:

enable.auto.commit=false

auto.offset.reset=earliest or auto.offset.reset=latest - this setting only takes effect when no previously committed offset is available for the group on the Kafka topic; in that case the consumer starts reading from the beginning or the end of the topic, according to this value.
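
For reference, a minimal sketch of how these two properties sit in the kafkaParams map of a direct stream (broker, group id and deserializers below are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "kafka1.example.com:9092",   // placeholder broker
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "mygroup",                    // committed offsets are stored per group.id
  "enable.auto.commit" -> (false: java.lang.Boolean),   // commit manually after processing
  "auto.offset.reset"  -> "earliest")                   // only consulted when the group has no committed offset

The commit itself is then issued from foreachRDD once the batch has been processed: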

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Source: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

Spark provides two APIs for reading messages from Kafka.

From the Spark documentation:

Approach 1: Receiver-based Approach

This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming process the data.
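
For comparison, a minimal sketch of the receiver-based API from the older spark-streaming-kafka-0-8 integration (the ZooKeeper address is a placeholder):

import org.apache.spark.streaming.kafka.KafkaUtils   // spark-streaming-kafka-0-8 artifact

// Receiver-based stream: values arrive as (String, String) pairs and the
// consumed offsets are tracked in ZooKeeper under the given group id.
val receiverStream = KafkaUtils.createStream(
  ssc,                            // the StreamingContext
  "zookeeper.example.com:2181",   // placeholder ZooKeeper quorum
  "measuring-data-files",         // consumer group id
  Map(inTopic -> 1))              // topic -> number of receiver threads

Note that this older API targets the Kafka 0.8 high-level consumer and does not plug directly into the KafkaAvroDeserializer / Schema Registry setup used above; it is shown here only to illustrate where the offsets are kept.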

Approach 2: Direct Approach (No Receivers)

This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to reading files from a file system).
Note that one disadvantage of this approach is that it does not update offsets in ZooKeeper, hence ZooKeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update ZooKeeper yourself.

In your case you are using the direct approach, so you need to handle the message offsets yourself and specify the offset range from which you want to read messages (sketched below). Alternatively, if you want ZooKeeper to manage your message offsets, you can use the receiver-based approach via the KafkaUtils.createStream() API.
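
With the 0-10 direct integration, one way to do that is to pass explicit starting offsets when the stream is created. A minimal sketch, assuming a previous run stored the offsets somewhere (a database, ZooKeeper, files, ...); loadStoredOffsets is a hypothetical helper:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Hypothetical lookup of the offsets saved by a previous run (partition -> next offset to read).
def loadStoredOffsets(topic: String): Map[TopicPartition, Long] =
  Map(new TopicPartition(topic, 0) -> 0L)   // illustrative value

// Subscribe overload that also takes the starting offsets for (some of) the partitions.
val stream = KafkaUtils.createDirectStream(
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, DataFile](
    List(inTopic), consumerParams, loadStoredOffsets(inTopic)))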

You can find more information on how to handle Kafka offsets in the Spark documentation.
