I have a table of price updates in the format (timestamp, price, amount). The timestamp is a datetime, price is categorical, and amount is float64. The timestamp column is set as the index. My goal is to get the available amount at each price level at each point in time. First I use pivot_table to spread the prices into columns, and then forward fill.
pivot = price_table.pivot_table(index='timestamp',
                                columns='price', values='amount')
pivot_ffill = pivot.fillna(method='ffill')
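For reference, here is a minimal synthetic version of the setup (the values, file-free construction, and partition count are placeholders, not my real data; note that Dask's pivot_table requires the columns argument to be categorical):

import pandas as pd
import dask.dataframe as dd

# Synthetic stand-in for the real price updates; values are assumptions.
pdf = pd.DataFrame({
    'timestamp': pd.date_range('2019-01-01', periods=6, freq='1min'),
    'price': pd.Categorical([10.0, 10.5, 10.0, 11.0, 10.5, 10.0]),
    'amount': [1.0, 2.0, 0.5, 3.0, 0.0, 1.5],
})
price_table = dd.from_pandas(pdf, npartitions=2)

# Same pipeline as above: spread prices into columns, then forward fill.
pivot = price_table.pivot_table(index='timestamp',
                                columns='price', values='amount')
pivot_ffill = pivot.fillna(method='ffill')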
I can apply compute or head to pivot_ffill and it works fine. As expected, there are still NAs at the beginning of the table, where prices have not been updated yet. But when I run
pivot_nullfill = pivot_ffill.fillna(0)
pivot_nullfill.head()
I get the error The columns in the computed data do not match the columns in the provided metadata. I tried replacing the zero with 0.0 or float(0), but to no avail. Since the preceding steps work, I strongly suspect fillna, but because of the lazy evaluation that is not necessarily where the problem lies. Does anyone know what is causing this? Thanks!
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-180-f8ab344c7939> in <module>
----> 1 pivot_ffill.fillna(0).head()
C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\dataframe\core.py in head(self, n, npartitions, compute)
    896
    897     if compute:
--> 898         result = result.compute()
    899     return result
    900

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\base.py in compute(self, **kwargs)
    154     dask.base.compute
    155     """
--> 156     (result,) = compute(self, traverse=False, **kwargs)
    157     return result
    158

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
    74     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
    75                         cache=cache, get_id=_thread_get_id,
---> 76                         pack_exception=pack_exception, **kwargs)
    77
    78     # Cleanup pools associated to dead threads

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    460         _execute_task(task, data)  # Re-execute locally
    461     else:
--> 462         raise_exception(exc, tb)
    463     res, worker_id = loads(res_info)
    464     state['cache'][key] = res

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\compatibility.py in reraise(exc, tb)
    110     if exc.__traceback__ is not tb:
    111         raise exc.with_traceback(tb)
--> 112     raise exc
    113
    114 import pickle as cPickle

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    228     try:
    229         task, data = loads(task_info)
--> 230         result = _execute_task(task, data)
    231         id = get_id()
    232         result = dumps((result, id))

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\core.py in <listcomp>(.0)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    117     func, args = arg[0], arg[1:]
    118     args2 = [_execute_task(a, cache) for a in args]
--> 119     return func(*args2)
    120 elif not ishashable(arg):
    121     return arg

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\optimization.py in __call__(self, *args)
    940                              % (len(self.inkeys), len(args)))
    941         return core.get(self.dsk, self.outkey,
--> 942                         dict(zip(self.inkeys, args)))
    943
    944     def __reduce__(self):

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\core.py in get(dsk, out, cache)
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
    117     func, args = arg[0], arg[1:]
    118     args2 = [_execute_task(a, cache) for a in args]
--> 119     return func(*args2)
    120 elif not ishashable(arg):
    121     return arg

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\compatibility.py in apply(func, args, kwargs)
    91 def apply(func, args, kwargs=None):
    92     if kwargs:
---> 93         return func(*args, **kwargs)
    94     else:
    95         return func(*args)

C:\ProgramData\Anaconda3\envs\python36\lib\site-packages\dask\dataframe\core.py in apply_and_enforce(*args, **kwargs)
   3800         if not np.array_equal(np.nan_to_num(meta.columns),
   3801                               np.nan_to_num(df.columns)):
-> 3802             raise ValueError("The columns in the computed data do not match"
   3803                              " the columns in the provided metadata")
   3804         else:
ValueError: The columns in the computed data do not match the columns in the provided metadata
The error message should give you a suggestion of how to fix the situation. Assuming you are loading from CSV (the question doesn't say), you would have a line like

df = dd.read_csv(..., dtype={...})

which tells the pandas reader which dtypes you want to enforce, since you know more about the data than pandas can infer from a sample. This ensures that all columns of all partitions have the same types - see the notes section of the docs.
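Concretely, a sketch of what that could look like for this table (the file pattern and the exact dtype mapping are assumptions for illustration, not something from your post):

import dask.dataframe as dd

# Hypothetical file pattern; the point is forcing dtypes at read time
# so that every partition agrees on the column types.
df = dd.read_csv(
    'price_updates_*.csv',
    dtype={'price': 'category', 'amount': 'float64'},
    parse_dates=['timestamp'],
)

# With a categorical price column, the categories must also be known
# before pivoting; categorize() scans the data and records them.
df = df.categorize(columns=['price'])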