How to specify the authentication type for the Airflow SSHOperator



I am trying to SSH from an Airflow worker to one of my servers using the SSHOperator. My SSH server is configured to use Kerberos authentication. With the default SSH connection I get the following error.

SSH operator error: Bad authentication type; allowed types: ['publickey', 'gssapi-keyex', 'gssapi-with-mic', 'keyboard-interactive']

How can I resolve this error? I tried the following settings in the Extra field of the connection in the Airflow UI:

{ "gss-auth":"true","gssapi-keyex":"true","gss-kex":"true"}

Is there an option in the Airflow SSHOperator to specify Kerberos as the authentication type to use?

There is no Kerberos authentication support in the existing Airflow SSHOperator, even though the underlying Paramiko library supports it.

I was able to work around this by writing a custom hook that extends SSHHook and passes a parameter to the underlying Paramiko library to specify Kerberos as the authentication type. It worked! Thanks to Airflow's easy extensibility.

The custom hook:

import paramiko

from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.plugins_manager import AirflowPlugin


class CustomSSHHook(SSHHook):
    """
    Custom SSH hook with Kerberos (GSSAPI) authentication support.
    """
    def __init__(self,
                 ssh_conn_id=None,
                 remote_host=None,
                 username=None,
                 password=None,
                 key_file=None,
                 port=None,
                 timeout=10,
                 keepalive_interval=30):
        super(CustomSSHHook, self).__init__(
            ssh_conn_id,
            remote_host,
            username,
            password,
            key_file,
            port,
            timeout,
            keepalive_interval)
        # Use the connection's Extra field to override the default
        self.gss_auth = False
        if self.ssh_conn_id is not None:
            conn = self.get_connection(self.ssh_conn_id)
            if conn.extra is not None:
                extra_options = conn.extra_dejson
                if ("gss_auth" in extra_options
                        and str(extra_options["gss_auth"]).lower() == 'true'):
                    self.gss_auth = True

    def get_conn(self):
        """
        Opens an SSH connection to the remote host.
        :return: paramiko.SSHClient object
        """
        self.log.debug('Creating SSH client for conn_id: %s', self.ssh_conn_id)
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        if self.no_host_key_check:
            # Default is RejectPolicy
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        if self.password and self.password.strip():
            client.connect(hostname=self.remote_host,
                           username=self.username,
                           password=self.password,
                           key_filename=self.key_file,
                           timeout=self.timeout,
                           compress=self.compress,
                           port=self.port,
                           sock=self.host_proxy)
        else:
            client.connect(hostname=self.remote_host,
                           username=self.username,
                           key_filename=self.key_file,
                           timeout=self.timeout,
                           compress=self.compress,
                           port=self.port,
                           sock=self.host_proxy,
                           gss_auth=self.gss_auth)
        if self.keepalive_interval:
            client.get_transport().set_keepalive(self.keepalive_interval)
        self.client = client
        return client


class CustomSshPlugin(AirflowPlugin):
    name = "ssh_plugins"
    DEFAULT_PLUGIN = "1.0"
    hooks = [CustomSSHHook]

The above can then be used in a DAG as follows:

from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.hooks.ssh_plugins import CustomSSHHook

edge_node_hook = CustomSSHHook(ssh_conn_id="ssh_con_id",
                               remote_host=host_ip,
                               port=22,
                               timeout=100)
ssh_execution = SSHOperator(dag=dag,
                            task_id='sample_task',
                            ssh_hook=edge_node_hook,
                            command='whoami',
                            do_xcom_push=False)

You must also add the following to the Extra field of the connection with id "ssh_con_id":

{"gss_auth":"true"}
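For reference, the hook's Extra-field check is plain JSON parsing; a standalone sketch of the same logic, using only the standard library (the helper name `gss_auth_enabled` is mine, not part of Airflow):

```python
import json


def gss_auth_enabled(extra):
    """Return True only when the connection's Extra JSON sets gss_auth
    to "true" (case-insensitive), mirroring the check in
    CustomSSHHook.__init__ above."""
    extra_options = json.loads(extra)
    return ("gss_auth" in extra_options
            and str(extra_options["gss_auth"]).lower() == "true")


print(gss_auth_enabled('{"gss_auth": "true"}'))   # True
print(gss_auth_enabled('{"gss-auth": "true"}'))   # False: the key must be gss_auth
```

Note that the key must be spelled `gss_auth` with an underscore; the hyphenated keys tried in the question (`gss-auth`, `gss-kex`) are silently ignored by this check.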
