Is using cache_result essentially the same as writing to a temporary table?



Update 2022-08-31

While the accepted answer does confirm that the behavior is similar, there is one blind spot: the cache_result implementation does not overwrite the previous table, which means additional storage is consumed every time a new result is cached. In the example below you can see that a fresh temporary table is created on each call. Hopefully Snowflake will change this. I am currently on version 0.8.0.

>>> dff = session.create_dataframe([1,2,3])
>>> dff.cache_result().explain()
---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT  *  FROM (SNOWPARK_TEMP_TABLE_X3FCJ1U38A)
...
--------------------------------------------
>>> dff.cache_result().explain()
---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT  *  FROM (SNOWPARK_TEMP_TABLE_Z9H68STVDH)
....
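One possible workaround for the storage growth shown above is to cache into a temporary table whose name you control, using mode('overwrite') so that each re-cache replaces the previous snapshot instead of leaving a new SNOWPARK_TEMP_TABLE_* behind. This is only a sketch built from the same save_as_table call used in the question below; cache_overwrite and the name argument are hypothetical, not part of Snowpark's API:

```python
# Hypothetical helper (not part of Snowpark): cache a DataFrame into a
# *named* temporary table with mode('overwrite'), so re-caching replaces
# the previous snapshot rather than creating another temp table.
def cache_overwrite(session, df, name):
    df.write.mode("overwrite").save_as_table(name, table_type="temporary")
    return session.table(name)

# Usage (assuming an open Snowpark session):
#   df = session.sql("select * from foo")
#   cached = cache_overwrite(session, df, "MY_CACHE")
```

The trade-off is that you must pick the table name yourself, whereas cache_result generates one for you.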

Original question

In Snowpark ML, is the cache_result() method of the Snowpark DataFrame class essentially the same as writing to a temporary table? It certainly looks that way, and it would save a lot of keystrokes, but I want to be sure, and to understand any potential differences in credit/resource usage.

In other words, is there any difference between the following two approaches?

sql = '''
select * from foo
'''
# Approach 1: saving to temporary table and then assigning the table
session.sql(sql).write.mode('overwrite').save_as_table('my_tbl', table_type='temporary')
df = session.table('my_tbl')
# Approach 2: using Cache result
df = session.sql(sql).cache_result()

You are right. Caching in Snowpark has a different meaning than in Spark: df.cache_result() writes to a temporary table. As the documentation states:

https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/_autosummary/snowflake.snowpark.html#snowflake.snowpark.DataFrame.cache_result

Returns a DataFrame object that holds the cached result in a temporary table.

You can also verify this by calling df.cache_result().explain(), or by looking at the Python API source code:

@df_collect_api_telemetry
def cache_result(
    self, *, statement_params: Optional[Dict[str, str]] = None
) -> "DataFrame":
    """Caches the content of this DataFrame to create a new cached DataFrame.

    All subsequent operations on the returned cached DataFrame are performed on the cached data
    and have no effect on the original DataFrame.

    Examples::

        >>> create_result = session.sql("create temp table RESULT (NUM int)").collect()
        >>> insert_result = session.sql("insert into RESULT values(1),(2)").collect()
        >>> df = session.table("RESULT")
        >>> df.collect()
        [Row(NUM=1), Row(NUM=2)]
        >>> # Run cache_result and then insert into the original table to see
        >>> # that the cached result is not affected
        >>> df1 = df.cache_result()
        >>> insert_again_result = session.sql("insert into RESULT values (3)").collect()
        >>> df1.collect()
        [Row(NUM=1), Row(NUM=2)]
        >>> df.collect()
        [Row(NUM=1), Row(NUM=2), Row(NUM=3)]
        >>> # You can run cache_result on a result that has already been cached
        >>> df2 = df1.cache_result()
        >>> df2.collect()
        [Row(NUM=1), Row(NUM=2)]
        >>> df3 = df.cache_result()
        >>> # Drop RESULT and see that the cached results still exist
        >>> drop_table_result = session.sql(f"drop table RESULT").collect()
        >>> df1.collect()
        [Row(NUM=1), Row(NUM=2)]
        >>> df2.collect()
        [Row(NUM=1), Row(NUM=2)]
        >>> df3.collect()
        [Row(NUM=1), Row(NUM=2), Row(NUM=3)]

    Args:
        statement_params: Dictionary of statement level parameters to be set while executing this action.

    Returns:
        A :class:`DataFrame` object that holds the cached result in a temporary table.
        All operations on this new DataFrame have no effect on the original.
    """
    temp_table_name = random_name_for_temp_object(TempObjectType.TABLE)
    create_temp_table = self._session._plan_builder.create_temp_table(
        temp_table_name, self._plan
    )
    self._session._conn.execute(
        create_temp_table,
        _statement_params=create_or_update_statement_params_with_query_tag(
            statement_params, self._session.query_tag, SKIP_LEVELS_TWO
        ),
    )
    new_plan = self._session.table(temp_table_name)._plan
    return DataFrame(session=self._session, plan=new_plan, is_cached=True)

def create_temp_table(self, name: str, child: SnowflakePlan) -> SnowflakePlan:
    return self.build_from_multiple_queries(
        lambda x: self.create_table_and_insert(
            self.session, name, child.schema_query, x
        ),
        child,
        None,
        child.schema_query,
        is_ddl_on_temp_object=True,
    )
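The random_name_for_temp_object call in the source above is also why the explain() output in the update shows a different SNOWPARK_TEMP_TABLE_* name on every call. A rough sketch of that behavior (an assumption about the shape of the output, not Snowpark's actual implementation):

```python
import random
import string

# Rough sketch of Snowpark's temp-name generation (assumption: the real
# random_name_for_temp_object differs in detail, but the effect is the
# same: every cache_result() call targets a brand-new table).
def random_temp_table_name(prefix="SNOWPARK_TEMP_TABLE_"):
    suffix = "".join(random.choices(string.ascii_uppercase + string.digits, k=10))
    return prefix + suffix

a = random_temp_table_name()
b = random_temp_table_name()
# Two calls, two distinct names -- matching the update at the top, where
# re-caching produced SNOWPARK_TEMP_TABLE_X3FCJ1U38A and then _Z9H68STVDH.
```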

So I would say there is no difference between calling df.cache_result() and creating a temporary table on your own.
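The snapshot semantics in the docstring above can be reproduced outside Snowflake. Here is a minimal sqlite3 sketch of the same idea: "caching a result" is just materializing the query into a temp table, after which reads hit the snapshot rather than the live source table.

```python
import sqlite3

# Minimal sketch with sqlite3, mirroring the RESULT example in the docstring.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE result (num INTEGER)")
conn.executemany("INSERT INTO result VALUES (?)", [(1,), (2,)])

# The cache_result analogue: snapshot the current rows into a temp table.
conn.execute("CREATE TEMP TABLE cached AS SELECT num FROM result")

# Mutating the source afterwards does not affect the cached snapshot.
conn.execute("INSERT INTO result VALUES (3)")

cached_rows = [r[0] for r in conn.execute("SELECT num FROM cached ORDER BY num")]
source_rows = [r[0] for r in conn.execute("SELECT num FROM result ORDER BY num")]
# cached_rows == [1, 2]; source_rows == [1, 2, 3]
```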
