I am trying to read some data from a Kafka broker using Structured Streaming, in order to display it in a Zeppelin note. I am using Spark 2.4.3, Scala 2.11, Python 2.7, Java 9, and Kafka 2.2 with SSL enabled, running on Heroku, but I get a StreamingQueryException: "Failed to construct kafka consumer".
I am using the following dependencies (set in the Spark interpreter settings):
org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3
org.apache.spark:spark-streaming_2.11:2.4.3
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3
I have tried both older and newer versions, but these should match the Spark/Scala versions I am using.
I have successfully written to and read from Kafka using a simple Python producer and consumer.
The code I am using:
%pyspark
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
from pyspark.sql.functions import col, expr, when
schema = StructType().add("power", IntegerType()).add("colorR", IntegerType()).add("colorG",IntegerType()).add("colorB",IntegerType()).add("colorW",IntegerType())
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", "/home/ubuntu/kafka/truststore.jks") \
    .option("kafka.ssl.keystore.location", "/home/ubuntu/kafka/keystore.jks") \
    .option("kafka.ssl.keystore.password", password) \
    .option("kafka.ssl.truststore.password", password) \
    .option("kafka.ssl.endpoint.identification.algorithm", "") \
    .option("startingOffsets", "earliest") \
    .option("subscribe", topic) \
    .load()
schema = ArrayType(
    StructType([StructField("power", IntegerType()),
                StructField("colorR", IntegerType()),
                StructField("colorG", IntegerType()),
                StructField("colorB", IntegerType()),
                StructField("colorW", IntegerType())]))
readDF = df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), schema))
query = readDF.writeStream.format("console").start()
query.awaitTermination()
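For reference, the ArrayType schema above assumes each Kafka message value is a JSON array of objects, each carrying the five integer fields. A minimal plain-Python sketch of that payload shape (the sample message here is invented for illustration):

```python
import json

# Hypothetical message value matching the ArrayType(StructType(...)) schema above:
# a JSON array of objects, each with five integer fields.
sample_value = '[{"power": 255, "colorR": 10, "colorG": 20, "colorB": 30, "colorW": 0}]'

records = json.loads(sample_value)
expected_fields = {"power", "colorR", "colorG", "colorB", "colorW"}

for record in records:
    # from_json yields null for a record whose shape does not match the schema,
    # so checking the raw payload shape first can save debugging time.
    assert set(record) == expected_fields
    assert all(isinstance(v, int) for v in record.values())

print("sample payload matches the expected schema shape")
```

If the producer actually sends a single JSON object rather than an array, a plain StructType (like the first `schema` defined above) would be the matching choice instead.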
And the error I get:
Fail to execute line 43: query.awaitTermination()
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-2171412221151055324.py", line 380, in <module>
exec(code, _zcUserQueryNameSpace)
File "<stdin>", line 43, in <module>
File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 103, in awaitTermination
return self._jsq.awaitTermination()
File "/home/ubuntu/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/ubuntu/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 75, in deco
raise StreamingQueryException(s.split(': ', 1)[1], stackTrace)
StreamingQueryException: u'Failed to construct kafka consumer
=== Streaming Query ===
Identifier: [id = 2ee20c47-8293-469a-bc0b-ef71a1f118bc, runId = 72422290-090a-4b6d-bd66-088a5a534240]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [cast(key#7 as string) AS key#22, jsontostructs(ArrayType(StructType(StructField(power,IntegerType,true), StructField(colorR,IntegerType,true), StructField(colorG,IntegerType,true), StructField(colorB,IntegerType,true), StructField(colorW,IntegerType,true)),true), cast(value#8 as string), Some(Etc/UTC)) AS jsontostructs(CAST(value AS STRING))#21]
+- StreamingExecutionRelation KafkaV2[Subscribe[tanana-44614.lightbulb]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
'
When I use read and write instead of readStream and writeStream, I do not get any errors, but nothing shows up on the console when I send some data to Kafka.
What else should I try?
It looks like the Kafka consumer cannot access ~/kafka/truststore.jks, hence the exception. Replace ~ with the fully specified path (without the tilde) and the problem should go away.
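The reason is that tilde expansion is a shell feature: the JVM-side Kafka client treats a leading "~" as a literal character in the file path. If the path is assembled in Python before being passed to .option(...), one way to stay safe is to expand it explicitly (a minimal sketch; the tilde path below is hypothetical):

```python
import os

# Hypothetical tilde path; the JVM Kafka consumer would treat "~" literally
# and fail to open the truststore file.
truststore = "~/kafka/truststore.jks"

# Expand to an absolute path before handing it to
# .option("kafka.ssl.truststore.location", ...).
truststore_abs = os.path.expanduser(truststore)

assert "~" not in truststore_abs  # the tilde is gone after expansion
print(truststore_abs)
```

The same applies to the keystore path, and the file must of course be readable on every node where the Kafka consumer runs.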