如何解决PySpark UDF中带有边缘情况的赋值问题(如Hungarian/linear_sum_assignment



我有一个分配问题,我想问SO社区最好的方法去实现我的spark数据框架(利用spark 3.1+)。我将先描述问题,然后再谈实现。

问题是这样的:我有多达N个任务和多达N个个体(在这个问题中,N=10)。每个人都有执行每个任务的成本,其中最小成本为0美元,最大成本为10美元。这是一种匈牙利算法问题,有一些注意事项。

  1. 在某些情况下,任务和/或人员少于10人,不给某人分配任务(或任务不分配给个人)是可以的。
  2. [更复杂的边缘情况/我遇到麻烦的那个]-列表中可能有一个任务具有标志multiTask=True(不能超过1个multiTask,并且可能没有)。如果一个工人对该多任务的成本低于x,则自动将其分配到该多任务中,并在优化过程中认为该多任务已占用。
    • 我将分享几个例子。在本例中,分配给多任务的x值为1。
      • 如果10个工人中有1个在多任务上的成本为0.25,他被分配到多任务,然后其他9个工人将被分配到其他9个任务
      • 如果10个工人中有2个有成本<1在multiTask上,他们两个都被分配到multiTask,然后其他8个工人将被分配到剩下的9个任务中的8个。1任务不会分配给任何人。
      • 如果所有10个工人都有成本<1上multiTask,所有这些都被分配到multiTask。这是非常罕见的,但可能的。
      • 如果没有工人有成本<1 .在multiTask上,multiTask在优化期间只会分配给一个人,以最小化成本。

下面是spark数据框架的样子。注意:为了简单起见,我正在展示一个N=3(3个任务,3个个人)的例子。

from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([
Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=220, cost=1.50, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=110, cost=2.90, isMultiTask=True),
Row(date='2019-08-01', locationId='z2-NY', workerId=129, taskId=190, cost=0.80, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=220, cost=1.80, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=110, cost=0.90, isMultiTask=True),
Row(date='2019-08-01', locationId='z2-NY', workerId=990, taskId=190, cost=9.99, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=220, cost=1.20, isMultiTask=False),
Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=110, cost=0.25, isMultiTask=True),
Row(date='2019-08-01', locationId='z2-NY', workerId=433, taskId=190, cost=4.99, isMultiTask=False)
])
df = spark.createDataFrame(rdd)

你会看到有一个日期/位置,因为我需要为每个日期/位置分组解决这个分配问题。我打算通过给每个工人和任务分配一个"索引"来解决这个问题。根据它们的id使用dense_rank(),然后使用pandas UDF,根据索引填充N x N numpy数组,并调用linear_sum_assignment函数。然而,我不相信这个计划会起作用,因为我在multiTask中列出了第二个边缘情况。

worker_order_window = Window.partitionBy("date", "locationId").orderBy("workerId")
task_order_window = Window.partitionBy("date", "locationId").orderBy("taskId")
# get the dense_rank because will use this to assign a worker ID an index for the np array for linear_sum_assignment
# dense_rank - 1 as arrays are 0 indexed
df = df.withColumn("worker_idx", dense_rank().over(worker_order_window) - 1) 
df = df.withColumn("task_idx", dense_rank().over(task_order_window) - 1)

def linear_assignment_udf(pandas_df: pd.DataFrame) -> pd.DataFrame:
df_dict = pandas_df.to_dict('records')
# in case there are less than N rows/columns
N = max(pandas_df.shape[0], pandas_df.shape[1])
arr = np.zeros((N,N))
for row in df_dict: 
# worker_idx will be the row number, task idx will be the col number
worker_idx = row.get('worker_idx')
task_idx = row.get('task_idx')
arr[worker_idx][task_idx] = row.get('cost')
rids, cids = linear_sum_assignment(n)

return_list = []
# now want to return a dataframe that says which task_idx a worker has 
for r, c in zip(rids, cids):
for d in df_dict: 
if d.get('worker_idx') == r:
d['task_assignment'] = c
return_list.append(d)
return pd.DataFrame(return_list)



schema = StructType.fromJson(df.schema.jsonValue()).add('task_assignment', 'integer')
df = df.groupBy("date", "locationId").applyInPandas(linear_assignment_udf, schema)
df = df.withColumn("isAssigned", when(col("task_assignment") == col("task_idx"), True).otherwise(False))
如您所见,本例根本不包括multiTask。我想以最有效的方式解决这个问题,这样我就不会被熊猫的udf或scipy所束缚。

我对你正在使用的库一无所知,所以我无法帮助你编写代码,但我认为你应该分两步完成:

  1. 如果需要将工人分配给多任务,则将其分配给该多任务。如果有人被分配到这个任务,不要把它包括在你的成本矩阵中。
  2. 正常使用匈牙利算法(或其他替代算法)分配工人到任务。

基本的匈牙利算法只适用于方形成本矩阵,看起来你已经通过用0填充成本矩阵正确地处理了这个问题,但是有一些修改的算法适用于矩形矩阵。您可能想看看是否可以访问其中一个替代方案,因为它可能会快得多。

最新更新