基于一些键值(PySpark)从RDD创建多个Spark DataFrames



我有一些包含JSON对象的文本文件(每行一个对象)。示例:

{"a": 1, "b": 2, "table": "foo"}
{"c": 3, "d": 4, "table": "bar"}
{"a": 5, "b": 6, "table": "foo"}
...

我想根据表名称将文本文件的内容分析到Spark DataFrames中。因此,在上面的示例中,我将拥有" foo"的数据框架和" bar"的另一个数据框架。我已经完成了将JSON的行分组到RDD内部的列表中,并使用以下(PySpark)代码:

text_rdd = sc.textFile(os.path.join("/path/to/data", "*"))
tables_rdd = text_rdd.groupBy(lambda x: json.loads(x)['table'])

这会产生一个RDD,其中包含具有以下结构的元素列表:

RDD[("foo", ['{"a": 1, "b": 2, "table": "foo"}', ...],
    ("bar", ['{"c": 3, "d": 4, "table": "bar"}', ...]]

如何将此RDD分解为每个表键的数据框架?

编辑:我试图澄清单个文件中包含表的信息中有多行。我知道我可以在我创建的" GroupBy" RDD上调用.collectasmap,但我知道这会在驱动程序上消耗大量的RAM。我的问题是:有没有办法将" GroupBy" RDD分解为多个数据范围,而无需使用.Collectasmap?

您可以有效地将其拆分为镶木式分区:首先,我们将其转换为数据框:

text_rdd = sc.textFile(os.path.join("/path/to/data", "*"))
df = spark.read.json(text_rdd)
df.printSchema()
    root
     |-- a: long (nullable = true)
     |-- b: long (nullable = true)
     |-- c: long (nullable = true)
     |-- d: long (nullable = true)
     |-- table: string (nullable = true)

现在我们可以写它:

df.write.partitionBy('table').parquet([output directory name])

如果列出了[output directory name]的内容,您会看到table的不同值尽可能多的分区:

hadoop fs -ls [output directory name]
    _SUCCESS
    table=bar/
    table=foo/

如果您只想保留每个表的列,则可以执行此操作(假设每当该表显示在文件中时出现完整的列列表)

import ast
from pyspark.sql import Row
table_cols = spark.createDataFrame(text_rdd.map(lambda l: ast.literal_eval(l)).map(lambda l: Row(
        table = l["table"], 
        keys = sorted(l.keys())
    ))).distinct().toPandas()
table_cols = table_cols.set_index("table")
table_cols.to_dict()["keys"]
    {u'bar': [u'c', u'd', u'table'], u'foo': [u'a', u'b', u'table']}

以下是:

  1. 将每个文本字符串映射到JSON。

    jsonRdd = sc.textFile(os.path.join("/path/to/data", "*")).map (.....)
    
  2. 将所有不同的表名为驱动程序。

    tables = jsonRdd.map(<extract table name only from json object >).distinct().collect()
    
  3. 遍历每个(步骤2)表和过滤Main JSONRDD,以创建单个表的RDD。

    tablesRDD=[]
    for table in tables:
         # categorize each main rdd record based on table name.
         # Compare each json object table element with for loop table string and on successful match return true.
        output.append(jasonRdd.filter(lambda jsonObj: jsonObj['table'] == table))
    

我不是python开发人员,所以准确的代码段可能无法正常工作。

最新更新