Airlfow API:带参数的测试任务



我不知道如何使用pytest来测试等待xcom_arg的dag任务。

我使用新的气流API语法创建了以下DAG:


@dag(...)
def transfer_files():
@task()
def retrieve_existing_files():
existing = []
for elem in os.listdir("./backup"):
existing.append(elem)
return existing
@task()
def get_new_file_to_sync(existing: list[str]):
new_files = []
for elem in os.listdir("./prod"):
if not elem in existing:
new_files.append(elem)
return new_files
r = retrieve_existing_files()
get_new_file_to_sync(r)

现在我想对get_new_file_to_sync任务执行单元测试。我写了以下测试:

def test_get_new_elan_list():
mocked_existing = ["a.out", "b.out"]
dag_bag = DagBag(include_examples=False)
dag = dag_bag.get_dag("transfer_files")
task = dag.get_task("get_new_file_to_sync")
result = task.execute({}, mocked_existing)
print(result)

测试失败,因为task.execute正在等待2个参数,但给出了3个。

我的问题是,我不知道如何继续测试我的任务,用一个模拟的自定义参数等待参数。

感谢您的见解

我设法找到了一种方法来对使用新的气流API声明的气流任务进行单元测试。

以下是问题中声明的DAGtransfer_files中包含的任务get_new_file_to_sync的测试用例:

def test_get_new_file_to_synct():
mocked_existing = ["a.out", "b.out"]

# Asking airflow to load the dags in its home folder
dag_bag = DagBag(include_examples=False)
# Retrieving the dag to test
dag = dag_bag.get_dag("transfer_files")

# Retrieving the task to test
task = dag.get_task("get_new_file_to_sync")
# extracting the function to test from the task
function_to_unit_test = task.python_callable

# Calling the function normally
results = function_to_unit_test(mocked_existing)
assert len(results) == 10

这允许在调用为任务编写的实际代码之前绕过所有触发的气流机制。因此,您可以专注于为任务编写的代码编写测试。

对于测试这样的任务,我认为您需要使用pytest中的mocking

让我们以这个用户定义的操作符为例:

class MovielensPopularityOperator(BaseOperator):
def __init__(self, conn_id, start_date, end_date, min_ratings=4, top_n=5, **kwargs):
super().__init__(**kwargs)
self._conn_id = conn_id
self._start_date = start_date
self._end_date = end_date
self._min_ratings = min_ratings
self._top_n = top_n
def execute(self, context):
with MovielensHook(self._conn_id) as hook:
ratings = hook.get_ratings(start_date=self._start_date, end_date=self._end_date)
rating_sums = defaultdict(Counter)
for rating in ratings:
rating_sums[rating["movieId"]].update(count=1, rating=rating["rating"])
averages = {
movie_id: (rating_counter["rating"] / rating_counter["count"], rating_counter["count"])
for movie_id, rating_counter in rating_sums.items()
if rating_counter["count"] >= self._min_ratings
}
return sorted(averages.items(), key=lambda x: x[1], reverse=True)[: self._top_n]

还有一个和你写的一样的测试:

def test_movielenspopularityoperator():
task = MovielensPopularityOperator(
task_id="test_id",
start_date="2015-01-01",
end_date="2015-01-03",
top_n=5,
)
result = task.execute(context={})
assert len(result) == 5

运行此测试失败为:

=============================== FAILURES ===============================
___________________ test_movielenspopularityoperator ___________________
mocker = <pytest_mock.plugin.MockFixture object at 0x10fb2ea90>
def test_movielenspopularityoperator(mocker: MockFixture):
task = MovielensPopularityOperator(
➥
>
task_id="test_id", start_date="2015-01-01", end_date="2015-01-
03", top_n=5
)
➥
E
TypeError: __init__() missing 1 required positional argument:
'conn_id'
tests/dags/chapter9/custom/test_operators.py:30: TypeError
========================== 1 failed in 0.10s ==========================

测试失败,因为我们缺少必需的参数conn_id,该参数指向元存储中的连接id。但你如何在测试中提供这一点?测试应相互隔离;它们不应该影响其他测试的结果,因此在测试之间共享数据库不是一个理想的情况。在这种情况下,嘲讽起到了拯救作用。

模拟是"伪造"某些操作或对象。例如,通过告诉Python返回某个值,而不是对(在测试期间不存在的(数据库进行实际调用,可以伪造或模拟对生产设置中预期存在但在测试期间没有的数据库的调用。这允许您在不需要连接到外部系统的情况下开发和运行测试。它需要深入了解您正在测试的任何内容的内部,因此有时需要深入研究第三方代码。

在您的环境中安装pytest mock后:

pip install pytest-mock

以下是使用嘲讽的测试:

def test_movielenspopularityoperator(mocker):
mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(conn_id="test", login="airflow", password="airflow"),
)
task = MovielensPopularityOperator(
task_id="test_id",
conn_id="test",
start_date="2015-01-01",
end_date="2015-01-03",
top_n=5,
)
result = task.execute(context=None)
assert len(result) == 5

现在,希望这能给你一个关于如何为气流任务编写测试的想法。

有关模拟和单元测试的更多信息,您可以在此处查看。

最新更新