我有以下CSV(示例)
id timestamp routeid creationdate parameters
1000 21-11-2016 22:55 14 21-11-2016 22:55 RSRP=-102,
1002 21-11-2016 22:55 14 21-11-2016 22:55 RA Req. SN=-146,TPC=4,RX Antennas=-8,
1003 21-11-2016 22:55 14 21-11-2016 22:55 RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,
基本上我想将参数从一列分成多列,如下所示:
id , timestamp, routeid, creationdate, RSRP ,RA REQ. SN, TPC,RX Antennas,MCS
因此,如果没有任何参数的值,我会将值设置为 NULL,例如:
1000 21-11-2016 22:55 14 21-11-2016 22:55 -102 NULL NULL NULL NULL
如果值存在,请填写行,
这是我尝试过的:
from pyspark import SparkContext
import os
import sys
from pyspark.sql import SQLContext
import itertools
import re
sc = SparkContext("local","Work")
sqlContext = SQLContext(sc)
df1 = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('file:///sample.csv')
def aaa(a):
aa = a.split(',', 15000)
filtered = filter(lambda p: not re.match(r'^s*$', p), aa)
listWithNoEmptyLines = [z for z in filtered if z != []]
for x in listWithNoEmptyLines:
ab = x.split("=")
AllList = []
rsrp = ""
ra_req_sn = ""
tpc = ""
rx_antenas = ""
mcs = ""
if 'RSRP' in ab:
rsrp = ab[1]
else:
rsrp = "NULL"
if 'RA Req. SN' in ab:
ra_req_sn = ab[1]
else:
ra_req_sn = "NULL"
if 'TPC' in ab:
tpc = ab[1]
else:
tpc = "NULL"
if 'RX Antennas' in ab:
rx_antenas = ab[1]
else:
rx_antenas = "NULL"
if 'MCS' in ab:
mcs = ab[1]
else:
mcs = "NULL"
return rsrp,ra_req_sn,tpc,rx_antenas
DFtoRDD = df1.rdd.map(list).map(lambda x: [str(x[1]), str(x[2]), str(x[3]), aaa(str(x[4]))])
print DFtoRDD.collect()
给我以下结果,
[['1000','21-11-2016 22:55', '14', '21-11-2016 22:55', ('-102', 'NULL', 'NULL', 'NULL')], ['1002',21-11-2016 22:55', '14', '21-11-2016 22:55', ('NULL', '-146', 'NULL', 'NULL')], ['1003','21-11-2016 22:55', '14', '21-11-2016 22:55', ('NULL', '134', 'NULL', 'NULL')]]
预期成果 :
id timestamp routeid creationdate RSRP RA Req. SN TPC RX Antennas MCS
1000 21-11-2016 22:55 14 21-11-2016 22:55 -102 NULL NULL NULL NULL
1002 21-11-2016 22:55 14 21-11-2016 22:55 NULL -146 4 -8 NULL
1003 21-11-2016 22:55 14 21-11-2016 22:55 NULL 134 -191 -91 -83
您需要按如下方式定义 udf,然后选择每个字段。我使用了与制表符分隔符相同的数据。
from pyspark.sql.functions import udf
from pyspark.sql.types import *
df1 = spark.read.format('com.databricks.spark.csv').options(header='true',delimiter='t').load('./sample.txt')
df1.show()
# +----+----------------+-------+----------------+--------------------+
# | id| timestamp|routeid| creationdate| parameters|
# +----+----------------+-------+----------------+--------------------+
# |1000|21-11-2016 22:55| 14|21-11-2016 22:55| RSRP=-102,|
# |1002|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=-146,T...|
# |1003|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=134,RX...|
# +----+----------------+-------+----------------+--------------------+
现在让我们定义我们的 UDF,如上所述:
import re
def f_(s):
pattern = re.compile("([^,=]+)=([0-9-]+)")
return dict(pattern.findall(s or ""))
我们可以直接在"简单"样本上测试函数:
f_("RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,")
# {'RA Req. SN': '134', 'RX Antennas': '-91', 'TPC': '-191', 'MCS': '-83'}
好的,它正在工作。我们现在可以注册以在 SQL 中使用:
spark.udf.register("f", f_, MapType(StringType(), StringType()))
spark.sql("SELECT f('RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,')").show()
# +---------------------------------------------------+
# |f(RA Req. SN=134,RX Antennas=-91,MCS=-83,TPC=-191,)|
# +---------------------------------------------------+
# | Map(RA Req. SN ->...|
# +---------------------------------------------------+
但是在您的情况下,我认为您会对每个字段的实际udf感兴趣:
extract = udf(f_, MapType(StringType(), StringType()))
df1.select(df1['*'], extract(df1['parameters']).getItem('RSRP').alias('RSRP')).show()
# +----+----------------+-------+----------------+--------------------+----+
# | id| timestamp|routeid| creationdate| parameters|RSRP|
# +----+----------------+-------+----------------+--------------------+----+
# |1000|21-11-2016 22:55| 14|21-11-2016 22:55| RSRP=-102,|-102|
# |1002|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=-146,T...|null|
# |1003|21-11-2016 22:55| 14|21-11-2016 22:55|RA Req. SN=134,RX...|null|
# +----+----------------+-------+----------------+--------------------+----+