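I am writing a bootstrap algorithm using a parallel loop and pandas. The problem is that the merge command inside the parallel loop raises "ValueError: buffer source array is read-only", but only when I merge against the full data set (120k rows). Any subset of fewer than 12k rows works fine, so I conclude that this is not a syntax problem. What can I do?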
My current pandas version is 0.24.2 and Cython is 0.29.7.
_RemoteTraceback Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 418, in _process_worker
r = call_item()
File "/home/ubuntu/.local/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 272, in __call__
return self.fn(*self.args, **self.kwargs)
File "/home/ubuntu/.local/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 567, in __call__
return self.func(*args, **kwargs)
File "/home/ubuntu/.local/lib/python3.6/site-packages/joblib/parallel.py", line 225, in __call__
for func, args, kwargs in self.items]
File "/home/ubuntu/.local/lib/python3.6/site-packages/joblib/parallel.py", line 225, in <listcomp>
for func, args, kwargs in self.items]
File "<ipython-input-72-cdb83eaf594c>", line 12, in bootstrap
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/frame.py", line 6868, in merge
copy=copy, indicator=indicator, validate=validate)
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 48, in merge
return op.get_result()
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 546, in get_result
join_index, left_indexer, right_indexer = self._get_join_info()
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 756, in _get_join_info
right_indexer) = self._get_join_indexers()
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 735, in _get_join_indexers
how=self.how)
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 1130, in _get_join_indexers
llab, rlab, shape = map(list, zip(* map(fkeys, left_keys, right_keys)))
File "/home/ubuntu/.local/lib/python3.6/site-packages/pandas/core/reshape/merge.py", line 1662, in _factorize_keys
rlab = rizer.factorize(rk)
File "pandas/_libs/hashtable.pyx", line 111, in pandas._libs.hashtable.Int64Factorizer.factorize
File "stringsource", line 653, in View.MemoryView.memoryview_cwrapper
File "stringsource", line 348, in View.MemoryView.memoryview.__cinit__
ValueError: buffer source array is read-only
"""
The above exception was the direct cause of the following exception:
ValueError Traceback (most recent call last)
<ipython-input-73-652c1db5701b> in <module>()
1 num_cores = multiprocessing.cpu_count()
----> 2 results = Parallel(n_jobs=num_cores, prefer='processes', verbose = 5)(delayed(bootstrap)() for i in range(n_trials))
3 #pd.DataFrame(results[0])
~/.local/lib/python3.6/site-packages/joblib/parallel.py in __call__(self, iterable)
932
933 with self._backend.retrieval_context():
--> 934 self.retrieve()
935 # Make sure that we get a last message telling us we are done
936 elapsed_time = time.time() - self._start_time
~/.local/lib/python3.6/site-packages/joblib/parallel.py in retrieve(self)
831 try:
832 if getattr(self._backend, 'supports_timeout', False):
--> 833 self._output.extend(job.get(timeout=self.timeout))
834 else:
835 self._output.extend(job.get())
~/.local/lib/python3.6/site-packages/joblib/_parallel_backends.py in wrap_future_result(future, timeout)
519 AsyncResults.get from multiprocessing."""
520 try:
--> 521 return future.result(timeout=timeout)
522 except LokyTimeoutError:
523 raise TimeoutError()
/usr/lib/python3.6/concurrent/futures/_base.py in result(self, timeout)
430 raise CancelledError()
431 elif self._state == FINISHED:
--> 432 return self.__get_result()
433 else:
434 raise TimeoutError()
/usr/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
382 def __get_result(self):
383 if self._exception:
--> 384 raise self._exception
385 else:
386 return self._result
ValueError: buffer source array is read-only
The code is:

    def bootstrap():
        # Draw observation IDs with replacement
        df_resample_ids = skl.utils.resample(ob_ids)
        # A DataFrame built from a list has the integer column label 0
        df_resample_ids = pd.DataFrame(df_resample_ids).sort_values(by=0).reset_index(drop=True)
        df_resample_ids.columns = [ob_id_field]
        # Attach the original rows to each resampled ID
        df_resample = pd.DataFrame(df_resample_ids.merge(df, on=ob_id_field))
        return df_resample

    num_cores = multiprocessing.cpu_count()
    results = Parallel(n_jobs=num_cores, prefer='processes', verbose=5)(delayed(bootstrap)() for i in range(n_trials))

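The algorithm draws IDs from the ID variable by resampling with replacement, then uses the merge command to build a new data set from the resampled IDs and the original data set stored in df. If I cut out a subset of the original data set (from anywhere) with fewer than ~12k rows, the parallel loop does not raise the error and runs as expected.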
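To make the resampling step concrete, here is a minimal serial sketch of a single bootstrap draw, without joblib. The helper name `bootstrap_once`, the toy frame, and the use of NumPy's `Generator.choice` in place of `skl.utils.resample` are illustrative assumptions, not the code from my notebook.

```python
import numpy as np
import pandas as pd

def bootstrap_once(frame, id_col, rng):
    """Resample the unique IDs with replacement and merge the rows back on."""
    ids = frame[id_col].drop_duplicates().to_numpy()
    # Draw as many IDs as there are unique IDs, with replacement, then sort
    sampled = np.sort(rng.choice(ids, size=len(ids), replace=True))
    sampled_ids = pd.DataFrame({id_col: sampled})
    # An ID drawn twice contributes all of its rows twice
    return sampled_ids.merge(frame, on=id_col)

# Toy data (hypothetical values): ID 1 has two observations
toy = pd.DataFrame({"ID": [1, 1, 2, 3], "value": [0.1, 0.2, 0.3, 0.4]})
sample = bootstrap_once(toy, "ID", np.random.default_rng(0))
```

Running this serially on the full data set is a quick way to check whether the merge itself is sound before involving worker processes.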
As requested, here is a new snippet that recreates the data structure and mirrors the main approach I am currently using:

    import multiprocessing

    import numpy as np
    import pandas as pd
    import sklearn as skl
    import sklearn.utils  # make sure the utils submodule is loaded
    from joblib import Parallel, delayed

    n_trials = 100  # placeholder value; the real number of trials is not shown in this post

    # 24 random columns with 24 distinct names
    df = pd.DataFrame(np.random.randn(200000, 24), columns=list('ABCDEFGHIJKLMNOPQRSTUVWX'))
    df["ID"] = df.index.drop_duplicates().tolist()
    ob_ids = df.index.drop_duplicates().tolist()

    def bootstrap2():
        # Draw IDs with replacement, then sort them
        df_resample_ids = skl.utils.resample(ob_ids)
        df_resample_ids = pd.DataFrame(df_resample_ids).sort_values(by=0).reset_index(drop=True)
        df_resample_ids.columns = ['ID']
        # Join the resampled IDs back onto the full data set
        df_resample = pd.DataFrame(df.merge(df_resample_ids, on='ID'))
        return df_resample

    num_cores = multiprocessing.cpu_count()
    results = Parallel(n_jobs=num_cores, prefer='processes', verbose=5)(delayed(bootstrap2)() for i in range(n_trials))

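However, I noticed that when the data consists entirely of np.random numbers, the loop runs without any problem. The dtypes of the original data frame are: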
start_rtg int64
end_rtg float64
days_diff float64
ultimate_customer_system_id int64
How can I avoid the read-only error?
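Posting an answer to my own question: I found that one of the variables had the int64 data type. When I converted all variables to float64, the error disappeared. So this is an issue limited to certain data types...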
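For anyone who wants to try the same workaround, here is a minimal sketch of the conversion. The helper name `cast_int64_to_float64` and the toy values are made up for illustration; only the column names and dtypes come from the question.

```python
import numpy as np
import pandas as pd

def cast_int64_to_float64(frame):
    """Return a copy of `frame` with every int64 column converted to float64."""
    int_cols = frame.select_dtypes(include=["int64"]).columns
    return frame.astype({col: "float64" for col in int_cols})

# Toy frame using the column names from the question; the values are invented
df = pd.DataFrame({
    "start_rtg": np.array([1, 2, 3], dtype="int64"),
    "end_rtg": [1.5, 2.5, 3.5],
    "days_diff": [10.0, 20.0, 30.0],
    "ultimate_customer_system_id": np.array([101, 102, 103], dtype="int64"),
})
df_float = cast_int64_to_float64(df)
```

Note that float64 represents integers exactly only up to 2**53, so this is safe for IDs below that bound but not in general. I have not confirmed the root cause. One possible explanation is that joblib memory-maps large arrays for its worker processes in read-only mode once they exceed the `max_nbytes` threshold of `Parallel`, which would fit the size dependence I saw. If that is the cause, passing `max_nbytes=None` to `Parallel` to disable memory mapping may be an alternative worth testing.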
Cheers, Stephen