我有一个kafka主题,我想使用PySpark流从kafka生产者读取数据,做一些转换,并保存到HDFS。我希望每次从kafka源捕获数据时都能完成。我想问题出在我的"更新"上。函数。下面是我的代码:
orders_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap_servers)
.option("subscribe", kafka_topic_name)
.option("startingOffsets", "latest")
.load()
orders_df1 = orders_df.selectExpr("CAST(value AS STRING)", "timestamp")
stock_price_schema = types.StructType([
types.StructField("symbol", types.StringType(), True),
types.StructField("date", types.DateType(), True),
types.StructField("open", types.DoubleType(), True),
types.StructField("high", types.DoubleType(), True),
types.StructField("low", types.DoubleType(), True),
types.StructField("close", types.DoubleType(), True),
types.StructField("volume", types.IntegerType(), True)
])
orders_df2 = orders_df1
.select(from_json(col("value"), stock_price_schema)
.alias("orders"), "timestamp")
# orders_df3 = orders_df2.select("orders.*", "timestamp")
stock_df = orders_df2.select("orders.*")
# Write final result into console for debugging purpose
orders_agg_write_stream = stock_df
.writeStream
.trigger(processingTime='5 seconds')
.outputMode("update")
.option("truncate", "false")
.format("console")
.start()
def update(stock_df):
if stock_df.count() == 0:
return
for row in stock_df.rdd.collect():
symbol = row["symbol"]
df_new = stock_df[stock_df["symbol"] == symbol]
df_old = spark.read.parquet(f"data/pq/{symbol}/")
df_new = df_old.union(df_new).distinct()
df_new.repartition(4).write.parquet(f"data/pq/{symbol}/")
update(orders_agg_write_stream)
orders_agg_write_stream.awaitTermination()
print("Stream Data Processing Application Completed.")
第一件事-你不能简单地在流上做.rdd.collect
-流的定义是无限的。相反,你需要在微批上执行你的逻辑,如果你使用foreachBatch函数,它将被调用,而这个函数的存在正是为了这个原因。
- 如果你从Kafka流,你将创建很多小文件,并且在读取Parquet时列出它们将非常昂贵。Delta通过使用事务日志 解决了这个问题
- 你将获得数据的事务性——消费者不会读取部分结果
- Delta具有内置的合并/更新/删除功能,也是事务性的,因此使用内置功能表达逻辑可能更有效,而不是再次重写所有数据&一次。