我正在尝试使用 Snowpark 将 Snowflake 中的一些季度数据重新采样为每日数据。我在 PySpark 中有一些可以实现这一点的代码;然而,函数 explode() 在 Snowpark 中似乎不受支持。
# define function to create date range
def date_range(t1, t2, step=60*60*24):
    """Return equally spaced points starting at t1 and never going past t2.

    Args:
        t1: First point of the grid (e.g. an epoch value in seconds).
        t2: Upper bound of the grid; it is part of the result only when it
            falls exactly on a grid point.
        step: Distance between consecutive points. The default is the
            number of seconds in one day.

    Returns:
        The list ``[t1, t1 + step, t1 + 2 * step, ...]``. The list is empty
        when t2 cannot be reached from t1 by moving in the direction of
        step.

    Raises:
        ZeroDivisionError: If step is 0.
    """
    span = t2 - t1
    if isinstance(span, int) and isinstance(step, int):
        # Floor division is exact for integers. Float true division can
        # round the quotient up and produce a point beyond t2.
        count = span // step + 1
    else:
        quotient = span / step
        # int() truncates toward zero, so a negative quotient must be
        # rejected explicitly instead of being turned into one point.
        count = int(quotient) + 1 if quotient >= 0 else 0
    return [t1 + step * x for x in range(count)]
def resample(df, date_column='REPORTING_DATE', groupby='ID'):
    """Resample every group of ``df`` onto a daily grid and forward-fill it.

    For each group the grid starts at the group's earliest ``date_column``
    value and ends at the current timestamp. The original rows are
    left-joined onto that grid and every column is forward-filled inside
    its group.

    Args:
        df: Input PySpark DataFrame containing ``date_column`` and the
            ``groupby`` column(s).
        date_column: Name of the date/timestamp column to resample on.
        groupby: Column name, or sequence of column names, identifying a
            group.

    Returns:
        A DataFrame with one row per group and day, where ``date_column``
        is formatted as ``yyyy-MM-dd`` and the remaining columns carry the
        last non-null value seen so far within the group.
    """
    # Accept either a single column name or several of them.
    group_cols = [groupby] if isinstance(groupby, str) else list(groupby)

    # explode() needs an array column; a udf declared without a return type
    # produces strings, so the element type is stated explicitly.
    date_range_udf = udf(date_range, T.ArrayType(T.LongType()))

    # Earliest date per group and "now", both cast to integer epochs. The
    # group key is kept so the grid can be joined back per group.
    df_base = (
        df.groupBy(*group_cols)
        .agg(F.min(date_column).cast('integer').alias('epoch_min'))
        .select(
            *group_cols,
            'epoch_min',
            F.current_timestamp().cast('integer').alias('epoch_max'),
        )
    )

    # Generate the time grid and turn it into one row per grid point.
    df_base = (
        df_base
        .withColumn(date_column, F.explode(date_range_udf("epoch_min", "epoch_max")))
        .drop('epoch_min', 'epoch_max')
    )

    # Convert the epoch values back to a formatted date.
    df_base = df_base.withColumn(
        date_column,
        F.date_format(df_base[date_column].cast(dataType=T.TimestampType()), 'yyyy-MM-dd'),
    ).orderBy(date_column, ascending=True)

    # Left join on group and date so that grid rows without data stay null.
    # NOTE(review): the grid date is a formatted string here; confirm it
    # compares as intended with the type of date_column in df.
    df = df_base.join(df, group_cols + [date_column], 'leftouter')

    # Window covering all rows of the group up to the current one.
    window = (
        Window.orderBy(date_column)
        .partitionBy(*group_cols)
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )

    # Forward-fill all columns in one projection; repeated withColumn calls
    # in a loop grow the query plan with every iteration.
    return df.select(
        [F.last(column, ignorenulls=True).over(window).alias(column) for column in df.columns]
    )
