有没有办法通过气流API创建/修改连接



通过Admin -> Connections,我们可以创建/修改连接的参数,但我想知道我是否可以通过API做同样的事情,以便我可以通过编程方式设置连接

airflow.models.Connection似乎它只处理实际连接到实例而不是将其保存到列表中。这似乎是一个应该实现的功能,但我不确定在哪里可以找到此特定功能的文档。

连接实际上是一个模型,可用于查询和插入新连接

from airflow import settings
from airflow.models import Connection
conn = Connection(
conn_id=conn_id,
conn_type=conn_type,
host=host,
login=login,
password=password,
port=port
) #create a connection object
session = settings.Session() # get the session
session.add(conn)
session.commit() # it will insert the connection object programmatically.

如果需要在Python/Airflow 代码之外、通过 bash、在 Dockerfile 等情况下,您还可以从 Airflow CLI 添加、删除和列出连接。

airflow connections --add ...

用法:

airflow connections [-h] [-l] [-a] [-d] [--conn_id CONN_ID]
[--conn_uri CONN_URI] [--conn_extra CONN_EXTRA]
[--conn_type CONN_TYPE] [--conn_host CONN_HOST]
[--conn_login CONN_LOGIN] [--conn_password CONN_PASSWORD]
[--conn_schema CONN_SCHEMA] [--conn_port CONN_PORT]

https://airflow.apache.org/cli.html#connections

看起来 CLI 目前不支持修改现有连接,但在 GitHub 上有一个活动的开放 PR 存在 Jira 问题。

  • AIRFLOW-2840 - 用于更新现有连接的 cli 选项
  • https://github.com/apache/incubator-airflow/pull/3684

首先检查连接是否存在,然后使用from airflow.models import Connection创建新连接:

import logging
from airflow import settings
from airflow.models import Connection
def create_conn(conn_id, conn_type, host, login, pwd, port, desc):
conn = Connection(conn_id=conn_id,
conn_type=conn_type,
host=host,
login=login,
password=pwd,
port=port,
description=desc)
session = settings.Session()
conn_name = session.query(Connection).filter(Connection.conn_id == conn.conn_id).first()
if str(conn_name) == str(conn.conn_id):
logging.warning(f"Connection {conn.conn_id} already exists")
return None
session.add(conn)
session.commit()
logging.info(Connection.log_info(conn))
logging.info(f'Connection {conn_id} is created')
return conn

您可以使用连接 URI 格式使用环境变量填充连接。

环境变量命名约定AIRFLOW_CONN_,全部大写。

因此,如果您的连接 ID my_prod_db则变量名称应AIRFLOW_CONN_MY_PROD_DB。

一般来说,Airflow 的 URI 格式是这样的:

my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2

请注意,以这种方式注册的连接不会显示在气流 UI 中。

为了使用session = settings.Session(),它假定气流数据库后端已经启动。对于那些尚未为您的开发环境设置它的用户,同时使用 Connection 类和环境变量的混合方法将是一种解决方法。

下面是设置 S3Hook 的示例

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.models.connection import Connection
import os
import json
aws_default = Connection(
conn_id="aws_default",
conn_type="aws",
login='YOUR-AWS-KEY-ID',
password='YOUR-AWS-KEY-SECRET',
extra=json.dumps({'region_name': 'us-east-1'})
)
os.environ["AIRFLOW_CONN_AWS_DEFAULT"] = aws_default.get_uri()
s3_hook = S3Hook(aws_conn_id='aws_default')
s3_hook.list_keys(bucket_name='YOUR-BUCKET', prefix='YOUR-FILENAME')

最新更新