Python中的FSSpec错误处理-超时错误



我正在尝试从 Microsoft Planetary Computer 获取 TerraClimate 数据,但遇到了超时错误。是否可以增加超时时间?下面是我的代码和遇到的错误。我使用 fsspec 和 xarray 从 Microsoft Planetary Computer 下载空间数据。

import fsspec
import xarray as xr
store = fsspec.get_mapper(asset.href)
data = xr.open_zarr(store, **asset.extra_fields["xarray:open_kwargs"])

clipped_data = data.sel(time=slice('2015-01-01','2019-12-31'),lon=slice(min_lon,max_lon),lat=slice(max_lat,min_lat))

parsed_data = clipped_data[['tmax', 'tmin', 'ppt', 'soil']]
lat_list = parsed_data['lat'].values.tolist()
lon_list = parsed_data['lon'].values.tolist()
filename = "Soil_Moisture_sample.csv"
for (i, j) in zip(lat_list, lon_list):
    parsed_data[["soil","tmax","tmin","ppt"]].sel(lon=i, lat=j, method="nearest").to_dataframe().to_csv(filename,mode='a',index=False, header=False)

我得到以下错误:

TimeoutError                              Traceback (most recent call last)
File ~\Anaconda3\envs\satellite\lib\site-packages\fsspec\asyn.py:53, in _runner(event, coro, result, timeout)
52 try:
---> 53     result[0] = await coro
54 except Exception as ex:

File ~\Anaconda3\envs\satellite\lib\site-packages\fsspec\asyn.py:423, in AsyncFileSystem._cat(self, path, recursive, on_error, batch_size, **kwargs)
422     if ex:
--> 423         raise ex
424 if (
425     len(paths) > 1
426     or isinstance(path, list)
427     or paths[0] != self._strip_protocol(path)
428 ):

File ~\Anaconda3\envs\satellite\lib\asyncio\tasks.py:455, in wait_for(fut, timeout, loop)
454 if timeout is None:
--> 455     return await fut
457 if timeout <= 0:

File ~\Anaconda3\envs\satellite\lib\site-packages\fsspec\implementations\http.py:221, in HTTPFileSystem._cat_file(self, url, start, end, **kwargs)
220 async with session.get(url, **kw) as r:
--> 221     out = await r.read()
222     self._raise_not_found_for_status(r, url)

File ~\Anaconda3\envs\satellite\lib\site-packages\aiohttp\client_reqrep.py:1036, in ClientResponse.read(self)
1035 try:
-> 1036     self._body = await self.content.read()
1037     for trace in self._traces:

File ~\Anaconda3\envs\satellite\lib\site-packages\aiohttp\streams.py:375, in StreamReader.read(self, n)
374 while True:
--> 375     block = await self.readany()
376     if not block:

File ~\Anaconda3\envs\satellite\lib\site-packages\aiohttp\streams.py:397, in StreamReader.readany(self)
396 while not self._buffer and not self._eof:
--> 397     await self._wait("readany")
399 return self._read_nowait(-1)

File ~\Anaconda3\envs\satellite\lib\site-packages\aiohttp\streams.py:304, in StreamReader._wait(self, func_name)
303     with self._timer:
--> 304         await waiter
305 else:

File ~\Anaconda3\envs\satellite\lib\site-packages\aiohttp\helpers.py:721, in TimerContext.__exit__(self, exc_type, exc_val, exc_tb)
720 if exc_type is asyncio.CancelledError and self._cancelled:
--> 721     raise asyncio.TimeoutError from None
722 return None

TimeoutError: 

The above exception was the direct cause of the following exception:

FSTimeoutError                            Traceback (most recent call last)
Input In [62], in <cell line: 3>()
1 # Flood Region Point - Thiruvanthpuram
2 filename = "Soil_Moisture_sample.csv"
----> 3 parsed_data[["soil","tmax","tmin","ppt"]].sel(lon=8.520833, lat=76.4375, method="nearest").to_dataframe().to_csv(filename,mode='a',index=False, header=False)

File ~\Anaconda3\envs\satellite\lib\site-packages\xarray\core\dataset.py:5898, in Dataset.to_dataframe(self, dim_order)
5870 """Convert this dataset into a pandas.DataFrame.
5871 
5872 Non-index variables in this dataset form the columns of the
(...)
5893 
5894 """
5896 ordered_dims = self._normalize_dim_order(dim_order=dim_order)
-> 5898 return self._to_dataframe(ordered_dims=ordered_dims)

File ~\Anaconda3\envs\satellite\lib\site-packages\xarray\core\dataset.py:5862, in Dataset._to_dataframe(self, ordered_dims)
5860 def _to_dataframe(self, ordered_dims: Mapping[Any, int]):
5861     columns = [k for k in self.variables if k not in self.dims]
-> 5862     data = [
5863         self._variables[k].set_dims(ordered_dims).values.reshape(-1)
5864         for k in columns
5865     ]
5866     index = self.coords.to_index([*ordered_dims])
5867     return pd.DataFrame(dict(zip(columns, data)), index=index)

