我使用Hivecontext读取json文件,使用以下代码:
df = hive_context.read.json("/Users/duttaam/Downloads/test.json")
df.registerTempTable("df");
默认情况下,火花确定了以下模式
root
|-- id: string (nullable = true)
|-- profiles: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- app_id: string (nullable = true)
| | |-- localytics: struct (nullable = true)
| | | |-- attributes: struct (nullable = true)
| | | | |-- ap: long (nullable = true)
| | | | |-- app_version: string (nullable = true)
| | | | |-- birthdate: string (nullable = true)
| | | | |-- country: string (nullable = true)
| | | | |-- device_timezone: string (nullable = true)
| | | | |-- language: string (nullable = true)
| | | | |-- last_session_date: string (nullable = true)
| | | | |-- library_version: string (nullable = true)
| | | | |-- os_version: string (nullable = true)
| | | | |-- push_enabled: long (nullable = true)
| | | | |-- total_sessions: long (nullable = true)
| | | | |-- user_type: string (nullable = true)
我的Json看起来如下
{
"id": "dsdasdasdsd",
"profiles": [
{
"attributes": {
"MDN": "eoe/W/5Ru1KAPDMQQ/wqn/pu/tGRWpA=="
},
"localytics": {
"attributes": {
"last_session_date": "2016-07-17",
"device_timezone": "-04:00",
"country": "us",
"language": "en",
"user_type": "known",
"city_name": "Indianapolis"
}
}
},
{
"app_id": "sdas-c824fcf6-bbae-11e5-adasda-asasqwvz",
"attributes": {
"Automatic Backup User": "No"
},
"localytics": {
"attributes": {
"last_session_date": "2016-07-17",
"os_version": "6.2.1",
"app_version": "16.2.19.1",
"library_version": "androida_3.7.0",
"ap": 1,
"custom_1": "Unknown (Not Logged In)",
"total_sessions": 4,
"birthdate": "2016-07-09",
"push_enabled": 1,
"user_type": "known",
"custom_0": "Unknown (Not Logged In)",
"seconds_since_last_session": 1457
}
}
}
]
}
因此,默认情况下,Spark不会在两个概要文件中捕获属性字段。有没有一种方法可以自定义代码并更改模式结构?
提前谢谢。
谨致问候,Amit
您可以尝试使用hivecontxt.jsonFile(infile):
from pyspark import SparkContext
from pyspark.sql import HiveContext
import json
sc = SparkContext()
hive_contxt = HiveContext(sc)
your_schema = hive_contxt.jsonFile(INFILE)
your_schema.registerTempTable('YOUR TITLE')
您也可以使用hive_context.sql(YOUR QUERY).collect()
进行查询
您也可以尝试将json转储到内存中,然后使用hive_context.jsonRDD(json_dumped_object)
def make_json_single_row(row, field_names):
row_lst = row.split(';')
return json.dumps(dict(zip(field_names, row_lst)))
def make_json(rdd, field_names):
return rdd.map(lambda row: make_json_single_row(row, field_names)
field_names = ['column1','column2','column3']
rdd = sc.textfile(infile)
split_rdd = make_json(rdd, field_names)
your_new_schema = hive_contxt.jsonRDD(split_rdd)
如果您只想要profiles
列
在你的情况下,你可以这样做(但我确信这不是最好的方法):
Java:
import org.apache.spark.sql.functions;
DataFrame prof = df.select(functions.explode(df.col("profiles")).as("prof"));
prof.select("prof.app_id", "prof.attributes.*", "prof.localytics.attributes.*");
这需要您充分了解作为条件的json模式