有人能提出一个替代方案,或者提供一段示例代码来帮助我吗?谢谢!
我创建了一个小示例:
# coding=utf-8
from snowflake.snowpark import Session
from snowflake.snowpark.functions import udf, col, month, sql_expr
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import StringType, IntegerType, StructType, StructField, ArrayType
from snowflake.snowpark import DataFrame
import os
# Snowflake connection settings are read from the environment. Every
# variable is required except SNOW_SCHEMA, which becomes None when unset.
connection_parameters = {
    "account": os.environ["SNOW_ACCOUNT"],
    "user": os.environ["SNOW_USER"],
    "password": os.environ["SNOW_PASSWORD"],
    "role": os.environ["SNOW_ROLE"],
    "warehouse": os.environ["SNOW_WAREHOUSE"],
    "database": os.environ["SNOW_DATABASE"],
    "schema": os.getenv("SNOW_SCHEMA"),
}

# Open the Snowpark session used by the rest of the script.
session = (
    Session.builder
    .configs(connection_parameters)
    .create()
)
# define function to create date range
def date_range(t1, t2, step=60*60*24):
    """Return equally spaced points starting at t1 and never going past t2.

    Args:
        t1: First point of the grid (e.g. an epoch value in seconds).
        t2: Upper bound of the grid; it is part of the result only when it
            falls exactly on a grid point.
        step: Distance between consecutive points. The default is the
            number of seconds in one day.

    Returns:
        The list ``[t1, t1 + step, t1 + 2 * step, ...]``. The list is empty
        when t2 cannot be reached from t1 by moving in the direction of
        step.

    Raises:
        ZeroDivisionError: If step is 0.
    """
    span = t2 - t1
    if isinstance(span, int) and isinstance(step, int):
        # Floor division is exact for integers. Float true division can
        # round the quotient up and produce a point beyond t2.
        count = span // step + 1
    else:
        quotient = span / step
        # int() truncates toward zero, so a negative quotient must be
        # rejected explicitly instead of being turned into one point.
        count = int(quotient) + 1 if quotient >= 0 else 0
    return [t1 + step * x for x in range(count)]
# Two sample rows of (epoch_min, epoch_max) pairs.
data = [
    (1664627688, 1664973288),
    (1641040488, 1664973288),
]

# Both columns are nullable integers.
schema = StructType(
    [StructField(field_name, IntegerType(), True) for field_name in ("epoch_min", "epoch_max")]
)
df = session.createDataFrame(data, schema).toDF("epoch_min", "epoch_max")

# Register date_range as a UDF taking two integers and returning an array.
date_range_udf = udf(
    date_range,
    input_types=[IntegerType(), IntegerType()],
    return_type=ArrayType(),
)

# Preview the generated array for each row before flattening it.
df.withColumn(
    'REPORTING_DATE',
    date_range_udf(col("epoch_min"), col("epoch_max")),
).show()
# Adding a new method
def withColumnExplode(self, colname, expr):
    """Expand an array expression into one row per element.

    Args:
        self: The DataFrame whose rows are expanded.
        colname: Name given to the column that holds the single elements.
        expr: Column expression that produces the array to flatten.

    Returns:
        The result of joining ``self`` with the ``flatten`` table function
        applied to ``expr``, without the helper columns of the table
        function and with its ``VALUE`` column renamed to ``colname``.
    """
    # Use the caller's expression; it was previously ignored in favour of a
    # hard-coded UDF call on epoch_min/epoch_max.
    flattened = self.join_table_function('flatten', expr)
    # Only VALUE is of interest; the other flatten output columns are dropped.
    return flattened.drop(["SEQ", "KEY", "PATH", "INDEX", "THIS"]).rename("VALUE", colname)
# Attach the helper to DataFrame so it can be chained like a regular method.
DataFrame.withColumnExplode = withColumnExplode

# The parentheses make the multi-line method chain a single expression; a
# line starting with ".drop" on its own is a syntax error.
(
    df.withColumnExplode('REPORTING_DATE', date_range_udf(col("epoch_min"), col("epoch_max")))
    .drop('epoch_min', 'epoch_max')
    .show(20)
)
这是你想要的吗?