在Python中处理来自Kafka主题的json表格数据



我以key:value json(没有嵌套结构)的形式将事件流式传输到多个Kafka主题中,例如:

event_1: {"name": "Alex", "age": 27, "hobby": "pc games"},  
event_2: {"name": "Bob", "age": 33, "hobby: "swimming"},  
event_3: {"name": "Charlie", "age": 12, "hobby: "collecting stamps"}

我在Python 3.7中工作,并希望从这些主题中消费一批事件,比如,每5分钟,将其转换为数据框架,对这些数据进行一些处理和丰富,并将结果保存到csv文件中。

我是Spark的新手,搜索了文档来帮助我完成这项任务,但没有找到任何文档。有没有推荐的最新信息来源?
另外,如果有其他适合这项任务的推荐大数据框架,我很乐意听到。

参考结构化流媒体编程指南中的触发器部分。有3种不同类型的触发器,默认为微批,一旦上一个微批处理完成,就会生成微批。

如果您需要固定间隔的微批,您可以指定查询必须触发的持续时间。下面是完成该操作的代码片段:

df.writeStream 
.format("csv") 
.option("header", True) 
.option("path", "path/to/destination/dir") 
.trigger(processingTime='5 minutes')  # fixed interval trigger
.start()

简短的代码

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType

# Define schema of kafak message
schema = StructType([
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("hobby", StringType, true),
])
# Initialize spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Read Kafka topic and load data using schema
df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers","x.x.x.x:2181")
.option("startingOffsets", "latest")
.option("subscribe","testdata")
.load()
.select(from_json(col("value").cast("string"), schema).alias("data"))
.select(f.col("data.*"))
# Do some transformation
df1 = df...
# Write the resultant dataframe as CSV file
df1.writeStream 
.format("csv") 
.option("header", True) 
.option("path", "path/to/destination/dir") 
.trigger(processingTime='5 minutes') 
.start()

如果需要,您还可以在写入csv文件之前重新分区最终数据帧

最新更新