I am sending the following JSON to the path "/home/host/test" so that the program can pick it up with Spark Streaming and run queries against it.
{"id": "1", description: "test"}
{"id": "1", description: "test"}
However, when I run the query, the DataFrame ends up with the following structure:
root
 |-- word: string (nullable = true)
and I get this result:
+-------------------+
|               word|
+-------------------+
|{"id": "1", "test"}|
|{"id": "1", "test"}|
+-------------------+
I need the structure to look like this instead:
root
 |-- id: string (nullable = true)
 |-- description: string (nullable = true)
and I need to get the following result:
+-----+-------------+
| id  | description |
+-----+-------------+
| "1" | "test"      |
| "1" | "test"      |
+-----+-------------+
This is my pyspark code:
from __future__ import print_function
import os
import sys

from pyspark import SparkContext
from pyspark.sql.functions import col, explode
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, Row

if __name__ == "__main__":
    sc = SparkContext(appName="PythonSqlNetworkWordCount")
    ssc = StreamingContext(sc, 3)
    sqlcontextoriginal = SQLContext(sc)

    # Monitor the directory for newly created text files
    lines = ssc.textFileStream("/home/host/test")

    # Convert the RDDs of the lines DStream to DataFrames and run a SQL query
    def process(time, rdd):
        print("========= %s =========" % str(time))
        try:
            # Get the singleton instance of SQLContext
            sqlContext = SQLContext(rdd.context)
            # Convert RDD[String] to RDD[Row] to DataFrame
            rowRdd = rdd.map(lambda w: Row(word=w))
            wordsDataFrame = sqlContext.createDataFrame(rowRdd).toJSON()
            json = sqlContext.read.json(wordsDataFrame)
            # Register as table
            json.createOrReplaceTempView("words")
            json.printSchema()
            wordCountsDataFrame = sqlContext.sql("select * from words")
            wordCountsDataFrame.show()
        except:
            pass

    lines.foreachRDD(process)

    ssc.start()
    ssc.awaitTermination()
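On inspection, the single column makes sense: Row(word=w) wraps each JSON line into one field named word, and toJSON() re-serializes every row as {"word": "<original line>"}, so read.json can only ever recover one string column. If you wanted to keep a string column like that and still pull id and description out of it, a from_json projection could work. The sketch below is only an illustration under assumptions: it presumes Spark 2.1+ (where from_json exists), spells the schema out by hand, and uses wordsDF as a stand-in name for the DataFrame built from rowRdd:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Hand-written schema for the JSON lines (assumes both fields are strings)
schema = StructType([
    StructField("id", StringType(), True),
    StructField("description", StringType(), True),
])

# wordsDF stands for sqlContext.createDataFrame(rowRdd), i.e. the
# DataFrame whose single string column "word" holds the raw JSON line
parsed = wordsDF.select(from_json(col("word"), schema).alias("data")).select("data.*")
parsed.show()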
Well, I found the solution.
I had to pass the RDD of raw JSON lines directly to sqlContext.read.json as the argument, instead of going through Row(word=w) and toJSON():
json = sqlContext.read.json(rdd)
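For context, here is a minimal sketch of the whole process function rewritten around that one-line fix; the isEmpty() guard is an addition here, to skip schema inference on empty micro-batches:

def process(time, rdd):
    print("========= %s =========" % str(time))
    if rdd.isEmpty():
        return  # nothing arrived in this micro-batch
    sqlContext = SQLContext(rdd.context)
    # read.json accepts an RDD of JSON strings and infers the
    # schema (id, description) directly from the data
    df = sqlContext.read.json(rdd)
    df.printSchema()
    df.createOrReplaceTempView("words")
    sqlContext.sql("select id, description from words").show()

With valid JSON input (quoted keys), this should report id and description as nullable strings and print the two-column result shown above.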