How can I create a function that uses a list of strings to iterate over the following? The intent is that a, b, and c represent tables that users upload. The goal is to iterate programmatically no matter how many tables a user uploads. I simply want to pull the count of new rows for each table.
mylist = df.select('S_ID').distinct().rdd.flatMap(lambda x: x).collect()
mylist
>> ['a', 'b', 'c']
##Count new rows by S_ID type
a = df.filter(df.S_ID == 'a').count()
b = df.filter(df.S_ID == 'b').count()
c = df.filter(df.S_ID == 'c').count()
##Count current rows from Snowflake
a_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'a'").load()
a_current = a_current.count()
b_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'b'").load()
b_current = b_current.count()
c_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'c'").load()
c_current = c_current.count()
##Calculate count of new rows
a_new = a - a_current
a_new = str(a_new)
b_new = b - b_current
b_new = str(b_new)
c_new = c - c_current
c_new = str(c_new)
Something like this:
new_counts_list = []
for i in mylist:
    i = df.filter(df.S_ID == 'i').count()
    i_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = 'i'").load()
    i_current = i_current.count()
    i_new = i - i_current
    i_new = str(i_new)
    new_counts_list.append(i)
Where I am stuck is keeping {names: new_counts}.
As it pertains to:
"where I am stuck is keeping {names: new_counts}"
at the end of your for loop you may use new_counts_list[i] = i_new instead of new_counts_list.append(i), assuming you change how new_counts_list is initialized, i.e. initialize it as a dictionary (new_counts_list = {}) rather than a list. You also appear to be hardcoding the literal value 'i', which is a string, instead of using the variable i (i.e. without the quotes) in your proposed solution. The updated solution may look like:
new_counts_list = {}
for i in mylist:
    # use a separate variable for the count so the S_ID in `i` is not overwritten
    i_count = df.filter(df.S_ID == i).count()
    i_current = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query", "select R_ID FROM mytable WHERE S_ID = '{0}'".format(i)).load()
    i_current = i_current.count()
    i_new = i_count - i_current
    i_new = str(i_new)
    new_counts_list[i] = i_new
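Since the original ask was to wrap this in a function, here is a minimal sketch of that, assuming spark, SNOWFLAKE_SOURCE_NAME and sfOptions are in scope exactly as in your snippets (the function name new_row_counts is just illustrative):

def new_row_counts(df, s_ids):
    """Return {S_ID: count of new rows} for each S_ID in s_ids (illustrative sketch)."""
    counts = {}
    for s_id in s_ids:
        # rows for this S_ID in the uploaded dataframe
        incoming = df.filter(df.S_ID == s_id).count()
        # rows already present in Snowflake for this S_ID
        current = (
            spark.read.format(SNOWFLAKE_SOURCE_NAME)
            .options(**sfOptions)
            .option("query", "select R_ID FROM mytable WHERE S_ID = '{0}'".format(s_id))
            .load()
            .count()
        )
        counts[s_id] = incoming - current
    return counts

new_counts = new_row_counts(df, mylist)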
On another note, your approach, i.e. sequentially looping through each S_ID in mylist and running the operations, namely:
- running the action collect to pull every S_ID from the initial dataframe df to the driver node into the list mylist
- separately counting the occurrences of each S_ID in the initial dataframe, then executing another potentially expensive (IO reads / network communication / shuffles) collect()
- creating another dataframe with spark.read.format(SNOWFLAKE_SOURCE_NAME) that loads all records filtered by each S_ID into memory before executing the count
- finding the difference between the initial dataframe and the snowflake source
will work, but it is expensive in IO reads and, depending on your cluster/setup, potentially expensive in network communication and shuffles.
You may consider using a groupBy to reduce the number of times you execute the potentially expensive collect. Furthermore, you could join the initial dataframe to the snowflake source and let spark optimize your operations as a lazy execution plan distributed across your cluster/setup. Moreover, similar to how you are already using a pushdown query for the snowflake source, you can combine all selected S_ID values in that query so that snowflake returns all the desired results in a single read. You would not need a loop. This could look like the following:
Approach 1
In this approach, I will provide a pure spark solution to achieve your desired results.
from pyspark.sql import functions as F
# Ask spark to select only the `S_ID` and group the data but not execute the transformation
my_existing_counts_df = df.select('S_ID').groupBy('S_ID').count()
# Ask spark to select only the `S_ID` counts from the snowflake source
current_counts_df = (
    spark.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("query", "select R_ID, COUNT(1) as cnt FROM mytable GROUP BY R_ID")
    .load()
)
# Join both datasets which will filter to only selected `S_ID`
# and determine the differences between the existing and current counts
results_df = (
    my_existing_counts_df.alias("existing")
    .join(
        current_counts_df.alias("current"),
        F.col("S_ID") == F.col("R_ID"),
        "inner"
    )
    .selectExpr(
        "S_ID",
        "count - cnt as actual_count"
    )
)
# Execute the above transformations with `collect` and
# Convert the dictionary values in the list above to your desired final dictionary
new_counts = {}
for row in results_df.collect():
    new_counts[row['S_ID']] = row['actual_count']
# your desired results are in `new_counts`
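As a side note, the final loop above can also be written as a dict comprehension; this is purely a stylistic alternative:

new_counts = {row['S_ID']: row['actual_count'] for row in results_df.collect()}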
Approach 2
In this approach, I will collect the results of the groupBy and then use them to optimize the pushdown query to the snowflake source so it returns the desired results directly.
my_list_counts = df.select('S_ID').groupBy('S_ID').count()
selected_sids = []
case_expression = ""
for row in my_list_counts.collect():
    selected_sids.append(row['S_ID'])
    case_expression = case_expression + " WHEN R_ID='{0}' THEN {1} ".format(
        row['S_ID'],
        row['count']
    )
# The above has a table with columns `S_ID` and `count` where the
# latter is the number of occurrences of `S_ID` in the dataset `df`
snowflake_push_down_query="""
SELECT
R_ID AS S_ID,
((CASE
{0}
END) - cnt) as actual_count
FROM (
SELECT
R_ID,
COUNT(1) AS cnt
FROM
mytable
WHERE
R_ID IN ('{1}')
GROUP BY
R_ID
) t
""".format(
case_expression,
"','".join(selected_sids)
)
results_df = (
    spark.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(**sfOptions)
    .option("query", snowflake_push_down_query)
    .load()
)
# Execute the above transformations with `collect` and
# Convert the dictionary values in the list above to your desired final dictionary
new_counts = {}
for row in results_df.collect():
    new_counts[row['S_ID']] = row['actual_count']
# your desired results are in `new_counts`
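For illustration, with the sample IDs 'a', 'b', and 'c' from mylist, the generated snowflake_push_down_query would render roughly as shown below; the <count_a>/<count_b>/<count_c> values are placeholders for the per-ID counts taken from df, since the real numbers depend on your data:

-- <count_a>, <count_b>, <count_c> stand in for the row['count'] values from my_list_counts
SELECT
    R_ID AS S_ID,
    ((CASE
        WHEN R_ID='a' THEN <count_a>  WHEN R_ID='b' THEN <count_b>  WHEN R_ID='c' THEN <count_c>
    END) - cnt) as actual_count
FROM (
    SELECT
        R_ID,
        COUNT(1) AS cnt
    FROM
        mytable
    WHERE
        R_ID IN ('a','b','c')
    GROUP BY
        R_ID
) t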
Let me know if this works for you.