我正试图从Microsoft Planetary获取Terraclimate数据,但遇到超时错误。是否有可能增加超时时间?请找到下面的代码和我面临的错误。我使用fsspec和xarray从MS Planetary门户下载空间数据。
import fsspec
import xarray as xr
store = fsspec.get_mapper(asset.href)
data = xr.open_zarr(store, **asset.extra_fields["xarray:open_kwargs"])
clipped_data = data.sel(time=slice('2015-01-01','2019-12-31'),lon=slice(min_lon,max_lon),lat=slice(max_lat,min_lat))
parsed_data = clipped_data[['tmax', 'tmin', 'ppt', 'soil']]
lat_list = parsed_data['lat'].values.tolist()
lon_list = parsed_data['lon'].values.tolist()
filename = "Soil_Moisture_sample.csv"
for(i,j) in zip(lat_list,lon_list):
parsed_data[["soil","tmax","tmin","ppt"]].sel(lon=i, lat=j, method="nearest").to_dataframe().to_csv(filename,mode='a',index=False, header=False)
我得到以下错误
TimeoutError Traceback (most recent call last)
File ~Anaconda3envssatellitelibsite-packagesfsspecasyn.py:53, in _runner(event, coro, result, timeout)
52 try:
---> 53 result[0] = await coro
54 except Exception as ex:
File ~Anaconda3envssatellitelibsite-packagesfsspecasyn.py:423, in AsyncFileSystem._cat(self, path, recursive, on_error, batch_size, **kwargs)
422 if ex:
--> 423 raise ex
424 if (
425 len(paths) > 1
426 or isinstance(path, list)
427 or paths[0] != self._strip_protocol(path)
428 ):
File ~Anaconda3envssatellitelibasynciotasks.py:455, in wait_for(fut, timeout, loop)
454 if timeout is None:
--> 455 return await fut
457 if timeout <= 0:
File ~Anaconda3envssatellitelibsite-packagesfsspecimplementationshttp.py:221, in HTTPFileSystem._cat_file(self, url, start, end, **kwargs)
220 async with session.get(url, **kw) as r:
--> 221 out = await r.read()
222 self._raise_not_found_for_status(r, url)
File ~Anaconda3envssatellitelibsite-packagesaiohttpclient_reqrep.py:1036, in ClientResponse.read(self)
1035 try:
-> 1036 self._body = await self.content.read()
1037 for trace in self._traces:
File ~Anaconda3envssatellitelibsite-packagesaiohttpstreams.py:375, in StreamReader.read(self, n)
374 while True:
--> 375 block = await self.readany()
376 if not block:
File ~Anaconda3envssatellitelibsite-packagesaiohttpstreams.py:397, in StreamReader.readany(self)
396 while not self._buffer and not self._eof:
--> 397 await self._wait("readany")
399 return self._read_nowait(-1)
File ~Anaconda3envssatellitelibsite-packagesaiohttpstreams.py:304, in StreamReader._wait(self, func_name)
303 with self._timer:
--> 304 await waiter
305 else:
File ~Anaconda3envssatellitelibsite-packagesaiohttphelpers.py:721, in TimerContext.__exit__(self, exc_type, exc_val, exc_tb)
720 if exc_type is asyncio.CancelledError and self._cancelled:
--> 721 raise asyncio.TimeoutError from None
722 return None
TimeoutError:
The above exception was the direct cause of the following exception:
FSTimeoutError Traceback (most recent call last)
Input In [62], in <cell line: 3>()
1 # Flood Region Point - Thiruvanthpuram
2 filename = "Soil_Moisture_sample.csv"
----> 3 parsed_data[["soil","tmax","tmin","ppt"]].sel(lon=8.520833, lat=76.4375, method="nearest").to_dataframe().to_csv(filename,mode='a',index=False, header=False)
File ~Anaconda3envssatellitelibsite-packagesxarraycoredataset.py:5898, in Dataset.to_dataframe(self, dim_order)
5870 """Convert this dataset into a pandas.DataFrame.
5871
5872 Non-index variables in this dataset form the columns of the
(...)
5893
5894 """
5896 ordered_dims = self._normalize_dim_order(dim_order=dim_order)
-> 5898 return self._to_dataframe(ordered_dims=ordered_dims)
File ~Anaconda3envssatellitelibsite-packagesxarraycoredataset.py:5862, in Dataset._to_dataframe(self, ordered_dims)
5860 def _to_dataframe(self, ordered_dims: Mapping[Any, int]):
5861 columns = [k for k in self.variables if k not in self.dims]
-> 5862 data = [
5863 self._variables[k].set_dims(ordered_dims).values.reshape(-1)
5864 for k in columns
5865 ]
5866 index = self.coords.to_index([*ordered_dims])
5867 return pd.DataFrame(dict(zip(columns, data)), index=index)
File ~Anaconda3envssatellitelibsite-packagesxarraycoredataset.py:5863, in <listcomp>(.0)
5860 def _to_dataframe(self, ordered_dims: Mapping[Any, int]):
5861 columns = [k for k in self.variables if k not in self.dims]
5862 data = [
-> 5863 self._variables[k].set_dims(ordered_dims).values.reshape(-1)
5864 for k in columns
5865 ]
5866 index = self.coords.to_index([*ordered_dims])
5867 return pd.DataFrame(dict(zip(columns, data)), index=index)
File ~Anaconda3envssatellitelibsite-packagesxarraycorevariable.py:527, in Variable.values(self)
524 @property
525 def values(self):
526 """The variable's data as a numpy.ndarray"""
--> 527 return _as_array_or_item(self._data)
File ~Anaconda3envssatellitelibsite-packagesxarraycorevariable.py:267, in _as_array_or_item(data)
253 def _as_array_or_item(data):
254 """Return the given values as a numpy array, or as an individual item if
255 it's a 0d datetime64 or timedelta64 array.
256
(...)
265 TODO: remove this (replace with np.asarray) once these issues are fixed
266 """
--> 267 data = np.asarray(data)
268 if data.ndim == 0:
269 if data.dtype.kind == "M":
File ~AppDataRoamingPythonPython38site-packagesdaskarraycore.py:1696, in Array.__array__(self, dtype, **kwargs)
1695 def __array__(self, dtype=None, **kwargs):
-> 1696 x = self.compute()
1697 if dtype and x.dtype != dtype:
1698 x = x.astype(dtype)
File ~AppDataRoamingPythonPython38site-packagesdaskbase.py:315, in DaskMethodsMixin.compute(self, **kwargs)
291 def compute(self, **kwargs):
292 """Compute this dask collection
293
294 This turns a lazy Dask collection into its in-memory equivalent.
(...)
313 dask.base.compute
314 """
--> 315 (result,) = compute(self, traverse=False, **kwargs)
316 return result
File ~AppDataRoamingPythonPython38site-packagesdaskbase.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
597 keys.append(x.__dask_keys__())
598 postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~AppDataRoamingPythonPython38site-packagesdaskthreaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
86 elif isinstance(pool, multiprocessing.pool.Pool):
87 pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
90 pool.submit,
91 pool._max_workers,
92 dsk,
93 keys,
94 cache=cache,
95 get_id=_thread_get_id,
96 pack_exception=pack_exception,
97 **kwargs,
98 )
100 # Cleanup pools associated to dead threads
101 with pools_lock:
File ~AppDataRoamingPythonPython38site-packagesdasklocal.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
509 _execute_task(task, data) # Re-execute locally
510 else:
--> 511 raise_exception(exc, tb)
512 res, worker_id = loads(res_info)
513 state["cache"][key] = res
File ~AppDataRoamingPythonPython38site-packagesdasklocal.py:319, in reraise(exc, tb)
317 if exc.__traceback__ is not tb:
318 raise exc.with_traceback(tb)
--> 319 raise exc
File ~AppDataRoamingPythonPython38site-packagesdasklocal.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
222 try:
223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
225 id = get_id()
226 result = dumps((result, id))
File ~AppDataRoamingPythonPython38site-packagesdaskcore.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~AppDataRoamingPythonPython38site-packagesdaskarraycore.py:128, in getter(a, b, asarray, lock)
123 # Below we special-case `np.matrix` to force a conversion to
124 # `np.ndarray` and preserve original Dask behavior for `getter`,
125 # as for all purposes `np.matrix` is array-like and thus
126 # `is_arraylike` evaluates to `True` in that case.
127 if asarray and (not is_arraylike(c) or isinstance(c, np.matrix)):
--> 128 c = np.asarray(c)
129 finally:
130 if lock:
File ~Anaconda3envssatellitelibsite-packagesxarraycoreindexing.py:459, in ImplicitToExplicitIndexingAdapter.__array__(self, dtype)
458 def __array__(self, dtype=None):
--> 459 return np.asarray(self.array, dtype=dtype)
File ~Anaconda3envssatellitelibsite-packagesxarraycoreindexing.py:623, in CopyOnWriteArray.__array__(self, dtype)
622 def __array__(self, dtype=None):
--> 623 return np.asarray(self.array, dtype=dtype)
File ~Anaconda3envssatellitelibsite-packagesxarraycoreindexing.py:524, in LazilyIndexedArray.__array__(self, dtype)
522 def __array__(self, dtype=None):
523 array = as_indexable(self.array)
--> 524 return np.asarray(array[self.key], dtype=None)
File ~Anaconda3envssatellitelibsite-packagesxarraybackendszarr.py:76, in ZarrArrayWrapper.__getitem__(self, key)
74 array = self.get_array()
75 if isinstance(key, indexing.BasicIndexer):
---> 76 return array[key.tuple]
77 elif isinstance(key, indexing.VectorizedIndexer):
78 return array.vindex[
79 indexing._arrayize_vectorized_indexer(key, self.shape).tuple
80 ]
File ~Anaconda3envssatellitelibsite-packageszarrcore.py:788, in Array.__getitem__(self, selection)
786 result = self.vindex[selection]
787 else:
--> 788 result = self.get_basic_selection(pure_selection, fields=fields)
789 return result
File ~Anaconda3envssatellitelibsite-packageszarrcore.py:914, in Array.get_basic_selection(self, selection, out, fields)
911 return self._get_basic_selection_zd(selection=selection, out=out,
912 fields=fields)
913 else:
--> 914 return self._get_basic_selection_nd(selection=selection, out=out,
915 fields=fields)
File ~Anaconda3envssatellitelibsite-packageszarrcore.py:957, in Array._get_basic_selection_nd(self, selection, out, fields)
951 def _get_basic_selection_nd(self, selection, out=None, fields=None):
952 # implementation of basic selection for array with at least one dimension
953
954 # setup indexer
955 indexer = BasicIndexer(selection, self)
--> 957 return self._get_selection(indexer=indexer, out=out, fields=fields)
File ~Anaconda3envssatellitelibsite-packageszarrcore.py:1247, in Array._get_selection(self, indexer, out, fields)
1241 if not hasattr(self.chunk_store, "getitems") or
1242 any(map(lambda x: x == 0, self.shape)):
1243 # sequentially get one key at a time from storage
1244 for chunk_coords, chunk_selection, out_selection in indexer:
1245
1246 # load chunk selection into output array
-> 1247 self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
1248 drop_axes=indexer.drop_axes, fields=fields)
1249 else:
1250 # allow storage to get multiple items at once
1251 lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
File ~Anaconda3envssatellitelibsite-packageszarrcore.py:1939, in Array._chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection, drop_axes, fields)
1935 ckey = self._chunk_key(chunk_coords)
1937 try:
1938 # obtain compressed data for chunk
-> 1939 cdata = self.chunk_store[ckey]
1941 except KeyError:
1942 # chunk not initialized
1943 if self._fill_value is not None:
File ~Anaconda3envssatellitelibsite-packageszarrstorage.py:717, in KVStore.__getitem__(self, key)
716 def __getitem__(self, key):
--> 717 return self._mutable_mapping[key]
File ~Anaconda3envssatellitelibsite-packagesfsspecmapping.py:137, in FSMap.__getitem__(self, key, default)
135 k = self._key_to_str(key)
136 try:
--> 137 result = self.fs.cat(k)
138 except self.missing_exceptions:
139 if default is not None:
File ~Anaconda3envssatellitelibsite-packagesfsspecasyn.py:111, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
108 @functools.wraps(func)
109 def wrapper(*args, **kwargs):
110 self = obj or args[0]
--> 111 return sync(self.loop, func, *args, **kwargs)
File ~Anaconda3envssatellitelibsite-packagesfsspecasyn.py:94, in sync(loop, func, timeout, *args, **kwargs)
91 return_result = result[0]
92 if isinstance(return_result, asyncio.TimeoutError):
93 # suppress asyncio.TimeoutError, raise FSTimeoutError
---> 94 raise FSTimeoutError from return_result
95 elif isinstance(return_result, BaseException):
96 raise return_result
FSTimeoutError:
在行中:
store = fsspec.get_mapper(asset.href)
您可以向fsspec后端传递额外的参数,在本例中为HTTP,请参阅fsspec.eimplementations.HTTP.HTTPFileSystem。在本例中将client_kwargs
传递给aiohttp.ClientSession
,并包含一个可选的timeout
参数。你的电话可能看起来像
from aiohttp import ClientTimeout
store = get_mapper(asset.href, client_kwargs={"timeout": ClientTimeout(total=5000, connect=1000)})