DataFrame returned by getBatch from MQTTTextStreamSource did not have isStreaming=true



I am trying to use MQTT with PySpark Structured Streaming:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[4]") \
    .getOrCreate()

# Custom Structured Streaming receiver
lines = spark \
    .readStream \
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") \
    .option("topic", "uwb/distances") \
    .option("brokerUrl", "tcp://127.0.0.1:1883") \
    .load()

# Split the lines into words
words = lines.select(explode(split(lines.value, ' ')).alias('word'))

# Generate running word count
wordCounts = words.groupBy('word').count()

# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()
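For reference, here is a plain-Python sketch (no Spark needed) of what this word-count query is meant to compute in 'complete' output mode: after every micro-batch, the whole table of counts accumulated so far is emitted again. The function name running_word_counts and the sample payloads are hypothetical and only illustrate the query's semantics.

```python
from collections import Counter


def running_word_counts(batches):
    """Mimic explode(split(value, ' ')) + groupBy('word').count() in
    'complete' output mode: after each micro-batch, yield the full table
    of counts accumulated so far."""
    totals = Counter()
    for batch in batches:
        for line in batch:
            # split(lines.value, ' ') followed by explode(): one row per token
            totals.update(line.split(' '))
        # 'complete' mode re-emits the whole aggregate, not only changed rows
        yield dict(totals)


if __name__ == "__main__":
    # Hypothetical MQTT payloads on uwb/distances, grouped into two micro-batches
    sample_batches = [["a b", "b"], ["a c"]]
    for i, table in enumerate(running_word_counts(sample_batches)):
        print(i, table)
```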

Error message:

Logical Plan:
Aggregate [word#7], [word#7, count(1) AS count#11L]
+- Project [word#7]
   +- Generate explode(split(value#2,  )), false, [word#7]
      +- StreamingExecutionRelation org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource@383ccec1, [value#2, timestamp#3]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource@383ccec1 did not have isStreaming=true

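I don't understand what is wrong with my code. Moreover, according to this post, Structured Streaming 2.1.0 is in fact supported by Bahir MQTT. I also tried Spark 2.2.1, but ran into the same problem.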

This is how I run the code:

spark-submit \
    --jars lib/spark-streaming-mqtt_2.11-2.2.1.jar,lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar,lib/org.eclipse.paho.client.mqttv3-1.2.0.jar \
    TestSpark.py
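spark-submit's --jars option expects a single comma-separated argument, so the jar paths must not be separated by spaces (otherwise the shell passes them as separate arguments). A minimal sketch of a hypothetical Python helper that assembles the argument list that way; build_submit_cmd is an illustrative name, not part of Spark.

```python
import shlex


def build_submit_cmd(spark_submit, jars, app):
    """Build a spark-submit argv list. --jars receives ONE argument:
    the jar paths joined by commas, with surrounding whitespace stripped."""
    return [spark_submit, "--jars", ",".join(j.strip() for j in jars), app]


if __name__ == "__main__":
    cmd = build_submit_cmd(
        "spark-submit",
        ["lib/spark-streaming-mqtt_2.11-2.2.1.jar",
         "lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar",
         "lib/org.eclipse.paho.client.mqttv3-1.2.0.jar"],
        "TestSpark.py",
    )
    # Print a copy-pasteable shell command; subprocess.run(cmd, check=True)
    # would launch it directly without going through a shell.
    print(shlex.join(cmd))
```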

How can I fix this?

I downloaded the Spark 2.2.1 binary distribution and ran the code as follows:

~/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
    --jars lib/spark-streaming-mqtt_2.11-2.2.1.jar,lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar,lib/org.eclipse.paho.client.mqttv3-1.2.0.jar \
    TestSpark.py

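This solved the problem. Previously I had only changed the version of the MQTT jar files (e.g. spark-streaming-mqtt_2.11-2.2.1.jar), but apparently that was not enough, presumably because the Spark installation that runs spark-submit must also match the Spark version the Bahir jars were built for (2.2.x here).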
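A cheap safeguard is to compare the running Spark version with the connector jar before starting the query. The sketch below assumes Bahir's jar naming scheme <artifact>_<scala version>-<bahir version>.jar and assumes that the Bahir version's major.minor tracks the Spark major.minor it was built against (true for the 2.2.1 jars here, but an assumption in general). connector_matches_spark is a hypothetical helper; it does not apply to the Paho client jar.

```python
import re


def connector_matches_spark(spark_version, jar_name):
    """Return True if the major.minor version in a Bahir-style jar name
    (e.g. spark-sql-streaming-mqtt_2.11-2.2.1.jar) equals the major.minor
    of the running Spark (e.g. '2.2.1')."""
    # Assumed naming scheme: <artifact>_<scala version>-<x.y.z>.jar
    m = re.search(r"_(\d+\.\d+)-(\d+)\.(\d+)\.\d+\.jar$", jar_name)
    if m is None:
        raise ValueError(f"unrecognized jar name: {jar_name}")
    jar_major_minor = (int(m.group(2)), int(m.group(3)))
    spark_major_minor = tuple(int(x) for x in spark_version.split(".")[:2])
    return jar_major_minor == spark_major_minor


if __name__ == "__main__":
    # In the driver script one would pass spark.version instead of a literal.
    print(connector_matches_spark("2.2.1", "lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar"))
    print(connector_matches_spark("2.3.0", "lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar"))
```

In TestSpark.py this could run right after getOrCreate(), e.g. raising an error when connector_matches_spark(spark.version, ...) returns False, so a mismatched installation fails fast instead of at the first micro-batch.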
