My data source is a reporting tool. It only outputs reports as crosstabs, so there are row groups. When I export the report to CSV, it looks like this, with row groups on the ID column:
ID,ItemCode,Cost,Quantity
50002242,IT_01,23.00,45
,IL_42,34.00,60
,IM_65,15.00,15
,IS_23,2.00,200
50009999,IT_02,33.00,1
,IG_76,31.00,20
,IP_65,53.00,43
,IA_28,23.00,14
I need to import this into Spark and use Python to transform it into a standard table. In this example, that means making sure the ID appears on every record, so it looks like this:
ID,ItemCode,Cost,Quantity
50002242,IT_01,23.00,45
50002242,IL_42,34.00,60
50002242,IM_65,15.00,15
50002242,IS_23,2.00,200
50009999,IT_02,33.00,1
50009999,IG_76,31.00,20
50009999,IP_65,53.00,43
50009999,IA_28,23.00,14
Any expertise on how to accomplish this in PySpark/Python is appreciated. Thank you!
Assumption: at most 3 consecutive rows are missing the ID between IDs. Since this is some kind of table with row groups, I guess that may be the case:
from pyspark.sql import functions as F
from pyspark.sql import Window as W

(df
# create a fake ID so we can preserve the original file order for the window
.withColumn('fid', F.monotonically_increasing_id())
# get the previous row's ID (lag 1)
.withColumn('ID_1', F.lag('ID').over(W.orderBy('fid')))
# get the ID from 2 rows back (lag of ID_1)
.withColumn('ID_2', F.lag('ID_1').over(W.orderBy('fid')))
# get the ID from 3 rows back (lag of ID_2)
.withColumn('ID_3', F.lag('ID_2').over(W.orderBy('fid')))
# take the first non-null ID among the current row and the 3 rows before it
.withColumn('ID', F.coalesce('ID', 'ID_1', 'ID_2', 'ID_3'))
# drop temp columns
.drop('fid', 'ID_1', 'ID_2', 'ID_3')
.show()
)
# Output
# +--------+--------+-----+--------+
# | ID|ItemCode| Cost|Quantity|
# +--------+--------+-----+--------+
# |50002242| IT_01|23.00| 45|
# |50002242| IL_42|34.00| 60|
# |50002242| IM_65|15.00| 15|
# |50002242| IS_23| 2.00| 200|
# |50009999| IT_02|33.00| 1|
# |50009999| IG_76|31.00| 20|
# |50009999| IP_65|53.00| 43|
# |50009999| IA_28|23.00| 14|
# +--------+--------+-----+--------+
As I stated in my earlier comment, this looks like CSV format except for the occasionally missing column which, when missing, has an implied value of the previous actual column value. So I created a file containing this data:-
ID,ItemCode,Cost,Quantity
50002242,IT_01,23.00,45
,IL_42,34.00,60
,IM_65,15.00,15
,IS_23,2.00,200
50009999,IT_02,33.00,1
,IG_76,31.00,20
,IP_65,53.00,43
,IA_28,23.00,14
Then the following code:-
CSV = []
prev = None
with open('/Users/andy/ct.txt') as ct:
    for line in ct:
        t = line.strip().split(',')
        if t[0] == '':
            t[0] = prev
        else:
            prev = t[0]
        CSV.append(','.join(t))
print(CSV)
…will build a list of strings that can be treated as a proper CSV.
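The same forward-fill can be packaged as a small reusable function (a sketch; the `fill_ids` name is mine) that works on any iterable of lines, so its output can be written back out as a clean CSV or fed into Spark:

```python
def fill_ids(lines):
    """Yield CSV lines with a blank first field replaced by the last seen value."""
    prev = None
    for line in lines:
        t = line.strip().split(',')
        if t[0] == '':
            t[0] = prev  # inherit the ID from the previous row
        else:
            prev = t[0]
        yield ','.join(t)

# sample lines from the question (abbreviated)
raw = [
    'ID,ItemCode,Cost,Quantity',
    '50002242,IT_01,23.00,45',
    ',IL_42,34.00,60',
    '50009999,IT_02,33.00,1',
    ',IG_76,31.00,20',
]
print(list(fill_ids(raw)))
```

The header passes through unchanged because its first field ('ID') is non-blank; each blank first field after that inherits the most recent ID.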