我有一些包含JSON对象的文本文件(每行一个对象)。示例:
{"a": 1, "b": 2, "table": "foo"}
{"c": 3, "d": 4, "table": "bar"}
{"a": 5, "b": 6, "table": "foo"}
...
我想根据表名称将文本文件的内容分析到Spark DataFrames中。因此,在上面的示例中,我将拥有" foo"的数据框架和" bar"的另一个数据框架。我已经完成了将JSON的行分组到RDD内部的列表中,并使用以下(PySpark)代码:
text_rdd = sc.textFile(os.path.join("/path/to/data", "*"))
tables_rdd = text_rdd.groupBy(lambda x: json.loads(x)['table'])
这会产生一个RDD,其中包含具有以下结构的元素列表:
RDD[("foo", ['{"a": 1, "b": 2, "table": "foo"}', ...],
("bar", ['{"c": 3, "d": 4, "table": "bar"}', ...]]
如何将此RDD分解为每个表键的数据框架?
编辑:我试图澄清单个文件中包含表的信息中有多行。我知道我可以在我创建的" GroupBy" RDD上调用.collectasmap,但我知道这会在驱动程序上消耗大量的RAM。我的问题是:有没有办法将" GroupBy" RDD分解为多个数据范围,而无需使用.Collectasmap?
您可以有效地将其拆分为镶木式分区:首先,我们将其转换为数据框:
text_rdd = sc.textFile(os.path.join("/path/to/data", "*"))
df = spark.read.json(text_rdd)
df.printSchema()
root
|-- a: long (nullable = true)
|-- b: long (nullable = true)
|-- c: long (nullable = true)
|-- d: long (nullable = true)
|-- table: string (nullable = true)
现在我们可以写它:
df.write.partitionBy('table').parquet([output directory name])
如果列出了[output directory name]
的内容,您会看到table
的不同值尽可能多的分区:
hadoop fs -ls [output directory name]
_SUCCESS
table=bar/
table=foo/
如果您只想保留每个表的列,则可以执行此操作(假设每当该表显示在文件中时出现完整的列列表)
)import ast
from pyspark.sql import Row
table_cols = spark.createDataFrame(text_rdd.map(lambda l: ast.literal_eval(l)).map(lambda l: Row(
table = l["table"],
keys = sorted(l.keys())
))).distinct().toPandas()
table_cols = table_cols.set_index("table")
table_cols.to_dict()["keys"]
{u'bar': [u'c', u'd', u'table'], u'foo': [u'a', u'b', u'table']}
以下是:
-
将每个文本字符串映射到JSON。
jsonRdd = sc.textFile(os.path.join("/path/to/data", "*")).map (.....)
-
将所有不同的表名为驱动程序。
tables = jsonRdd.map(<extract table name only from json object >).distinct().collect()
-
遍历每个(步骤2)表和过滤Main JSONRDD,以创建单个表的RDD。
tablesRDD=[] for table in tables: # categorize each main rdd record based on table name. # Compare each json object table element with for loop table string and on successful match return true. output.append(jasonRdd.filter(lambda jsonObj: jsonObj['table'] == table))
我不是python开发人员,所以准确的代码段可能无法正常工作。