PySpark和Kafka "Set are gone. Some data may have been missed.."



我在本地模式下使用Spark集群运行PySpark,并试图为Kafka主题编写流式DataFrame。

当我运行查询时,我得到以下消息:

java.lang.IllegalStateException: Set(topicname-0) are gone. Some data may have been missed.. 
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".

这是我的代码:

query = (
output_stream
.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "ratings-cleaned")
.option("checkpointLocation", "checkpoints-folder")
.start()
)
sleep(2)
print(query.status)

自上次运行查询以来,当从源主题中删除了一些消息/偏移量时,通常会显示此错误消息。删除是由于清除策略(如保留时间(造成的。

假设您的主题有偏移量为0、1、2的消息,这些消息都已由应用程序处理。检查点文件存储最后一个偏移量2,以便在下次启动时继续使用偏移量3。

一段时间后,偏移量为3、4、5的消息被生成到主题,但偏移量为0、1、2、3的消息由于保留而从主题中删除。

现在,当重新启动您的spark结构化流作业时,它试图根据检查点文件获取3,但意识到只有偏移量为4的消息可用。在这种情况下,它将抛出此异常。

你可以通过解决这个问题

  • readStream操作中设置.option("failOnDataLoss", "false"),或
  • 删除现有检查点文件

根据结构化流媒体+Kafka集成指南,选项failOnDataLoss描述为:

"当数据可能丢失(例如,主题被删除或偏移量超出范围(时,是否使查询失败。这可能是虚惊一场。当它不能按预期工作时,您可以禁用它。如果由于数据丢失而无法从提供的偏移中读取任何数据,则批处理查询将始终失败">

在上面的答案之上,Bartosz Konieczny发布了一个更详细的原因。错误消息的第一部分表示Set()为空;即一组主题分区(因此是末尾的-0(。这意味着Spark集群订阅的分区已被删除。我的猜测是卡夫卡的设置被重新启动了。Spark查询使用一些默认的检查点文件夹,该文件夹假定Kafka设置没有重新启动。

此错误消息提示检查点存在问题。在开发过程中,这可能是由于使用带有更新查询的旧检查点文件夹造成的。

如果这是在开发环境中,并且不需要保存上一个查询的状态,那么只需删除checkpoint文件夹(在代码示例中为checkpoints-folder(并重新运行查询即可。

相关内容

最新更新