Pyspark流从Kafka源并保存到hdfs



我有一个kafka主题,我想使用PySpark流从kafka生产者读取数据,做一些转换,并保存到HDFS。我希望每次从kafka源捕获数据时都能完成。我想问题出在我的"更新"上。函数。下面是我的代码:

orders_df = spark 
.readStream 
.format("kafka") 
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) 
.option("subscribe", kafka_topic_name) 
.option("startingOffsets", "latest") 
.load()
orders_df1 = orders_df.selectExpr("CAST(value AS STRING)", "timestamp")

stock_price_schema = types.StructType([
types.StructField("symbol", types.StringType(), True),
types.StructField("date", types.DateType(), True),
types.StructField("open", types.DoubleType(), True),
types.StructField("high", types.DoubleType(), True),
types.StructField("low", types.DoubleType(), True),
types.StructField("close", types.DoubleType(), True),
types.StructField("volume", types.IntegerType(), True)
])
orders_df2 = orders_df1
.select(from_json(col("value"), stock_price_schema)
.alias("orders"), "timestamp")

# orders_df3 = orders_df2.select("orders.*", "timestamp")
stock_df = orders_df2.select("orders.*")

# Write final result into console for debugging purpose
orders_agg_write_stream = stock_df 
.writeStream 
.trigger(processingTime='5 seconds') 
.outputMode("update") 
.option("truncate", "false")
.format("console") 
.start()

def update(stock_df):

if stock_df.count() == 0:
return

for row in stock_df.rdd.collect():

symbol = row["symbol"]
df_new = stock_df[stock_df["symbol"] == symbol]
df_old = spark.read.parquet(f"data/pq/{symbol}/")

df_new = df_old.union(df_new).distinct()
df_new.repartition(4).write.parquet(f"data/pq/{symbol}/")
update(orders_agg_write_stream)
orders_agg_write_stream.awaitTermination()
print("Stream Data Processing Application Completed.")

第一件事-你不能简单地在流上做.rdd.collect-流的定义是无限的。相反,你需要在微批上执行你的逻辑,如果你使用foreachBatch函数,它将被调用,而这个函数的存在正是为了这个原因。

此外,我建议使用Delta Lake数据格式而不是普通的Parquet(顺便说一句,Delta Lake也是建立在Parquet之上的)。原因如下:
  • 如果你从Kafka流,你将创建很多小文件,并且在读取Parquet时列出它们将非常昂贵。Delta通过使用事务日志
  • 解决了这个问题
  • 你将获得数据的事务性——消费者不会读取部分结果
  • Delta具有内置的合并/更新/删除功能,也是事务性的,因此使用内置功能表达逻辑可能更有效,而不是再次重写所有数据&一次。

最新更新