结合多个cron表达式的气流时间表?



我有几个需要应用于单个DAG的cron表达式。没有办法用一个cron表达式来表达它们。

气流2.2引入Timetable。是否有一个实现,需要一个cron表达式列表?

我正在寻找同样的东西,但没有找到任何东西。如果一个标准的带气流的就好了。

这是我为气流2.2.5编写的0.1版本。

# This file is <airflow plugins directory>/timetable.py
from typing import Any, Dict, List, Optional
import pendulum
from croniter import croniter
from pendulum import DateTime, Duration, timezone, instance as pendulum_instance
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.exceptions import AirflowTimetableInvalid

class MultiCronTimetable(Timetable):
valid_units = ['minutes', 'hours', 'days']
def __init__(self,
cron_defs: List[str],
timezone: str = 'Europe/Berlin',
period_length: int = 0,
period_unit: str = 'hours'):
self.cron_defs = cron_defs
self.timezone = timezone
self.period_length = period_length
self.period_unit = period_unit
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
"""
Determines date interval for manually triggered runs.
This is simply (now - period) to now.
"""
end = run_after
if self.period_length == 0:
start = end
else:
start = self.data_period_start(end)
return DataInterval(start=start, end=end)
def next_dagrun_info(
self,
*,
last_automated_data_interval: Optional[DataInterval],
restriction: TimeRestriction) -> Optional[DagRunInfo]:
"""
Determines when the DAG should be scheduled.
"""
if restriction.earliest is None:
# No start_date. Don't schedule.
return None
is_first_run = last_automated_data_interval is None
if is_first_run:
if restriction.catchup:
scheduled_time = self.next_scheduled_run_time(restriction.earliest)
else:
scheduled_time = self.previous_scheduled_run_time()
if scheduled_time is None:
# No previous cron time matched. Find one in the future.
scheduled_time = self.next_scheduled_run_time()
else:
last_scheduled_time = last_automated_data_interval.end
if restriction.catchup:
scheduled_time = self.next_scheduled_run_time(last_scheduled_time)
else:
scheduled_time = self.previous_scheduled_run_time()
if scheduled_time is None or scheduled_time == last_scheduled_time:
# No previous cron time matched,
# or the matched cron time was the last execution time,
scheduled_time = self.next_scheduled_run_time()
elif scheduled_time > last_scheduled_time:
# Matched cron time was after last execution time, but before now.
# Use this cron time
pass
else:
# The last execution time is after the most recent matching cron time.
# Next scheduled run will be in the future
scheduled_time = self.next_scheduled_run_time()
if scheduled_time is None:
return None
if restriction.latest is not None and scheduled_time > restriction.latest:
# Over the DAG's scheduled end; don't schedule.
return None
start = self.data_period_start(scheduled_time)
return DagRunInfo(run_after=scheduled_time, data_interval=DataInterval(start=start, end=scheduled_time))
def data_period_start(self, period_end: DateTime):
return period_end - Duration(**{self.period_unit: self.period_length})
def croniter_values(self, base_datetime=None):
if not base_datetime:
tz = timezone(self.timezone)
base_datetime = pendulum.now(tz)
return [croniter(expr, base_datetime) for expr in self.cron_defs]
def next_scheduled_run_time(self, base_datetime: DateTime = None):
min_date = None
tz = timezone(self.timezone)
if base_datetime:
base_datetime_localized = base_datetime.in_timezone(tz)
else:
base_datetime_localized = pendulum.now(tz)
for cron in self.croniter_values(base_datetime_localized):
next_date = cron.get_next(DateTime)
if not min_date:
min_date = next_date
else:
min_date = min(min_date, next_date)
if min_date is None:
return None
return pendulum_instance(min_date)
def previous_scheduled_run_time(self, base_datetime: DateTime = None):
"""
Get the most recent time in the past that matches one of the cron schedules
"""
max_date = None
tz = timezone(self.timezone)
if base_datetime:
base_datetime_localized = base_datetime.in_timezone(tz)
else:
base_datetime_localized = pendulum.now(tz)
for cron in self.croniter_values(base_datetime_localized):
prev_date = cron.get_prev(DateTime)
if not max_date:
max_date = prev_date
else:
max_date = max(max_date, prev_date)
if max_date is None:
return None
return pendulum_instance(max_date)

def validate(self) -> None:
if not self.cron_defs:
raise AirflowTimetableInvalid("At least one cron definition must be present")
if self.period_unit not in self.valid_units:
raise AirflowTimetableInvalid(f'period_unit must be one of {self.valid_units}')
if self.period_length < 0:
raise AirflowTimetableInvalid(f'period_length must not be less than zero')
try:
self.croniter_values()
except Exception as e:
raise AirflowTimetableInvalid(str(e))
@property
def summary(self) -> str:
"""A short summary for the timetable.
This is used to display the timetable in the web UI. A cron expression
timetable, for example, can use this to display the expression.
"""
return ' || '.join(self.cron_defs) + f' [TZ: {self.timezone}]'
def serialize(self) -> Dict[str, Any]:
"""Serialize the timetable for JSON encoding.
This is called during DAG serialization to store timetable information
in the database. This should return a JSON-serializable dict that will
be fed into ``deserialize`` when the DAG is deserialized.
"""
return dict(cron_defs=self.cron_defs,
timezone=self.timezone,
period_length=self.period_length,
period_unit=self.period_unit)
@classmethod
def deserialize(cls, data: Dict[str, Any]) -> "MultiCronTimetable":
"""Deserialize a timetable from data.
This is called when a serialized DAG is deserialized. ``data`` will be
whatever was returned by ``serialize`` during DAG serialization.
"""
return cls(**data)

class CustomTimetablePlugin(AirflowPlugin):
name = "custom_timetable_plugin"
timetables = [MultiCronTimetable]

要使用它,您需要提供一个cron表达式列表,可选的时区字符串,可选的周期长度和周期单位。

对于我的用例,我实际上不需要周期长度+单位,它们用于确定DAG的data_interval。如果您的DAG不关心data_interval,则可以将它们保留为默认值0分钟。

我试图模仿标准的schedule_interval行为。例如,如果catchup = False和DAG自上次运行以来可能已经被触发了几次(无论出于什么原因,例如DAG运行时间比预期长,或者调度程序没有运行,或者这是DAG第一次被调度),那么DAG将被调度到最近一次匹配时间运行。

我还没有真正用catchup = True测试过它,但理论上它会在DAG的start_date之后的每个匹配的cron时间运行(但每个不同的时间只运行一次,例如使用*/30 * * * *0 * * * *, DAG将每小时运行两次,而不是三次)。

示例DAG文件:

from time import sleep
import airflow
from airflow.operators.python import PythonOperator
import pendulum
from timetable import MultiCronTimetable
def sleepy_op():
sleep(660)

with airflow.DAG(
dag_id='timetable_test',
start_date=pendulum.datetime(2022, 6, 2, tz=pendulum.timezone('America/New_York')),
timetable=MultiCronTimetable(['*/5 * * * *', '*/3 * * * fri,sat', '1 12 3 * *'], timezone='America/New_York', period_length=10, period_unit='minutes'),
catchup=False,
max_active_runs=1) as dag:
sleepy = PythonOperator(
task_id='sleepy',
python_callable=sleepy_op
)

最新更新