路易吉全局变量



我想在Luigi中将一些目标路径设置为全局变量。

原因是我使用的目标路径基于给定数值天气预报 (NWP( 的最后一次运行,并且需要一些时间才能获得该值。一旦我检查了最后一次运行,我就会创建一个路径,我将在其中放置多个目标文件(具有相同的父文件夹(。

我目前正在重复类似的调用以获取多个任务的父路径的值,将此路径设置为全局变量会更有效。我尝试从 luigi 类调用的一个函数 (get_target_path( 中定义全局变量,但当我回到 Luigi 管道时,全局变量似乎不会持久化。

此外,我的代码如下所示:

class GetNWP(luigi.Task):
"""
Download the NWP data.
"""
product_id = luigi.Parameter()
date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
run_hr = luigi.Parameter(default='latest')
def requires(self):
return None
def output(self):
path = get_target_path(self.product_id, self.date, self.run_hr,
type='getNWP')
return luigi.LocalTarget(path)
def run(self):
download_nwp_data(self.product_id, self.date, self.run_hr)

class GetNWP_GFS(luigi.Task):
"""
GFS data.
"""
product_id = luigi.Parameter()
date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
run_hr = luigi.Parameter(default='latest')
def requires(self):
return None
def output(self):
path = get_target_path(self.product_id_PV, self.date, self.run_hr,
type='getNWP_GFS')
return luigi.LocalTarget(path)
def run(self):
download_nwp_data(self.product_id, self.date, self.run_hr,
type='getNWP_GFS')

class Predict(luigi.Task):
"""
Create forecast.
"""
product_id = luigi.Parameter(default=None)
date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
run_hr = luigi.Parameter(default='latest')
horizon = luigi.Parameter(default='DA')
def requires(self):
return [
GetNWP_GFS(self.product_id, self.date, self.run_hr),
GetNWP(self.product_id, self.date, self.run_hr)
]
def output(self):
path = get_target_path(self.product_id, self.date, self.run_hr,
type='predict', horizon=self.horizon)
return luigi.LocalTarget(path)
def run(self):
get_forecast(self.product_id, self.date, self.run_hr)

该函数get_target_path根据输入参数定义目标路径。我希望这个函数设置可以从 Luigi 访问的全局变量。例如,如下所示(只是getNWP任务的代码(:

def get_target_path(product_id, date, run_hr, type=None, horizon='DA'):
"""
Obtain target path.
"""
if type == 'getNWP_GFS':
if 'path_nwp_gfs' in globals():
return path_nwp_gfs
else:
...
elif type == 'getNWP':
if 'path_nwp_model' in globals():
return path_nwp_model
else:
filename = f'{nwp_model}_{date}_{run_hr}_{horizon}.{ext}'
path = Path(db_dflt['app_data']['nwp_folder'])
create_directory(path)
global path_nwp_model
path_nwp_model = Path(path) / filename
elif type == 'predict':
if 'path_predict' in globals():
return path_predict
else:
...

当我回到 Luigi 时,此函数中定义的全局变量不存在。

关于如何解决这个问题的任何想法将不胜感激!

由于似乎没有内置方法来存储 Luigi 目标的路径,我最终决定创建一个包含与 Luigi 目标/路径相关的所有信息的类。当调用需要知道哪些是目标路径的外部函数时,此类在 Luigi 的任务中使用。

此类在主 luigy 脚本中导入,并在定义任务之前实例化:

from .utils import Targets
paths = Targets()
class GetNWP(luigi.Task):
"""Download NWP data required to prepare the prediction."""
product_id = luigi.Parameter()
date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
run_hr = luigi.Parameter(default='latest')
def requires(self):
return GetProductInfo(self.product_id)
def output(self):
path = paths.getpath_nwp(self.product_id, self.date, self.run_hr)
path_gfs = paths.getpath_nwp_GFS(self.product_id, self.date, self.run_hr)
return [luigi.LocalTarget(path),
luigi.LocalTarget(path_gfs)]
def run(self):
download_nwp_data(self.product_id, date=self.date, run_hr=self.run_hr,
paths=paths, nwp_model=paths.nwp_model)
download_nwp_data(self.product_id, date=self.date, run_hr=self.run_hr,
paths=paths, nwp_model=paths.gfs_model)   
class Predict(luigi.Task):
"""Create forecast based on the product information and NWP data."""
product_id = luigi.Parameter()
date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
run_hr = luigi.Parameter(default='latest')
def requires(self):
return GetNWP(self.product_id, self.date, self.run_hr)
def output(self):
path = paths.getpath_predict(self.product_id, self.date, self.run_hr)
path_gfs = paths.getpath_predict_GFS(self.product_id, self.date,
self.run_hr)
return [luigi.LocalTarget(path),
luigi.LocalTarget(path_gfs)]
def run(self):
get_forecast(product_id=self.product_id, date=self.date,
run_hr=self.run_hr, paths=paths, nwp_model=paths.nwp_model)
get_forecast(product_id=self.product_id, date=self.date,
run_hr=self.run_hr, paths=paths, nwp_model=paths.gfs_model)

其中,目标类具有以下结构:

class Targets:
"""Store Luigi's target paths."""
def __init__(self):
"""Initialize paths and variables."""
self.path1 = None
self.path2 = None
self.path3 = None
def update_object(self, product_id, date=None, run_hr=None):
"""Update object based on inputs."""
if self.prod_id is None:
self.prod_id = product_id
if self.path_1 is None:
self.get_path_1(product_id)
if self.path_2 is None:
self.get_path_2(product_id)
if self.path_3 is None:
self.get_path_3(product_id)
def get_path_1(self, product_id, ...)
"""Generate a path 1 for a luigi Task."""
... define self.path_1...
def get_path_2(self, product_id, ...)
"""Generate a path 2 for a luigi Task."""
... define self.path_2...
def get_path_3(self, product_id, ...)
"""Generate a path 3 for a luigi Task."""
... define self.path_3...

主要思想是只设置一次目标路径,并从每个 Luigi 任务中使用它们作为输入参数。这允许:

  • 更快地执行任务,以及
  • 如果目标路径由于新的 NWP 不可用而更改,请避免错误。

如果你愿意,你可以使用 mixins,但请记住,luigi 任务可以继承实例方法和参数。

import os
import luigi
LUIGI_BASE_PATH='/path/to/luigi/dir'
class BaseTask(luigi.Task)
product_id = luigi.Parameter()
date = luigi.Parameter(default=datetime.today().strftime('%Y%m%d'))
run_hr = luigi.Parameter(default='latest')
def get_path_dynamic(self):
return os.path.join(LUIGI_BASE_PATH, 
self.__class__.__name__, 
self.product_id,
...)
def output(self):
return luigi.LocalTarget(self.get_path_dynamic())

class Predict(BaseTask):
def run(self):
...

额外的好处是你不需要重新定义相同的参数,并且 子任务的名称(PredictGetNWP(将入到输出路径中。我不确定path1path2等属性与gpath_nwp()和类似函数的关系,因为它们的定义不包含在示例中,但您可以使用@property装饰器来模拟相同的功能来定义 getter 和 setter。

最新更新