所以我有一个关于如何用Python在Spark中编程的问题。请记住,我不是在问你编码它,我是在问如何做它。我太纠结于如何在Spark中做它了。任何帮助或想法都将不胜感激。
1( 从数据库中获取所有行,并创建pyspark.sql.DataFrame-DONE
2(转换所述DataFrame中的行/DONEbr/>3(从DataFrame中仅创建两列[ID,GROUPID]-DONE
广播变量的原因是用于分段/群集
4(循环DataFrame并搜索广播变量,查看此迭代GROUPID是否与任何OTHER ID一起存在。如果未找到记录,则为NULL此迭代GROUPID
示例:广播变量
+-------+---------+
| ID| GROUPID|
+-------+---------+
| 363345| 95124|
| 363356| 95124|
| 363359| 88896|
| 363361| 50012|<===== only one of this groupid in Broadcast variable
| 375362| 62551|
| 363487| 62551|
| 363489| 88896|
+-------+---------+
需要循环DataFrame(300K+行(并将广播变量检查为空GROUPID
+------+---------+-------+----+-------+------------+
| ID|PRODUCTID| ARM|SORT|GROUPID| NAME|
+------+---------+-------+----+-------+------------+
|363345| 523927|5888208| 10| 95124|Enalapril...|
|363356| 523927|5888390| 10| 95124|LISINOPL5...|
|363359| 523927|5888444| 10| 88896|RANTUDEUR...|
|363361| 523927|5888450| 10| 50012|POLYALFA1...|<===== This record GROUPID should be nulled, only one record found in Broadcast variable
|375362| 523927|5888527| 10| 62551|POLAUTFA2...|
|375360| 523927|5894976| 10| null|ENCERACAF...|
|363487| 523927|5905131| 10| 62551|Poly Alco...|
|363488| 523927|5905148| 10| null| Poly...|
|363489| 523927|5905160| 10| 88896|Eapril688...|
|363495| 523927|5909258| 10| null| Eapril77...|
+------+---------+-------+----+-------+------------+
CREATING the DataFrame
df = spark.createDataFrame(
[
(363345, 523927, 5888208, 10, 95124, 'Enalapril...'),
(363356, 523927, 5888390, 10, 95124, 'LISINOPL5...'),
(363359, 523927, 5888444, 10, 88896, 'RANTUDEUR...'),
(363361, 523927, 5888450, 10, 50012, 'POLYALFA1...'),
(375362, 523927, 5888527, 10, 62551, 'POLAUTFA2...'),
(375360, 523927, 5894976, 10, None, 'ENCERACAF...'),
(363487, 523927, 5905131, 10, 62551, 'Poly Alco...'),
(363488, 523927, 5905148, 10, None, 'Poly...'),
(363489, 523927, 5905160, 10, 88896, 'Eapril688...'),
(363495, 523927, 5909258, 10, None, 'Eapril77...')
],
['ID', 'PRODUCTID', 'ARM', 'SORT', 'GROUPID', 'NAME']
)
创建广播变量
ID_GROUPID_Dictionary = {}
for row in df.rdd.collect():
if(row['GROUPID'] != None):
ID_GROUPID_Dictionary[int(row['ID'])] = int(row['GROUPID'])
df_FROMDB_READONLY_BROADCAST = spark.sparkContext.broadcast(ID_GROUPID_Dictionary)
使用广播似乎有些过头了,这里还有很多其他选项可供使用,一个例子可以是通过groupid聚合ByKey(一个数据集函数(您的数据,然后运行平面图检查大小是否大于2。如果小于2,则将值更改为null;如果否,则按原样返回所有值。您可以在sql和dataframe中或者使用RDD来完成这些工作。这取决于你,但你的代码会更干净。对于任何问题,请随时评论我的回答。
顺便说一句,这就是我解决这个问题的方法@伊利亚谢谢你的便条。如果有人看到更好的方法,请告诉我。
GROUPID_countList = df.groupBy(df.GROUPID).count().collect()
replaceGROUPIDUDF = udf(lambda x: None if x is None else GROUPIDCount(x), IntegerType())
def GROUPIDCount(grpid):
for x in GROUPID_countList:
if(x["GROUPID"] == grpid): #DEBUG print("{} - {} : {} ".format(x["GROUPID"], grpid, x["count"] ))
return x["count"]
return 0
df = df.withColumn('GROUPID_null', (when(replaceGROUPIDUDF(df['GROUPID']) < 2, lit(None).cast("string"))
.otherwise(df['GROUPID'])))
.drop('GROUPID')
.withColumnRenamed('GROUPID_null', 'GROUPID')