如何将Spark数据帧转换为Pandas并返回Kedro



我试图了解在Kedro中,在不创建冗余转换步骤的情况下,将来自一个节点的Spark数据帧转换为另一个节点所需的Pandas的最佳方式是什么。

Kedro目前支持两种策略:

使用代码转换功能

这需要为同一数据集定义两个DataCatalog条目,在catalog.yml:中以通用格式(Parquet、JSON、CSV等(处理同一文件

my_dataframe@spark:
type: kedro.contrib.io.pyspark.SparkDataSet
filepath: data/02_intermediate/data.parquet
my_dataframe@pandas:
type: ParquetLocalDataSet
filepath: data/02_intermediate/data.parquet

然后像这样在管道中使用它们:

Pipeline([
node(my_func1, "spark_input", "my_dataframe@spark"),
node(my_func2, "my_dataframe@pandas", "output"),
])

在这种情况下,kedro理解my_dataframe在两种情况下都是相同的数据集,并正确解析节点执行顺序。同时,kedro将使用SparkDataSet实现进行保存,ParquetLocalDataSet用于加载,因此第一个节点应输出pyspark.sql.DataFrame,而第二个节点将接收pandas.Dataframe

使用Pandas to Spark和Spark to Pandas节点装饰器

注意:Spark <-> Pandas内存转换因其内存需求而臭名昭著,因此只有当已知数据帧很小时,这才是一个可行的选项。

可以根据文档装饰节点:

from spark import get_spark
from kedro.contrib.decorators import pandas_to_spark
@pandas_to_spark(spark_session)
def my_func3(data):
data.show() # data is pyspark.sql.DataFrame

甚至整个管道:

Pipeline([
node(my_func4, "pandas_input", "some_output"),
...
]).decorate(pandas_to_spark)

相关内容

  • 没有找到相关文章

最新更新