获取Kedro自定义数据集用于SunPy Maps写入/从S3



我目前正在尝试定义一个自定义数据集,以从S3读取/写入.fits文件作为SunPy Maps。

数据目录中与此最接近的东西是枕头。ImageDataSet枕头。ImageDataSet,它支持在加载时传递一个文件对象:https://pillow.readthedocs.io/en/stable/reference/Image.html。

我不确定Maps在输入方面是否足够灵活,以证明采用类似的方法是合理的。我到目前为止修改枕头的尝试。ImageDataSet_load方法包含

smap = Map(fs_file)
return smap

导致以下错误:

DataSetError: Failed while loading data from data set SunPyMapDataSet(filepath=sunspots/data/01_raw/map_sample.fits, protocol=s3, save_args={'overwrite': True}).
Invalid input: <File-like object S3FileSystem, sunspots/data/01_raw/map_sample.fits>

我怎样才能让这里的东西正常工作呢?

我不熟悉这个SunPy库。到目前为止,我认为你的方法是正确的。

fs_file是一个处理程序,你需要一个正确的方式来打开这个文件。我认为你得到这个错误可能是因为Map(fs_file)不是正确的方式来加载文件。

你应该寻找从文件中加载Map对象的函数。

几个月前,我为SunPy编写了一个Kedro自定义数据集,使用Astropy作为中介,忘记回答这个问题。为SunPy用户打开新的kedro-datasets包的PR可能是值得的。

import warnings
from copy import deepcopy
from pathlib import PurePosixPath
from typing import Any, Dict
import fsspec
from kedro.io.core import (
AbstractVersionedDataSet,
DataSetError,
Version,
get_filepath_str,
get_protocol_and_path,
)
import numpy as np
from astropy.io import fits
from sunpy.map import Map

class SunPyMapDataSet(AbstractVersionedDataSet):
DEFAULT_SAVE_ARGS = {"overwrite": False}
def __init__(
self,
filepath: str,
save_args: Dict[str, Any] = None,
version: Version = None,
credentials: Dict[str, Any] = None,
fs_args: Dict[str, Any] = None,
) -> None:
_fs_args = deepcopy(fs_args) or {}
_fs_open_args_load = _fs_args.pop("open_args_load", {})
_fs_open_args_save = _fs_args.pop("open_args_save", {})
_credentials = deepcopy(credentials) or {}
protocol, path = get_protocol_and_path(filepath, version)
if protocol == "file":
_fs_args.setdefault("auto_mkdir", True)
self._protocol = protocol
self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)
super().__init__(
filepath=PurePosixPath(path),
version=version,
exists_function=self._fs.exists,
glob_function=self._fs.glob,
)
self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)
_fs_open_args_save.setdefault("mode", "wb")
self._fs_open_args_load = _fs_open_args_load
self._fs_open_args_save = _fs_open_args_save
def _describe(self) -> Dict[str, Any]:
return dict(
filepath=self._filepath,
protocol=self._protocol,
save_args=self._save_args,
version=self._version,
)
def _load(self) -> Map:
load_path = get_filepath_str(self._get_load_path(), self._protocol)
with self._fs.open(load_path, **self._fs_open_args_load) as fs_file:
file = fits.open(fs_file).copy()
image_hdu = file[1]
image_hdu.verify("fix")
smap = Map((image_hdu.data, image_hdu.header))
return smap
def _save(self, data: Map) -> None:
save_path = get_filepath_str(self._get_save_path(), self._protocol)
with self._fs.open(save_path, **self._fs_open_args_save) as fs_file:
hdu = fits.ImageHDU()
hdu.header = data.fits_header
hdu.data = data.data
hdu.writeto(fs_file, **self._save_args)
self._invalidate_cache()
def _exists(self) -> bool:
try:
load_path = get_filepath_str(self._get_load_path(), self._protocol)
except DataSetError:
return False
return self._fs.exists(load_path)
def _release(self) -> None:
super()._release()
self._invalidate_cache()
def _invalidate_cache(self) -> None:
"""Invalidate underlying filesystem caches."""
filepath = get_filepath_str(self._filepath, self._protocol)
self._fs.invalidate_cache(filepath)

相关内容

  • 没有找到相关文章

最新更新