File ~\Anaconda3\envs\satellite\lib\site-packages\xarray\core\dataset.py:5863, in <listcomp>(.0)
5860 def _to_dataframe(self, ordered_dims: Mapping[Any, int]):
5861     columns = [k for k in self.variables if k not in self.dims]
5862     data = [
-> 5863         self._variables[k].set_dims(ordered_dims).values.reshape(-1)
5864         for k in columns
5865     ]
5866     index = self.coords.to_index([*ordered_dims])
5867     return pd.DataFrame(dict(zip(columns, data)), index=index)

File ~\Anaconda3\envs\satellite\lib\site-packages\xarray\core\variable.py:527, in Variable.values(self)
524 @property
525 def values(self):
526     """The variable's data as a numpy.ndarray"""
--> 527     return _as_array_or_item(self._data)

File ~\Anaconda3\envs\satellite\lib\site-packages\xarray\core\variable.py:267, in _as_array_or_item(data)
253 def _as_array_or_item(data):
254     """Return the given values as a numpy array, or as an individual item if
255     it's a 0d datetime64 or timedelta64 array.
256 
(...)
265     TODO: remove this (replace with np.asarray) once these issues are fixed
266     """
--> 267     data = np.asarray(data)
268     if data.ndim == 0:
269         if data.dtype.kind == "M":

File ~\AppData\Roaming\Python\Python38\site-packages\dask\array\core.py:1696, in Array.__array__(self, dtype, **kwargs)
1695 def __array__(self, dtype=None, **kwargs):
-> 1696     x = self.compute()
1697     if dtype and x.dtype != dtype:
1698         x = x.astype(dtype)

File ~\AppData\Roaming\Python\Python38\site-packages\dask\base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
291 def compute(self, **kwargs):
292     """Compute this dask collection
293 
294     This turns a lazy Dask collection into its in-memory equivalent.
(...)
313     dask.base.compute
314     """
--> 315     (result,) = compute(self, traverse=False, **kwargs)
316     return result

File ~\AppData\Roaming\Python\Python38\site-packages\dask\base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
597     keys.append(x.__dask_keys__())
598     postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~\AppData\Roaming\Python\Python38\site-packages\dask\threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
86     elif isinstance(pool, multiprocessing.pool.Pool):
87         pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
90     pool.submit,
91     pool._max_workers,
92     dsk,
93     keys,
94     cache=cache,
95     get_id=_thread_get_id,
96     pack_exception=pack_exception,
97     **kwargs,
98 )
100 # Cleanup pools associated to dead threads
101 with pools_lock:

File ~\AppData\Roaming\Python\Python38\site-packages\dask\local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
509         _execute_task(task, data)  # Re-execute locally
510     else:
--> 511         raise_exception(exc, tb)
512 res, worker_id = loads(res_info)
513 state["cache"][key] = res

File ~\AppData\Roaming\Python\Python38\site-packages\dask\local.py:319, in reraise(exc, tb)
317 if exc.__traceback__ is not tb:
318     raise exc.with_traceback(tb)
--> 319 raise exc

File ~\AppData\Roaming\Python\Python38\site-packages\dask\local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
222 try:
223     task, data = loads(task_info)
--> 224     result = _execute_task(task, data)
225     id = get_id()
226     result = dumps((result, id))

File ~\AppData\Roaming\Python\Python38\site-packages\dask\core.py:119, in _execute_task(arg, cache, dsk)
115     func, args = arg[0], arg[1:]
116     # Note: Don't assign the subtask results to a variable. numpy detects
117     # temporaries by their reference count and can execute certain
118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121     return arg

File ~\AppData\Roaming\Python\Python38\site-packages\dask\array\core.py:128, in getter(a, b, asarray, lock)
123     # Below we special-case `np.matrix` to force a conversion to
124     # `np.ndarray` and preserve original Dask behavior for `getter`,
125     # as for all purposes `np.matrix` is array-like and thus
126     # `is_arraylike` evaluates to `True` in that case.
127     if asarray and (not is_arraylike(c) or isinstance(c, np.matrix)):
--> 128         c = np.asarray(c)
129 finally:
130     if lock:

File ~\Anaconda3\envs\satellite\lib\site-packages\xarray\core\indexing.py:459, in ImplicitToExplicitIndexingAdapter.__array__(self, dtype)
458 def __array__(self, dtype=None):
--> 459     return np.asarray(self.array, dtype=dtype)

File ~\Anaconda3\envs\satellite\lib\site-packages\xarray\core\indexing.py:623, in CopyOnWriteArray.__array__(self, dtype)
622 def __array__(self, dtype=None):
--> 623     return np.asarray(self.array, dtype=dtype)

