PySpark动态类操作



我有一个这样的PySpark数据框架:

data = [{"ID": 1, "Value": 3478},
{"ID": 2, "Value": 10},
{"ID": 3, "Value": 3323},
{"ID": 1, "Value": 2300},
{"ID": 2, "Value": 40},
{"ID": 3, "Value": 93},
{"ID": 1, "Value": 500},
{"ID": 2, "Value": 50},
{"ID": 3, "Value": 73}]
df = spark.createDataFrame(data)
df.show(20, False)
# +---+-----+                                                                     
# |ID |Value|
# +---+-----+
# |1  |3478 |
# |2  |10   |
# |3  |3323 |
# |1  |2300 |
# |2  |40   |
# |3  |93   |
# |1  |500  |
# |2  |50   |
# |3  |73   |
# +---+-----+

我有另一个Spark数据框架,像这样:

final_data = [{"ID": 1, "Value": 1234563478},
{"ID": 2, "Value": 2134510},
{"ID": 3, "Value": 789033323}]
final_df = spark.createDataFrame(final_data)
final_df.show(5, False)
# +---+----------+                                                                
# |ID |Value     |
# +---+----------+
# |1  |1234563478|
# |2  |2134510   |
# |3  |789033323 |
# +---+----------+

现在我的要求是根据df值对final_df进行过滤。

from pyspark.sql import functions as F
final_df.filter(F.col("ID") == "1").where(F.col("Value").like('%3478%') | F.col("ID").like('%2300%') | F.col("ID").like('%500%')).show(5, False)
# +---+----------+                                                                
# |ID |Value     |
# +---+----------+
# |1  |1234563478|
# +---+----------+

在这里,我手动将df值作为值传递给like条件。是否有一种动态的方法来调用final_df中的df值。今天我只有3个匹配id == 1的值,明天我可能有10个值。如何在PySpark数据框中动态设置like条件?

不确定你真正期望的是什么。让我们尝试使用列表和fstring,如果你在声明变量并动态选择它们之后。如果不是你想要的,让我知道。

s='3478'
s1 ='2300'
s2 ='500'
lst=['1']
from pyspark.sql import functions as F
final_df.filter(F.col("ID").isin(lst)).where(F.col("Value").like(f'%{s}%') | F.col("ID").like(f'%{s1}%')| F.col("ID").like(f'%{s2}%')).show(5,False)

您可能需要一些python来创建具有许多动态条件的过滤器。我假设你的df相对较小,所以我使用collect

filter_list = df.groupBy('ID').agg(F.collect_set('Value').alias('vals')).collect()
row_filter = []
for r in filter_list:
s = f"ID={r['ID']} AND "
s += '(' + ' OR '.join([f"Value LIKE '%{v}%'" for v in r['vals']]) + ')'
row_filter.append(s)
final_df.filter(f"({') OR ('.join(row_filter)})").show()

我认为下面的代码必须在Python 2.7中工作。

filter_list = df.groupBy('ID').agg(F.collect_set('Value').alias('vals')).collect()
row_filter = []
for r in filter_list:
s = "ID=" + str(r['ID']) + " AND "
s += '(' + ' OR '.join(["Value LIKE '%" + str(v) + "%'" for v in r['vals']]) + ')'
row_filter.append(s)
final_df.filter("(" + ") OR (".join(row_filter) + ")").show()

相关内容

  • 没有找到相关文章

最新更新