Spark提供了几种不同的方法来实现使用和返回Pandas DataFrames的UDF。我目前使用的是cogrouped版本,它以两个(共同分组的(Pandas DataFrames作为输入,并返回第三个。
为了在Spark DataFrames和Pandas DataFrames之间进行高效转换,Spark使用Apache Arrow内存布局,但从Arrow到Pandas再返回仍然需要转换。我真的很想直接访问Arrow数据,因为这就是我最终处理UDF中数据的方式(使用Polars(。
从Spark开始似乎很浪费->箭头->熊猫->箭头(Polars(在进入的路上,反向在返回。
import pyarrow as pa
import polars as pl
sql_context = SQLContext(spark)
data = [('James',[1, 2]),]
spark_df = sql_context.createDataFrame(data=data, schema = ["name","properties"])
df = pl.from_arrow(pa.Table.from_batches(spark_df._collect_as_arrow()))
print(df)
shape: (1, 2)
┌───────┬────────────┐
│ name ┆ properties │
│ --- ┆ --- │
│ str ┆ list[i64] │
╞═══════╪════════════╡
│ James ┆ [1, 2] │
└───────┴────────────┘