I have a PySpark DataFrame like this:
data = [{"ID": 1, "Value": 3478},
        {"ID": 2, "Value": 10},
        {"ID": 3, "Value": 3323},
        {"ID": 1, "Value": 2300},
        {"ID": 2, "Value": 40},
        {"ID": 3, "Value": 93},
        {"ID": 1, "Value": 500},
        {"ID": 2, "Value": 50},
        {"ID": 3, "Value": 73}]
df = spark.createDataFrame(data)
df.show(20, False)
# +---+-----+
# |ID |Value|
# +---+-----+
# |1 |3478 |
# |2 |10 |
# |3 |3323 |
# |1 |2300 |
# |2 |40 |
# |3 |93 |
# |1 |500 |
# |2 |50 |
# |3 |73 |
# +---+-----+
I have another Spark DataFrame, like this:
final_data = [{"ID": 1, "Value": 1234563478},
              {"ID": 2, "Value": 2134510},
              {"ID": 3, "Value": 789033323}]
final_df = spark.createDataFrame(final_data)
final_df.show(5, False)
# +---+----------+
# |ID |Value |
# +---+----------+
# |1 |1234563478|
# |2 |2134510 |
# |3 |789033323 |
# +---+----------+
Now my requirement is to filter final_df based on the values in df.
from pyspark.sql import functions as F
final_df.filter(F.col("ID") == "1").where(F.col("Value").like('%3478%') | F.col("Value").like('%2300%') | F.col("Value").like('%500%')).show(5, False)
# +---+----------+
# |ID |Value |
# +---+----------+
# |1 |1234563478|
# +---+----------+
Here I'm manually passing the df values into the like conditions. Is there a dynamic way to pull the df values when filtering final_df? Today I have only 3 values matching ID == 1; tomorrow I might have 10. How do I set up the like conditions dynamically on a PySpark DataFrame?
Not sure what you really expect. If you're after declaring variables and picking them up dynamically, let's try a list and an f-string. Let me know if this isn't what you wanted.
s = '3478'
s1 = '2300'
s2 = '500'
lst=['1']
from pyspark.sql import functions as F
final_df.filter(F.col("ID").isin(lst)).where(F.col("Value").like(f'%{s}%') | F.col("Value").like(f'%{s1}%') | F.col("Value").like(f'%{s2}%')).show(5, False)
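To avoid one variable per value entirely, you can build the patterns from a single Python list, so tomorrow's 10 values need no code changes. A minimal sketch (the list `vals` is an assumption standing in for values pulled from df, e.g. via `collect`; the Spark part is shown in comments):

```python
# Assumption: `vals` holds the Values for one ID, e.g. collected from df:
#   vals = [str(r['Value']) for r in df.filter("ID = 1").collect()]
vals = ['3478', '2300', '500']

# Build the LIKE patterns dynamically from the list.
patterns = [f"%{v}%" for v in vals]
print(patterns)

# With PySpark Columns this becomes (not executed here):
#   from functools import reduce
#   from pyspark.sql import functions as F
#   cond = reduce(lambda a, b: a | b,
#                 [F.col("Value").like(p) for p in patterns])
#   final_df.filter(F.col("ID") == 1).where(cond).show(5, False)
```

Folding the list of Column conditions with `reduce` and `|` gives the same OR-chain as writing them out by hand, for any number of values.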
You may need some Python to build a filter with many dynamic conditions. I assume your df is relatively small, so I use collect:
filter_list = df.groupBy('ID').agg(F.collect_set('Value').alias('vals')).collect()
row_filter = []
for r in filter_list:
    s = f"ID={r['ID']} AND "
    s += '(' + ' OR '.join([f"Value LIKE '%{v}%'" for v in r['vals']]) + ')'
    row_filter.append(s)
final_df.filter(f"({') OR ('.join(row_filter)})").show()
I think the code below should work in Python 2.7:
filter_list = df.groupBy('ID').agg(F.collect_set('Value').alias('vals')).collect()
row_filter = []
for r in filter_list:
    s = "ID=" + str(r['ID']) + " AND "
    s += '(' + ' OR '.join(["Value LIKE '%" + str(v) + "%'" for v in r['vals']]) + ')'
    row_filter.append(s)
final_df.filter("(" + ") OR (".join(row_filter) + ")").show()