Python Spark将事务分组到嵌套模式中



我想用一列"key"将存储在pyspark.sql.dataframe.DataFrame"ddf"中的事务分组,该列指示事务的来源(在本例中为客户ID)。

分组是一个相当昂贵的过程,所以我想以嵌套模式将组写入磁盘:

(key, [[c1, c2, c3,...], ...])

这将使我能够快速加载密钥上的所有事务,并在不重新运行分组的情况下开发复杂的自定义聚合器。

如何创建嵌套模式并将其写入磁盘?

我花了很长时间才弄清楚,尽管答案很简单,所以我想在这里发布我的解决方案。

首先通过key(客户ID)减少所有交易:

from operators import add
# ddf is a dataframe with a transaction in each row.  Key is the column
# we want to group the transactions by.
txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],) ).reduceByKey(add)

这得到了一个看起来像(key, [list of Rows])rdd。要将其写回dataframe,您需要构建模式。事务列表可以通过ArrayType进行建模。

from pyspark.sql import types as sqxt
txn_schema = sqxt.StructType([
    sqxt.StructField('Key', sqxt.StringType()),
    sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema))
])

然后,以这种结构将数据直接写入磁盘:

txnddf = txnrdd.toDF(schema=txn_schema)
txnddf.write.parquet('customer-transactions.parquet')

性能似乎还可以。如果不通过RDD,就找不到实现这一点的方法。

相关内容

  • 没有找到相关文章

最新更新