Reading a JSON File with Apache Spark



I am reading a JSON file with HiveContext, using the following code:

df = hive_context.read.json("/Users/duttaam/Downloads/test.json")
df.registerTempTable("df")

By default, Spark infers the following schema:

root
 |-- id: string (nullable = true)
 |-- profiles: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- app_id: string (nullable = true)
 |    |    |-- localytics: struct (nullable = true)
 |    |    |    |-- attributes: struct (nullable = true)
 |    |    |    |    |-- ap: long (nullable = true)
 |    |    |    |    |-- app_version: string (nullable = true)
 |    |    |    |    |-- birthdate: string (nullable = true)
 |    |    |    |    |-- country: string (nullable = true)
 |    |    |    |    |-- device_timezone: string (nullable = true)
 |    |    |    |    |-- language: string (nullable = true)
 |    |    |    |    |-- last_session_date: string (nullable = true)
 |    |    |    |    |-- library_version: string (nullable = true)
 |    |    |    |    |-- os_version: string (nullable = true)
 |    |    |    |    |-- push_enabled: long (nullable = true)
 |    |    |    |    |-- total_sessions: long (nullable = true)
 |    |    |    |    |-- user_type: string (nullable = true) 

My JSON looks like this:

{
  "id": "dsdasdasdsd",
  "profiles": [
    {
      "attributes": {
        "MDN": "eoe/W/5Ru1KAPDMQQ/wqn/pu/tGRWpA=="
      },
      "localytics": {
        "attributes": {
          "last_session_date": "2016-07-17",
          "device_timezone": "-04:00",
          "country": "us",
          "language": "en",
          "user_type": "known",
          "city_name": "Indianapolis"
        }
      }
    },
    {
      "app_id": "sdas-c824fcf6-bbae-11e5-adasda-asasqwvz",
      "attributes": {
        "Automatic Backup User": "No"
      },
      "localytics": {
        "attributes": {
          "last_session_date": "2016-07-17",
          "os_version": "6.2.1",
          "app_version": "16.2.19.1",
          "library_version": "androida_3.7.0",
          "ap": 1,
          "custom_1": "Unknown (Not Logged In)",
          "total_sessions": 4,
          "birthdate": "2016-07-09",
          "push_enabled": 1,
          "user_type": "known",
          "custom_0": "Unknown (Not Logged In)",
          "seconds_since_last_session": 1457
        }
      }
    }
  ]
}

So, by default, Spark does not capture the attributes field that appears in both profiles. Is there a way to customize the code and change the schema structure?

Thanks in advance.

Regards, Amit

You can try using hive_contxt.jsonFile(infile):

from pyspark import SparkContext
from pyspark.sql import HiveContext
import json

sc = SparkContext()
hive_contxt = HiveContext(sc)

# jsonFile is the older API for loading a JSON file into a DataFrame
your_schema = hive_contxt.jsonFile(INFILE)  # INFILE: path to your JSON file
your_schema.registerTempTable('your_table')  # 'your_table' is a placeholder name

You can then run queries with hive_contxt.sql(YOUR QUERY).collect().
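For example, against the temp table registered above (a minimal sketch; the id column comes from the sample JSON in the question):

# Query the temp table registered above and pull the results to the driver
rows = hive_contxt.sql("SELECT id FROM your_table").collect()
for row in rows:
    print(row.id)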

You can also try dumping the JSON into memory and then using hive_contxt.jsonRDD(json_dumped_object):

def make_json_single_row(row, field_names):
    # Split one delimited line and serialize it as a JSON string
    row_lst = row.split(';')
    return json.dumps(dict(zip(field_names, row_lst)))

def make_json(rdd, field_names):
    return rdd.map(lambda row: make_json_single_row(row, field_names))

field_names = ['column1', 'column2', 'column3']
rdd = sc.textFile(INFILE)  # note: textFile, not textfile
split_rdd = make_json(rdd, field_names)
your_new_schema = hive_contxt.jsonRDD(split_rdd)

If you only want the profiles, in your case you can do the following (though I am sure this is not the best way):

Java:

import org.apache.spark.sql.functions;

// Explode the profiles array into one row per profile, then flatten the structs
DataFrame prof = df.select(functions.explode(df.col("profiles")).as("prof"));
prof.select("prof.app_id", "prof.attributes.*", "prof.localytics.attributes.*");
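For reference, a rough PySpark equivalent of the same idea (a sketch; the prof.attributes.* path only resolves if attributes actually made it into df's schema):

from pyspark.sql import functions as F

# One row per element of the profiles array
prof = df.select(F.explode(df["profiles"]).alias("prof"))

# Flatten the nested structs; these paths assume the full schema is present
flat = prof.select("prof.app_id",
                   "prof.attributes.*",
                   "prof.localytics.attributes.*")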

Either way, this requires you to know your JSON schema well up front.
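Since the question is specifically about overriding the inferred schema, one more option: read.json also accepts an explicit schema, so the free-form attributes objects can be declared as string-to-string maps instead of being dropped by inference. A minimal sketch (field names are taken from the sample JSON above; whether numeric values such as ap are coerced to strings can vary by Spark version, so verify on your data):

from pyspark.sql.types import (StructType, StructField, ArrayType,
                               MapType, StringType)

# Model the free-form "attributes" objects as maps so that profiles
# with different key sets are all captured
profile_type = StructType([
    StructField("app_id", StringType(), True),
    StructField("attributes", MapType(StringType(), StringType()), True),
    StructField("localytics", StructType([
        StructField("attributes", MapType(StringType(), StringType()), True),
    ]), True),
])

schema = StructType([
    StructField("id", StringType(), True),
    StructField("profiles", ArrayType(profile_type), True),
])

# Pass the schema explicitly instead of relying on inference
df = hive_context.read.json("/Users/duttaam/Downloads/test.json", schema=schema)
df.registerTempTable("df")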
