我在Azure Databricks上使用Spark上的pandas API,我有一个非常大的pyspark.pandas.DataFrame
,调用data
,包含不同产品的销售信息(作为时间序列),可以通过sku_id
列识别,销售信息可以用target
列标识。我有一个函数forecast
,可以为每个sku_id
实现预测。我认为这就像data.groupby("sku_id").target.apply(forecast)
一样简单,但是进入forecast
函数的时间序列是分区的,即forecast
看不到每个 SKU 的完整时间序列。我知道像len
这样的一些函数可以对批处理进行操作,然后以简单的方式合并结果( 添加每个分区的结果),forecast
不是这种情况,它需要每个 SKU 的整个时间序列。我检查了时间序列是否正在分区,如果使用短序列进行评估forecast
则会引发错误,如下所示
def forecast(y):
if len(y) <= 100:
raise NotImplementedError("Serie too short")
# Do the forecast...
我敢肯定,对于每个sku_id
,观测值都大于 100(更多),但apply
因NotImplementedError
而失败,因此正在对时间序列进行分区
因此,我想了解如何在不对时间序列进行分区的情况下将forecast
之类的方法应用于分组操作。我还没有找到有关它的文档,我想知道是否可以这样做,或者这是否是一种好的做法。
或者也许方法是其他方法:如何对数据框进行分区,以便每个sku_id
系列都位于同一分区中?
编辑
它适用于
- 火花数据框
applyInPandas
- Spark Pandas API 并在预测函数中指定返回类型,它运行没有问题并且不会进入加薪
def forecast(y) -> float:
if len(y) <= 100:
raise NotImplementedError("Serie too short")
# Do the forecast...
为什么会发生后者?文件说
此 API 执行一次函数以推断可能成本高昂的类型,例如,在聚合或排序后创建数据集时。为避免这种情况,请在 func 中指定返回类型,例如,如下所示
可能它尝试推断返回类型以在第一次实例中使用子样本定义架构?
如果你看一下代码本身,它会提供一些提示。该代码最终会引导您到达它推断架构的路径。 对我来说,这显示了您的代码可能失败的地方。 这是因为那时它会引发您的错误。
first = rdd.first()
再深入一点,它还展示了如何关闭此行为,而是使用采样方法:spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled
禁用此设置将为您提供证据,以确定这是否确实是问题所在。 如果您选择的样本数量至少大于所需len(100)
您也可以通过删除raise
来解决此问题。 这会导致问题。 您仍然可以使用累加器跟踪任何低于 100 的集合。这些是为了跟踪这种类型的数据奇数,如果您不想炸毁东西但仍跟踪奇怪情况(如少于 100 个时间序列事件)的计数,则可能是一个更好的调用。
也许如果你重新分区并缓存你的Spark DF?
data = data
.repartition(sc.defaultParallelism, ['sku_id'])
.cache()
并且不要使用.toPandas()
而是使用pandas_udf
:
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def forecast(y):
if len(y) <= 100:
raise NotImplementedError("Serie too short")
# Do the forecast...
#Then call the UDF to build the forecasts:
results = (
data
.groupBy('sku_id')
.apply(forecast_store_item)
)
这是从这里提取的。也许给你一些很好的见解