读取pyspark数据帧时筛选雪花表的行



我有一个巨大的雪花表。我想在pyspark中对表进行一些转换。我的雪花表有一个名为"快照"的列。我只想读取pyspark数据帧中的当前快照数据,并对过滤后的数据进行转换。

那么,有没有一种方法可以在读取spark数据帧中的雪花表时应用过滤行(我不想读取内存中的整个雪花表,因为它效率不高(,或者我需要读取整个雪花表(在spark数据框架中(,然后应用过滤来获取最新快照,如下所示?

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
snowflake_database="********"
snowflake_schema="********"
source_table_name="********"
snowflake_options = {
"sfUrl": "********",
"sfUser": "********",
"sfPassword": "********",
"sfDatabase": snowflake_database,
"sfSchema": snowflake_schema,
"sfWarehouse": "COMPUTE_WH"
}
df = spark.read 
.format(SNOWFLAKE_SOURCE_NAME) 
.options(**snowflake_options) 
.option("dbtable",snowflake_database+"."+snowflake_schema+"."+source_table_name) 
.load()
df = df.where(df.snapshot == current_timestamp()).collect()

有些形式的过滤器(过滤器其中Spark DataFrame的功能(Spark不会传递到Spark Snowflake连接器。这意味着,在某些情况下,您可能会获得比预期更多的记录。

最安全的方法是直接使用SQL查询:

df = spark.read 
.format(SNOWFLAKE_SOURCE_NAME) 
.options(**snowflake_options) 
.option("query","SELECT X,Y,Z FROM TABLE1 WHERE SNAPSHOT==CURRENT_TIMESTAMP()") 
.load()

当然,如果您想使用Spark DataFrame的filter/where功能,请检查Snowflake UI中的查询历史记录,看看生成的查询是否应用了正确的筛选器。

最新更新