Spark 流式传输到 PySpark JSON 文件中的数据帧



我需要 pyspark 的帮助。我正在从 kafka 流式传输 json 数据,我需要在 pyspark 中将数据帧转换为数据帧。为了流式传输,我使用了以下代码。

from __future__ import print_function
import sys
import csv
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pandas as pd
global gspark
def convert_Json2DF(time,rdd):
nf = gspark.read.json(rdd)
nf.toDF().show()
# Convert RDD[String] to RDD[Row] to DataFrame
#rowRdd = rdd.map(lambda w: Row(word=w))
#wordsDataFrame = gspark.createDataFrame(rowRdd)
#pdf = wordsDataFrame.toDF()
#pdf.show()
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
exit(-1)
gspark = SparkSession 
.builder 
.appName("SparkSteaming Kafka Receiver") 
.config("spark.some.config.option", "some-value") 
.config("spark.ui.port", 22300) 
.config("spark.executor.instances", 4) 
.config("spark.executor.cores", 4) 
.getOrCreate()
sc = gspark.sparkContext
SQLContext= SQLContext(sc)
ssc = StreamingContext(sc, 15)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda (key,value): json.loads(value))
lines.pprint()
lines.foreachRDD(Json2DF)
ssc.start()
ssc.awaitTermination()

对于上面的代码,我无法将 json 数据转换为数据帧。 任何人都可以在我需要进行更改的地方纠正我,在 Json2DF 函数或主函数中执行。

谢谢 巴拉

首先,确保所有 JSON 数据具有相同的架构。

def check_json(js, col):
try:
data = json.loads(js)
return [data.get(i) for i in col]
except:
return []

def convert_json2df(rdd, col):
ss = SparkSession(rdd.context)
if rdd.isEmpty():
return
df = ss.createDataFrame(rdd, schema=StructType("based on 'col'"))
df.show()

cols = ['idx', 'name']
sc = SparkContext()
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream('localhost', 9999) 
.map(lambda x: check_json(x, cols)) 
.filter(lambda x: x) 
.foreachRDD(lambda x: convert_json2df(x, cols))
ssc.start()
ssc.awaitTermination()

最新更新