File ~\Anaconda3\envs\satellite\lib\site-packages\xarray\core\indexing.py:524, in LazilyIndexedArray.__array__(self, dtype)
522 def __array__(self, dtype=None):
523     array = as_indexable(self.array)
--> 524     return np.asarray(array[self.key], dtype=None)

File ~\Anaconda3\envs\satellite\lib\site-packages\xarray\backends\zarr.py:76, in ZarrArrayWrapper.__getitem__(self, key)
74 array = self.get_array()
75 if isinstance(key, indexing.BasicIndexer):
---> 76     return array[key.tuple]
77 elif isinstance(key, indexing.VectorizedIndexer):
78     return array.vindex[
79         indexing._arrayize_vectorized_indexer(key, self.shape).tuple
80     ]

File ~\Anaconda3\envs\satellite\lib\site-packages\zarr\core.py:788, in Array.__getitem__(self, selection)
786     result = self.vindex[selection]
787 else:
--> 788     result = self.get_basic_selection(pure_selection, fields=fields)
789 return result

File ~\Anaconda3\envs\satellite\lib\site-packages\zarr\core.py:914, in Array.get_basic_selection(self, selection, out, fields)
911     return self._get_basic_selection_zd(selection=selection, out=out,
912                                         fields=fields)
913 else:
--> 914     return self._get_basic_selection_nd(selection=selection, out=out,
915                                         fields=fields)

File ~\Anaconda3\envs\satellite\lib\site-packages\zarr\core.py:957, in Array._get_basic_selection_nd(self, selection, out, fields)
951 def _get_basic_selection_nd(self, selection, out=None, fields=None):
952     # implementation of basic selection for array with at least one dimension
953 
954     # setup indexer
955     indexer = BasicIndexer(selection, self)
--> 957     return self._get_selection(indexer=indexer, out=out, fields=fields)

File ~\Anaconda3\envs\satellite\lib\site-packages\zarr\core.py:1247, in Array._get_selection(self, indexer, out, fields)
1241 if not hasattr(self.chunk_store, "getitems") or \
1242    any(map(lambda x: x == 0, self.shape)):
1243     # sequentially get one key at a time from storage
1244     for chunk_coords, chunk_selection, out_selection in indexer:
1245 
1246         # load chunk selection into output array
-> 1247         self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
1248                             drop_axes=indexer.drop_axes, fields=fields)
1249 else:
1250     # allow storage to get multiple items at once
1251     lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)

File ~\Anaconda3\envs\satellite\lib\site-packages\zarr\core.py:1939, in Array._chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, drop_axes, fields)
1935 ckey = self._chunk_key(chunk_coords)
1937 try:
1938     # obtain compressed data for chunk
-> 1939     cdata = self.chunk_store[ckey]
1941 except KeyError:
1942     # chunk not initialized
1943     if self._fill_value is not None:

File ~\Anaconda3\envs\satellite\lib\site-packages\zarr\storage.py:717, in KVStore.__getitem__(self, key)
716 def __getitem__(self, key):
--> 717     return self._mutable_mapping[key]

File ~\Anaconda3\envs\satellite\lib\site-packages\fsspec\mapping.py:137, in FSMap.__getitem__(self, key, default)
135 k = self._key_to_str(key)
136 try:
--> 137     result = self.fs.cat(k)
138 except self.missing_exceptions:
139     if default is not None:

File ~\Anaconda3\envs\satellite\lib\site-packages\fsspec\asyn.py:111, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
108 @functools.wraps(func)
109 def wrapper(*args, **kwargs):
110     self = obj or args[0]
--> 111     return sync(self.loop, func, *args, **kwargs)

File ~\Anaconda3\envs\satellite\lib\site-packages\fsspec\asyn.py:94, in sync(loop, func, timeout, *args, **kwargs)
91 return_result = result[0]
92 if isinstance(return_result, asyncio.TimeoutError):
93     # suppress asyncio.TimeoutError, raise FSTimeoutError
---> 94     raise FSTimeoutError from return_result
95 elif isinstance(return_result, BaseException):
96     raise return_result

FSTimeoutError: 

在下面这一行中:

store = fsspec.get_mapper(asset.href)

您可以向 fsspec 后端(本例中为 HTTP)传递额外的参数,请参阅 fsspec.implementations.http.HTTPFileSystem。在本例中,client_kwargs 会被传递给 aiohttp.ClientSession,其中可以包含一个可选的 timeout 参数。您的调用可能如下所示:

from aiohttp import ClientTimeout
store = fsspec.get_mapper(asset.href, client_kwargs={"timeout": ClientTimeout(total=5000, connect=1000)})

相关内容

  • 没有找到相关文章

最新更新