Create XPATH rows for dynamic tables using pyspark

I am new to pyspark (version 1.6), and I have XPATHs (as the keys) and their values in a flat file:

root/catalog~1/product~1/arrived-on~1='today'
root/catalog~1/product~1/arrived-for~1='gha'
root/catalog~1/product~1/catalog-item~1/price~1='39.95'
root/catalog~1/product~1/catalog-item~1/name~1='A'
root/catalog~1/product~1/catalog-item~2/name~1='C'
root/catalog~1/product~1/catalog-item~2/price~1='49.95'
root/catalog~1/product~1/catalog-item~3/name~1='B'
root/catalog~1/product~1/catalog-item~3/price~1='100'
root/catalog~1/product~2/arrived-on~1='yesterday'
root/catalog~1/product~2/arrived-for~1='gha'
root/catalog~1/product~2/catalog-item~1/price~1='399.95'
root/catalog~1/product~2/catalog-item~1/name~1='AA'
root/catalog~1/product~2/catalog-item~2/name~1='CC'
root/catalog~1/product~2/catalog-item~2/price~1='939.95'
root/catalog~1/product~2/catalog-item~3/name~1='AB'
root/catalog~1/product~2/catalog-item~3/price~1='239'
root/catalog~2/product~1/arrived-on~1='Monday'
root/catalog~2/product~1/arrived-for~1='raf'
root/catalog~2/product~1/catalog-item~1/price~1='70'
root/catalog~2/product~1/catalog-item~1/name~1='AAA'
root/catalog~2/product~1/catalog-item~2/name~1='CCCC'
root/catalog~2/product~1/catalog-item~2/price~1='49.95'
root/catalog~2/product~1/catalog-item~2/anytag~1='anytag'
root/catalog~2/product~1/catalog-item~3/name~1='B'
root/catalog~2/product~1/catalog-item~3/price~1='D'
root/catalog~2/product~2/arrived-on~1='today'
root/catalog~2/product~2/arrived-for~1='jaf'
root/catalog~2/product~2/catalog-item~1/price~1='39.95'
root/catalog~2/product~2/catalog-item~1/name~1='A'
root/catalog~2/product~2/catalog-item~2/name~1='C'
root/catalog~2/product~2/catalog-item~2/price~1='39.95'
root/catalog~2/product~2/catalog-item~2/extratag~1='only on this'

I want to convert this output into multiple tables, where the tag just before the `=` becomes the column name and the tag before that becomes the table name (a small parsing sketch follows the tables below).

In this case I would end up with two tables. Table 1 - the catalog table:

catalog product arrived-on  arrived-for
1   1   today   gha
1   2   yesterday   gha
2   1   Monday  raf
2   2   today   jaf

Table 2 - the catalog-item table:

catalog product catalog-item    name    price   extratag    Anytag
1   1   1   A   39.95   NULL    NULL
1   1   2   C   49.95   NULL    NULL
1   1   3   B   100 NULL    NULL
1   2   1   AA  399.95  NULL    NULL
1   2   2   CC  939.95  NULL    NULL
1   2   3   AB  239 NULL    NULL
2   1   1   AAA 70  NULL    NULL
2   1   2   CCCC    49.95   NULL    anytag
2   1   3   B   D   NULL    NULL
2   2   1   A   39.95   NULL    NULL
2   2   2   C   39.95   only on this    NULL
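
To make that mapping concrete, here is a minimal sketch in plain Python of how a single line decomposes (the helper parse_line is made up for illustration, not part of the solution):

def parse_line(line):
    path, value = line.split("=", 1)    # key on the left, value on the right
    segments = path.split("/")[1:]      # drop the leading 'root'
    pairs = [seg.split("~") for seg in segments]
    columnname = pairs[-1][0]           # tag just before '='
    tablename = pairs[-2][0]            # tag before that
    indices = dict(pairs[:-1])          # occurrence index of each level
    return tablename, indices, columnname, value.strip("'")

# parse_line("root/catalog~1/product~1/catalog-item~1/price~1='39.95'")
# -> ('catalog-item', {'catalog': '1', 'product': '1', 'catalog-item': '1'}, 'price', '39.95')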

What I have done so far is split the xpath value into multiple columns:

import pyspark.sql.functions as f

# split each line into the xpath key and its value
myrdd = sc.textFile("<filename>").map(lambda line: line.split("="))
df = myrdd.toDF(['xpath', 'value'])
# the last path segment is the column name, the one before it the table name
df = df.withColumn('columnname', f.reverse(f.split(f.reverse(df.xpath), "/")[0]))
df = df.withColumn('tablename', f.reverse(f.split(f.reverse(df.xpath), "/")[1]))
df = df.withColumn('hirearchy_1', f.split(f.col('xpath'), "/")[1])
df = df.withColumn('hirearchy_2', f.split(f.col('xpath'), "/")[2])
df = df.withColumn('hirearchy_3', f.split(f.col('xpath'), "/")[3])
df = df.withColumn('hirearchy_4', f.split(f.col('xpath'), "/")[4])
# the part after '~' is the occurrence index of that level
df = df.withColumn('layer_2', f.split(f.col('hirearchy_2'), "~")[1])
df = df.withColumn('layer_3', f.split(f.col('hirearchy_3'), "~")[1])
df = df.withColumn('layer_4', f.split(f.col('hirearchy_4'), "~")[1])
# keep only the tag name in the hirearchy_* columns
df = df.withColumn('hirearchy_1', f.split(f.col('hirearchy_1'), "~")[0])
df = df.withColumn('hirearchy_2', f.split(f.col('hirearchy_2'), "~")[0])
df = df.withColumn('hirearchy_3', f.split(f.col('hirearchy_3'), "~")[0])
df = df.withColumn('hirearchy_4', f.split(f.col('hirearchy_4'), "~")[0])

I would like to split this further dynamically, treating both / and ~ as delimiters, and turn every piece into its own column.
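
For that delimiter handling by itself, the standard library's re.split accepts a character class covering both separators:

import re
# 'root/catalog~1/product~1/arrived-on~1' -> ['root', 'catalog', '1', 'product', '1', 'arrived-on', '1']
tokens = re.split(r"[/~]", "root/catalog~1/product~1/arrived-on~1")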

I then want to select the columnname, tablename, and hierarchy/layer values and transpose them into the desired output. I have only listed the first hierarchy level here; I am looking for something generic that will dynamically add the columns, transpose them, and produce the required tables.
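
One generic route worth trying (a sketch only, not tested against this exact DataFrame): Spark 1.6 introduced groupBy().pivot(), which promotes the distinct values of a column into column names without hand-listing them. Assuming the hirearchy_*/layer_* columns built above, something like:

pivoted = (df.groupBy('hirearchy_2', 'layer_2', 'hirearchy_3', 'layer_3')
             .pivot('columnname')
             .agg(f.max('value')))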

Any suggestions, pointers, or answers would be greatly appreciated.

We can do more of the cleanup in the RDD and then switch to a DataFrame for better performance:

myrdd = sc.textFile("data/xpath.txt").map(lambda x: x.replace('1=', ''))

# lines without 'catalog-item' feed the product-level table,
# lines with it feed the catalog-item table
def arr(x):
    return 'catalog-item' not in x

def cat(x):
    return 'catalog-item' in x

# Maybe someone can write better definitions for the following
def makeUpArr(x, head):
    # build a comma-separated row: for each header tag, take the matching
    # value from the (tag, value) pairs in x, or an empty string if absent
    line = ''
    for i in head:
        found = ''
        for k in x:
            if k[0] == i:
                found = k[1]
        line += found + ','
    return line[:-1]

def lastCol(y):
    # pair alternating tokens of y as '(tag,value)' groups (not used below)
    line = ''
    for i in range(len(y)):
        if i % 2 == 0:
            line += '(' + y[i] + ','
        else:
            line += y[i] + '),'
    return line[:-1]
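
# Illustration (hypothetical values): with head = ['catalog', 'product', 'arrived-on'] and
# x = [('catalog', '1'), ('product', '1'), ('arrived-on', "'today'")],
# makeUpArr(x, head) returns "1,1,'today'"; tags absent from x become empty fields.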

rdd_arr, rdd_cat = (myrdd.filter(f) for f in (arr , cat))
import pyspark.sql.functions as F
from pyspark.sql.functions import concat, col, lit
# ---- product-level table ----
# collect the distinct tags to serve as the header
rdd_arrHead = rdd_arr.map(lambda x: [i.split('~')[0] for i in x.split('/') if i != 'root']).flatMap(lambda x: x)
header_arr = rdd_arrHead.distinct().collect()
# turn each line into (tag, value) pairs and lay them out under the header
rdd_arrData = rdd_arr.map(lambda x: [(i.split('~')[0], i.split('~')[1]) for i in x.split('/') if i != 'root']).map(lambda x: makeUpArr(x, header_arr)).map(lambda x: x.split(','))
dfArr = sqlContext.createDataFrame(rdd_arrData, header_arr)
# ---- catalog-item table ----
rdd_catHead = rdd_cat.map(lambda x: [i.split('~')[0] for i in x.split('/') if i != 'root']).flatMap(lambda x: x)
header_cat = rdd_catHead.distinct().collect()
rdd_catData = rdd_cat.map(lambda x: [(i.split('~')[0], i.split('~')[1]) for i in x.split('/') if i != 'root']).map(lambda x: makeUpArr(x, header_cat)).map(lambda x: x.split(','))
dfData = sqlContext.createDataFrame(rdd_catData, header_cat)
# each input line yields one sparse row, so max() collapses a group into one filled row
# the hand-written version:
# dfArr.groupBy(['catalog', 'product']).agg(F.max(col('arrived-on')).alias('arrived-on'), F.max(col('arrived-for')).alias('arrived-for')).show()
>>> dfArr.groupBy(['catalog', 'product']).agg(*[F.max(dfArr[i]) for i in header_arr if i not in ('catalog', 'product')]).show()
+-------+-------+-----------+-----------+
|catalog|product| arrived-on|arrived-for|
+-------+-------+-----------+-----------+
|      1|      1|    'today'|      'gha'|
|      2|      2|    'today'|      'jaf'|
|      1|      2|'yesterday'|      'gha'|
|      2|      1|   'Monday'|      'raf'|
+-------+-------+-----------+-----------+
# the hand-written version:
# dfData.groupBy(['catalog', 'product', 'catalog-item']).agg(F.max(col('name')).alias('name'), F.max(col('price')).alias('price'), F.max(col('extratag')).alias('extratag'), F.max(col('Anytag')).alias('Anytag')).show()
>>> dfData.groupBy(['catalog', 'product', 'catalog-item']).agg(*[F.max(dfData[i]) for i in header_cat if i not in ('catalog', 'product', 'catalog-item')]).show()
+-------+-------+------------+------+--------+--------------+--------+
|catalog|product|catalog-item|  name|   price|      extratag|  Anytag|
+-------+-------+------------+------+--------+--------------+--------+
|      1|      1|           3|   'B'|   '100'|              |        |
|      2|      2|           1|   'A'| '39.95'|              |        |
|      1|      1|           1|   'A'| '39.95'|              |        |
|      2|      1|           3|   'B'|     'D'|              |        |
|      1|      2|           1|  'AA'|'399.95'|              |        |
|      1|      2|           2|  'CC'|'939.95'|              |        |
|      2|      1|           2|'CCCC'| '49.95'|              |'anytag'|
|      1|      1|           2|   'C'| '49.95'|              |        |
|      1|      2|           3|  'AB'|   '239'|              |        |
|      2|      1|           1| 'AAA'|    '70'|              |        |
|      2|      2|           2|   'C'| '39.95'|'only on this'|        |
+-------+-------+------------+------+--------+--------------+--------+
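
If the surrounding single quotes in these results are unwanted, one option (a sketch, assuming no value contains an embedded quote) is to strip them in the initial cleanup map:

myrdd = sc.textFile("data/xpath.txt").map(lambda x: x.replace('1=', '').replace("'", ''))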
