使用kafka 0.9.0和spark 2.1.0-我正在使用pyspark结构化流来计算结果并将其输出Kafka主题。我是指同样的火花文档https://spark.apache.org/docs/latest/sonstrucd-streaming-programming-guide.html#output-modes
现在运行命令
(输出模式在汇总流数据时完成。(
(mydataframe.writeStream
.outputMode("complete")
.format("kafka")
.option("kafka.bootstrap.servers", "x.x.x.x:9092")
.option("topic", "topicname")
.option("checkpointLocation","/data/checkpoint/1")
.start())
它给了我以下错误
ERROR StreamExecution: Query [id = 0686130b-8668-48fa-bdb7-b79b63d82680, runId = b4b7494f-d8b8-416e-ae49-ad8498dfe8f2] terminated with error
org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:73)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:73)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:72)
at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:88)
at org.apache.spark.sql.kafka010.KafkaSink.addBatch(KafkaSink.scala:38)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)**
不确定它会期望什么属性值。需要帮助解决此问题。
控制台输出接收器在控制台上产生正确的输出,因此代码似乎可以正常工作。仅当使用kafka作为输出接收器引起此问题
不确定它会期望什么属性值。需要帮助解决此问题。
您的myDataFrame
需要一个包含有效载荷(消息(的value
列(StringType
或BinaryType
(,您要发送到Kafka。
当前您正在尝试写给Kafka,但不要描述要编写哪些数据。
获得这种Colunm的一种方法是使用.withColumnRenamed
重命名现有列。如果要编写多个列,通常最好创建一个包含json表示框架的列,可以使用to_json
SQL.Function获得。但是要当心.tojson!
spark 2.1.0不支持kafka作为输出接收器。根据文档,它已在2.2.0中引入。
另请参见此答案,该答案链接到提交引入功能的提交,并提供了替代解决方案以及该JIRA,该解决方案添加了文档,以2.2.1。