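I have a Dask function that adds a column to an existing Dask DataFrame, and this works fine:
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({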
'height': [6.21, 5.12, 5.85, 5.78, 5.98],
'weight': [150, 126, 133, 164, 203]
})
df_dask = dd.from_pandas(df, npartitions=2)
s = """
obj.weight + 100
"""
df_dask['new_weight'] = df_dask.apply(lambda obj: eval(s), meta=dict, axis=1)
Now I would like to add two columns instead of one:
s = """
obj.weight + 100, obj.weight + 200
"""
df_dask['new_weight','new_weight2'] = df_dask.apply(lambda obj: eval(s), meta=dict, axis=1)
But I get:
NotImplementedError: Item assignment with <class 'tuple'> not supported
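Does this mean it is not supported, or am I doing something wrong? If it is not supported, is there a workaround? What I need is to return a list of floats.
First, it is useful to confirm that a pandas-based solution delivers the expected result:
import pandas as pd

df = pd.DataFrame({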
'height': [6.21, 5.12, 5.85, 5.78, 5.98],
'weight': [150, 126, 133, 164, 203]
})
s = """
obj.weight + 100, obj.weight + 200
"""
df['new_weight'], df['new_weight2'] = zip(*df.apply(lambda obj: eval(s), axis=1))
print(df)
# height weight new_weight new_weight2
# 0 6.21 150 250.0 350.0
# 1 5.12 126 226.0 326.0
# 2 5.85 133 233.0 333.0
# 3 5.78 164 264.0 364.0
# 4 5.98 203 303.0 403.0
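As an alternative to the zip(*...) idiom above, pandas' apply accepts result_type='expand', which turns each returned tuple into separate columns. The following is only a minimal sketch: it assumes a plain function in place of the eval string, and the helper name two_weights is hypothetical, not part of the original code.

```python
import pandas as pd

df = pd.DataFrame({
    'height': [6.21, 5.12, 5.85, 5.78, 5.98],
    'weight': [150, 126, 133, 164, 203]
})

def two_weights(row):
    # Hypothetical helper standing in for eval(s): returns two values per row.
    return row.weight + 100, row.weight + 200

# result_type='expand' spreads each returned tuple across columns 0 and 1.
expanded = df.apply(two_weights, axis=1, result_type='expand')
expanded.columns = ['new_weight', 'new_weight2']

# Join the new columns back onto the original frame by index.
result = df.join(expanded)
```

This avoids materializing and unzipping a Series of tuples, at the cost of one extra join.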
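Now for the dask solution. Since each partition is a pandas DataFrame, the simplest solution (for row-based transformations) is to wrap the pandas code in a function and pass it to map_partitions:
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({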
'height': [6.21, 5.12, 5.85, 5.78, 5.98],
'weight': [150, 126, 133, 164, 203]
})
s = """
obj.weight + 100, obj.weight + 200
"""
df_dask = dd.from_pandas(df, npartitions=2)
def new_cols(df):
    # Work on a copy so the input partition is not mutated.
    df = df.copy()
    df['new_weight'], df['new_weight2'] = zip(*df.apply(lambda obj: eval(s), axis=1))
    return df
df_dask = df_dask.map_partitions(new_cols)
print(df_dask.compute())
# height weight new_weight new_weight2
# 0 6.21 150 250.0 350.0
# 1 5.12 126 226.0 326.0
# 2 5.85 133 233.0 333.0
# 3 5.78 164 264.0 364.0
# 4 5.98 203 303.0 403.0
Depending on the specifics of your use case, there may be room to make the above pattern more efficient.
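One possible efficiency gain, assuming the expression is known ahead of time rather than supplied as a string to eval, is to replace the row-wise apply with vectorized column arithmetic inside the per-partition function. The sketch below uses only pandas so that it runs on its own; the function name new_cols_vectorized is hypothetical.

```python
import pandas as pd

def new_cols_vectorized(part):
    # `part` is one pandas DataFrame (a single partition).
    # assign returns a new frame, so the input is left untouched.
    return part.assign(
        new_weight=part['weight'] + 100,
        new_weight2=part['weight'] + 200,
    )

# Stand-in for a single partition of the dask DataFrame.
part = pd.DataFrame({
    'height': [6.21, 5.12, 5.85, 5.78, 5.98],
    'weight': [150, 126, 133, 164, 203]
})
out = new_cols_vectorized(part)
```

With dask, this function could presumably be passed the same way as new_cols, i.e. df_dask.map_partitions(new_cols_vectorized). Note that the new columns keep the integer dtype of weight here, whereas the row-wise apply version produces floats.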