我想将Spark结构化流数据写入cassandra。我的火花版本是 2.4.0。
我来自 Kafka 的输入源是 JSON 的,所以当写入控制台时,没问题,但是当我在 cqlsh Cassandra 中查询时,没有附加到表中的记录。你能告诉我出了什么问题吗?
schema = StructType()
.add("humidity", IntegerType(), True)
.add("time", TimestampType(), True)
.add("temperature", IntegerType(), True)
.add("ph", IntegerType(), True)
.add("sensor", StringType(), True)
.add("id", StringType(), True)
def writeToCassandra(writeDF, epochId):
writeDF.write
.format("org.apache.spark.sql.cassandra")
.mode('append')
.options("spark.cassandra.connection.host", "cassnode1, cassnode2")
.options(table="sensor", keyspace="sensordb")
.save()
# Load json format to dataframe
df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafkanode")
.option("subscribe", "iot-data-sensor")
.load()
.select([
get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
for c in ["humidity", "time", "temperature", "ph", "sensor", "id"]])
df.writeStream
.foreachBatch(writeToCassandra)
.outputMode("update")
.start()
我在 pyspark 中遇到了同样的问题。 请尝试以下步骤
-
首先,验证它是否连接到 cassandra。您可以指向不可用的表,并查看它是否由于"找不到表"而失败
-
尝试如下 writeStream(在调用 cassandra 更新之前包括触发器和输出模式(
df.writeStream
.trigger(processingTime="10 seconds")
.outputMode("update")
.foreachBatch(writeToCassandra)