气流- DAG完整性测试- sqlalchemy. exe . operationalerror:(sqlite3.Op



我正在尝试在气流中编写一些DAG完整性测试。我遇到的问题是我正在测试的DAG,我在该DAG内的一些任务中引用了变量。

如:Variable.get("AIRFLOW_VAR_BLOB_CONTAINER"

我似乎得到了错误:sqlalchemy.exc。OperationalError: (sqlite3.OperationalError) no such table: variable

,因为当通过pytest测试时,这些变量(和变量表)不存在。有人知道在运行DAG完整性测试时处理变量/连接引用的任何变通方法或建议方法吗?

谢谢,

您可以创建一个本地metastore进行测试。在没有任何其他设置的情况下运行airflow db init将在您的主目录中创建一个SQLite metastore,您可以在测试期间使用它。我对用于测试的本地转移的默认附加设置是:

AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False  (to ensure there are no defaults to make things magically work)
AIRFLOW__CORE__LOAD_EXAMPLES=False  (to ensure there are no defaults to make things magically work)
AIRFLOW__CORE__UNIT_TEST_MODE=True  (Set default test settings, skip certain actions, etc.)
AIRFLOW_HOME=[project root dir]  (To avoid Airflow files in your home dir)

使用这些设置运行airflow db init会在项目根目录下产生三个文件:

  1. unittests.db
  2. unittests.cfg
  3. webserver_config.py

这可能是一个好主意,将这些添加到您的.gitignore。有了这个设置,您就可以在测试期间安全地针对本地metastoreunittests.db进行测试(确保在运行pytest时设置了相同的env变量)。

或者,如果您出于某些原因不想要一个局部的metastore,您将不得不求助于mock来代替Airflow对metastore的调用。这需要了解气流内部的知识。一个例子:

import datetime
from unittest import mock
from airflow.models import DAG
from airflow.operators.bash import BashOperator

def test_bash_operator(tmp_path):
with DAG(dag_id="test_dag", start_date=datetime.datetime(2021, 1, 1), schedule_interval="@daily") as dag:
with mock.patch("airflow.models.variable.Variable.get") as variable_get_mock:
employees = ["Alice", "Bob", "Charlie"]
variable_get_mock.return_value = employees
output_file = tmp_path / "output.txt"
test = BashOperator(task_id="test", bash_command="echo {{ var.json.employees }} > " + str(output_file))
dag.clear()
test.run(
start_date=dag.start_date,
end_date=dag.start_date,
ignore_first_depends_on_past=True,
ignore_ti_state=True,
)
variable_get_mock.assert_called_once()
assert output_file.read_text() == f"[{', '.join(employees)}]n"

这些行:

with mock.patch("airflow.models.variable.Variable.get") as variable_get_mock:
employees = ["Alice", "Bob", "Charlie"]
variable_get_mock.return_value = employees

确定实际上没有调用函数airflow.models.variable.Variable.get,而是返回了这个列表:["Alice", "Bob", "Charlie"]。由于task.run()没有返回任何内容,因此我让bash_command写入tmp_path,并读取文件以断言内容是否符合我的期望。

这完全避免了对转移的需要,但是一旦您的测试超出了像这样的基本示例,mock可能会做很多工作并且很复杂。

相关内容

最新更新