PySpark Structured Streaming with Kafka as output sink gives an error



Using Kafka 0.9.0 and Spark 2.1.0 — I am using PySpark Structured Streaming to compute results and output them to a Kafka topic. I am referring to the Spark documentation: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

Now, running the command

(the output mode is "complete" since I am aggregating streaming data)

(mydataframe.writeStream
    .outputMode("complete")
    .format("kafka")
    .option("kafka.bootstrap.servers", "x.x.x.x:9092")
    .option("topic", "topicname")
    .option("checkpointLocation","/data/checkpoint/1")
    .start())

gives me the following error:

 ERROR StreamExecution: Query [id = 0686130b-8668-48fa-bdb7-b79b63d82680, runId = b4b7494f-d8b8-416e-ae49-ad8498dfe8f2] terminated with error
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:73)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:73)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:72)
    at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:88)
    at org.apache.spark.sql.kafka010.KafkaSink.addBatch(KafkaSink.scala:38)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)

Not sure what 'value' attribute it expects. Need help resolving this issue.

The console output sink produces correct output on the console, so the code itself seems to work fine. The problem occurs only when using Kafka as the output sink.

Your myDataFrame needs a value column (of StringType or BinaryType) containing the payload (message) you want to send to Kafka.

Currently you are trying to write to Kafka without describing which data should be written.

One way to get such a column is to rename an existing column using .withColumnRenamed. If you want to write several columns, it is usually best to create a single column containing a JSON representation of the row, which you can obtain with the to_json SQL function. But watch out for .toJSON!

Spark 2.1.0 does not support Kafka as an output sink. According to the documentation, it was introduced in 2.2.0.

See also this answer, which links to the commit that introduced the feature and offers an alternative solution, as well as the JIRA ticket that added the documentation in 2.2.1.
