Spark Python根据决策从Dataframe循环设置字段NULL创建新的Dataframe



所以我有一个关于如何用Python在Spark中编程的问题。请记住,我不是在问你编码它,我是在问如何做它。我太纠结于如何在Spark中做它了。任何帮助或想法都将不胜感激。

1( 从数据库中获取所有行,并创建pyspark.sql.DataFrame-DONE
2(转换所述DataFrame中的行/DONEbr/>3(从DataFrame中仅创建两列[ID,GROUPID]-DONE
广播变量的原因是用于分段/群集
4(循环DataFrame并搜索广播变量,查看此迭代GROUPID是否与任何OTHER ID一起存在。如果未找到记录,则为NULL此迭代GROUPID

示例:广播变量

+-------+---------+
|     ID|  GROUPID|
+-------+---------+
| 363345|    95124|
| 363356|    95124|
| 363359|    88896|
| 363361|    50012|<===== only one of this groupid in Broadcast variable
| 375362|    62551|
| 363487|    62551|
| 363489|    88896|
+-------+---------+


需要循环DataFrame(300K+行(并将广播变量检查为空GROUPID

+------+---------+-------+----+-------+------------+
|    ID|PRODUCTID|    ARM|SORT|GROUPID|        NAME|
+------+---------+-------+----+-------+------------+
|363345|   523927|5888208|  10|  95124|Enalapril...|
|363356|   523927|5888390|  10|  95124|LISINOPL5...|
|363359|   523927|5888444|  10|  88896|RANTUDEUR...|
|363361|   523927|5888450|  10|  50012|POLYALFA1...|<===== This record GROUPID should be nulled, only one record found in Broadcast variable
|375362|   523927|5888527|  10|  62551|POLAUTFA2...|
|375360|   523927|5894976|  10|   null|ENCERACAF...|
|363487|   523927|5905131|  10|  62551|Poly Alco...|
|363488|   523927|5905148|  10|   null|     Poly...|
|363489|   523927|5905160|  10|  88896|Eapril688...|
|363495|   523927|5909258|  10|   null| Eapril77...|
+------+---------+-------+----+-------+------------+


CREATING the DataFrame
df = spark.createDataFrame(
[
(363345, 523927, 5888208, 10, 95124, 'Enalapril...'), 
(363356, 523927, 5888390, 10, 95124, 'LISINOPL5...'), 
(363359, 523927, 5888444, 10, 88896, 'RANTUDEUR...'), 
(363361, 523927, 5888450, 10, 50012, 'POLYALFA1...'), 
(375362, 523927, 5888527, 10, 62551, 'POLAUTFA2...'), 
(375360, 523927, 5894976, 10,  None, 'ENCERACAF...'), 
(363487, 523927, 5905131, 10, 62551, 'Poly Alco...'), 
(363488, 523927, 5905148, 10,  None, 'Poly...'), 
(363489, 523927, 5905160, 10, 88896, 'Eapril688...'), 
(363495, 523927, 5909258, 10,  None, 'Eapril77...')
],
['ID', 'PRODUCTID', 'ARM', 'SORT', 'GROUPID', 'NAME']
)

创建广播变量

ID_GROUPID_Dictionary = {}    
for row in df.rdd.collect():
if(row['GROUPID'] != None):
ID_GROUPID_Dictionary[int(row['ID'])] = int(row['GROUPID']) 
df_FROMDB_READONLY_BROADCAST = spark.sparkContext.broadcast(ID_GROUPID_Dictionary) 

使用广播似乎有些过头了,这里还有很多其他选项可供使用,一个例子可以是通过groupid聚合ByKey(一个数据集函数(您的数据,然后运行平面图检查大小是否大于2。如果小于2,则将值更改为null;如果否,则按原样返回所有值。您可以在sql和dataframe中或者使用RDD来完成这些工作。这取决于你,但你的代码会更干净。对于任何问题,请随时评论我的回答。

顺便说一句,这就是我解决这个问题的方法@伊利亚谢谢你的便条。如果有人看到更好的方法,请告诉我。

GROUPID_countList = df.groupBy(df.GROUPID).count().collect()                   
replaceGROUPIDUDF = udf(lambda x: None if x is None else GROUPIDCount(x), IntegerType())      
def GROUPIDCount(grpid):
for x in GROUPID_countList:            
if(x["GROUPID"] == grpid): #DEBUG print("{} - {} : {} ".format(x["GROUPID"], grpid, x["count"] ))
return x["count"]             
return 0
df = df.withColumn('GROUPID_null', (when(replaceGROUPIDUDF(df['GROUPID']) < 2, lit(None).cast("string"))
.otherwise(df['GROUPID']))) 
.drop('GROUPID') 
.withColumnRenamed('GROUPID_null', 'GROUPID')