PySpark 结构化流数据写入 Cassandra 而不填充数据



我想将Spark结构化流数据写入cassandra。我的火花版本是 2.4.0。

我来自 Kafka 的输入源是 JSON 的,所以当写入控制台时,没问题,但是当我在 cqlsh Cassandra 中查询时,没有附加到表中的记录。你能告诉我出了什么问题吗?

schema = StructType() 
.add("humidity", IntegerType(), True) 
.add("time", TimestampType(), True) 
.add("temperature", IntegerType(), True) 
.add("ph", IntegerType(), True) 
.add("sensor", StringType(), True) 
.add("id", StringType(), True)
def writeToCassandra(writeDF, epochId):
writeDF.write 
.format("org.apache.spark.sql.cassandra") 
.mode('append') 
.options("spark.cassandra.connection.host", "cassnode1, cassnode2") 
.options(table="sensor", keyspace="sensordb") 
.save()
# Load json format to dataframe
df = spark 
.readStream 
.format("kafka") 
.option("kafka.bootstrap.servers", "kafkanode") 
.option("subscribe", "iot-data-sensor") 
.load() 
.select([
get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
for c in ["humidity", "time", "temperature", "ph", "sensor", "id"]])
df.writeStream 
.foreachBatch(writeToCassandra) 
.outputMode("update") 
.start()

我在 pyspark 中遇到了同样的问题。 请尝试以下步骤

  1. 首先,验证它是否连接到 cassandra。您可以指向不可用的表,并查看它是否由于"找不到表"而失败

  2. 尝试如下 writeStream(在调用 cassandra 更新之前包括触发器和输出模式(

df.writeStream .trigger(processingTime="10 seconds") .outputMode("update") .foreachBatch(writeToCassandra)

相关内容

  • 没有找到相关文章

最新更新