基于 pyspark 中的键高效推断数据帧模式



我有一个由 json 行(行)组成的大型数据集。 这些行具有多个字段,并且存在的字段取决于该行中的一个 json 字段。 这里有一个小例子:

%pyspark
data = sc.parallelize([{'key':'k1','a':1.0,'b':2.0},
                    {'key':'k1','a':1.0,'b':20.0},
                    {'key':'k1','a':100.0,'b':.2},
                    {'key':'k2','y':10.0,'z':20.0},
                    {'key':'k2','y':1.0,'z':250.0},
                    {'key':'k1','a':1.0,'b':2.0},], 2)

我的目标是将此数据放入数据帧中,而无需指定架构。 Pyspark (至少)有两个函数可以帮助解决这个问题:1) toDF() ,它只将第一行数据作为模式,2) sqlContext.createDataFrame(),您可以在其中指定要采样的行比例以推断模式。 例如:

data.toDF().show()
+-----+----+---+
|    a|   b|key|
+-----+----+---+
|  1.0| 2.0| k1|
|  1.0|20.0| k1|
|100.0| 0.2| k1|
| null|null| k2|
| null|null| k2|
|  1.0| 2.0| k1|
+-----+----+---+
sqlContext.createDataFrame(data,samplingRatio=1).show()
+-----+----+---+----+-----+
|    a|   b|key|   y|    z|
+-----+----+---+----+-----+
|  1.0| 2.0| k1|null| null|
|  1.0|20.0| k1|null| null|
|100.0| 0.2| k1|null| null|
| null|null| k2|10.0| 20.0|
| null|null| k2| 1.0|250.0|
|  1.0| 2.0| k1|null| null|
+-----+----+---+----+-----+

sqlContext.createDataFrame()做了我想要的,但由于我在 40 亿行中可能只有五个键,我认为一定有一种更快的方法来推断架构。 此外,有些键非常罕见,所以我无法逃脱samplingRatio变小。

鉴于只有几种行类型,是否有一种优雅而快速的方法来推断架构?

更多的谷歌搜索引导我找到解决方案。

首先创建一个可靠的数据帧连接器(unionAll无法合并架构):

def addEmptyColumns(df, colNames):
    exprs = df.columns + ["null as " + colName for colName in colNames]
    return df.selectExpr(*exprs)

def concatTwoDfs(left, right):
    # append columns from right df to left df
    missingColumnsLeft = set(right.columns) - set(left.columns)
    left = addEmptyColumns(left, missingColumnsLeft)
    # append columns from left df to right df
    missingColumnsRight = set(left.columns) - set(right.columns)
    right = addEmptyColumns(right, missingColumnsRight)
    # let's set the same order of columns
    right = right[left.columns]
     # finally, union them
    return left.unionAll(right)

def concat(dfs):
    return reduce(concatTwoDfs, dfs)

(代码来自 https://lab.getbase.com/pandarize-spark-dataframes/)

然后获取不同的键,创建数据帧列表,并将它们连接起来:

keys = data.map(lambda x: x['key']).distinct().collect()
a_grp = [data.filter(lambda x: x['key']==k).toDF() for k in keys]
concat(a_grp).show()
+-----+----+---+----+-----+
|    a|   b|key|   y|    z|
+-----+----+---+----+-----+
|  1.0| 2.0| k1|null| null|
|  1.0|20.0| k1|null| null|
|100.0| 0.2| k1|null| null|
|  1.0| 2.0| k1|null| null|
| null|null| k2|10.0| 20.0|
| null|null| k2| 1.0|250.0|
+-----+----+---+----+-----+

最新更新