了解 pyspark 适用于 groupby



我在Azure Databricks上使用Spark上的pandas API,我有一个非常大的pyspark.pandas.DataFrame,调用data,包含不同产品的销售信息(作为时间序列),可以通过sku_id列识别,销售信息可以用target列标识。我有一个函数forecast,可以为每个sku_id实现预测。我认为这就像data.groupby("sku_id").target.apply(forecast)一样简单,但是进入forecast函数的时间序列是分区的,即forecast看不到每个 SKU 的完整时间序列。我知道像len这样的一些函数可以对批处理进行操作,然后以简单的方式合并结果( 添加每个分区的结果),forecast不是这种情况,它需要每个 SKU 的整个时间序列。我检查了时间序列是否正在分区,如果使用短序列进行评估forecast则会引发错误,如下所示

def forecast(y):
if len(y) <= 100:
raise NotImplementedError("Serie too short")
# Do the forecast...

我敢肯定,对于每个sku_id,观测值都大于 100(更多),但applyNotImplementedError而失败,因此正在对时间序列进行分区

因此,我想了解如何在不对时间序列进行分区的情况下将forecast之类的方法应用于分组操作。我还没有找到有关它的文档,我想知道是否可以这样做,或者这是否是一种好的做法。

或者也许方法是其他方法:如何对数据框进行分区,以便每个sku_id系列都位于同一分区中?

编辑

它适用于

  • 火花数据框applyInPandas
  • Spark Pandas API 并在预测函数中指定返回类型,它运行没有问题并且不会进入加薪
def forecast(y) -> float:
if len(y) <= 100:
raise NotImplementedError("Serie too short")
# Do the forecast...

为什么会发生后者?文件说

此 API 执行一次函数以推断可能成本高昂的类型,例如,在聚合或排序后创建数据集时。为避免这种情况,请在 func 中指定返回类型,例如,如下所示

可能它尝试推断返回类型以在第一次实例中使用子样本定义架构?

如果你看一下代码本身,它会提供一些提示。该代码最终会引导您到达它推断架构的路径。 对我来说,这显示了您的代码可能失败的地方。 这是因为那时它会引发您的错误。

first = rdd.first()

再深入一点,它还展示了如何关闭此行为,而是使用采样方法:spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled

禁用此设置将为您提供证据,以确定这是否确实是问题所在。 如果您选择的样本数量至少大于所需len(100)

您也可以通过删除raise来解决此问题。 这会导致问题。 您仍然可以使用累加器跟踪任何低于 100 的集合。这些是为了跟踪这种类型的数据奇数,如果您不想炸毁东西但仍跟踪奇怪情况(如少于 100 个时间序列事件)的计数,则可能是一个更好的调用。

也许如果你重新分区并缓存你的Spark DF?

data = data
.repartition(sc.defaultParallelism, ['sku_id'])
.cache()

并且不要使用.toPandas()而是使用pandas_udf

from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def forecast(y):
if len(y) <= 100:
raise NotImplementedError("Serie too short")
# Do the forecast...
#Then call the UDF to build the forecasts:
results = (
data
.groupBy('sku_id')
.apply(forecast_store_item)
)

这是从这里提取的。也许给你一些很好的见解

相关内容

  • 没有找到相关文章

最新更新