Dask Cluster: AttributeError: 'DataFrame' 对象没有属性 '_data'



我正在GCP上使用Dask集群。我使用下面的代码来部署它:

from dask_cloudprovider.gcp import GCPCluster
from dask.distributed import Client
enviroment_vars = {
'EXTRA_PIP_PACKAGES': '"gcsfs"'
}
cluster = GCPCluster(
n_workers=32,
docker_image='daskdev/dask:2021.2.0',
env_vars=enviroment_vars,
network='my-network',
#filesystem_size=150,
machine_type='e2-standard-16',
projectid='my-project-id',
zone='us-central1-a',
on_host_maintenance="MIGRATE"
)
client = Client(cluster)

然后读取csv文件,代码如下:

import dask.dataframe as dd
import csv
col_dtypes = {
'var1': 'float64',
'var2': 'object',
'var3': 'object',
'var4': 'float64'
}
df = dd.read_csv('gs://my_bucket/files-*.csv', blocksize=None, dtype= col_dtypes)
df = df.persist()

一切工作正常,但当我尝试执行一些查询或计算时,就会得到一个错误。例如这段代码:

df.var1.value_counts().compute()

输出:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-14-711a7c21ed42> in <module>
----> 1 df.var1.value_counts().compute()
/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
279         dask.base.compute
280         """
--> 281         (result,) = compute(self, traverse=False, **kwargs)
282         return result
283 
/opt/conda/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
561         postcomputes.append(x.__dask_postcompute__())
562 
--> 563     results = schedule(dsk, keys, **kwargs)
564     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
565 
/opt/conda/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2653                     should_rejoin = False
2654             try:
-> 2655                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2656             finally:
2657                 for f in futures.values():
/opt/conda/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1962             else:
1963                 local_worker = None
-> 1964             return self.sync(
1965                 self._gather,
1966                 futures,
/opt/conda/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
836             return future
837         else:
--> 838             return sync(
839                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
840             )
/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
338     if error[0]:
339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
341     else:
342         return result[0]
/opt/conda/lib/python3.8/site-packages/distributed/utils.py in f()
322             if callback_timeout is not None:
323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
325         except Exception as exc:
326             error[0] = sys.exc_info()
/opt/conda/lib/python3.8/site-packages/tornado/gen.py in run(self)
760 
761                     try:
--> 762                         value = future.result()
763                     except Exception:
764                         exc_info = sys.exc_info()
/opt/conda/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1827                             exc = CancelledError(key)
1828                         else:
-> 1829                             raise exception.with_traceback(traceback)
1830                         raise exc
1831                     if errors == "skip":
/opt/conda/lib/python3.8/site-packages/dask/optimization.py in __call__()
961         if not len(args) == len(self.inkeys):
962             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 963         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
964 
965     def __reduce__(self):
/opt/conda/lib/python3.8/site-packages/dask/core.py in get()
149     for key in toposort(dsk):
150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
152         cache[key] = result
153     result = _execute_task(out, cache)
/opt/conda/lib/python3.8/site-packages/dask/core.py in _execute_task()
119         # temporaries by their reference count and can execute certain
120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
122     elif not ishashable(arg):
123         return arg
/opt/conda/lib/python3.8/site-packages/dask/utils.py in apply()
33 def apply(func, args, kwargs=None):
34     if kwargs:
---> 35         return func(*args, **kwargs)
36     else:
37         return func(*args)
/opt/conda/lib/python3.8/site-packages/dask/dataframe/core.py in apply_and_enforce()
5474             return meta
5475         if is_dataframe_like(df):
-> 5476             check_matching_columns(meta, df)
5477             c = meta.columns
5478         else:
/opt/conda/lib/python3.8/site-packages/dask/dataframe/utils.py in check_matching_columns()
690 def check_matching_columns(meta, actual):
691     # Need nan_to_num otherwise nan comparison gives False
--> 692     if not np.array_equal(np.nan_to_num(meta.columns), np.nan_to_num(actual.columns)):
693         extra = methods.tolist(actual.columns.difference(meta.columns))
694         missing = methods.tolist(meta.columns.difference(actual.columns))
/opt/conda/lib/python3.8/site-packages/pandas/core/generic.py in __getattr__()
5268             or name in self._accessors
5269         ):
-> 5270             return object.__getattribute__(self, name)
5271         else:
5272             if self._info_axis._can_hold_identifiers_and_holds_name(name):
pandas/_libs/properties.pyx in pandas._libs.properties.AxisProperty.__get__()
/opt/conda/lib/python3.8/site-packages/pandas/core/generic.py in __getattr__()
5268             or name in self._accessors
5269         ):
-> 5270             return object.__getattribute__(self, name)
5271         else:
5272             if self._info_axis._can_hold_identifiers_and_holds_name(name):
AttributeError: 'DataFrame' object has no attribute '_data'

我的docker文件中的Pandas版本是1.0.1,所以我已经尝试将Pandas升级到1.2.2版本,但没有奏效。我做错了什么?

我的猜测是某处存在版本不匹配。`client.get_versions(check=True)` 的输出是什么?

相关内容

  • 没有找到相关文章

最新更新