气流 DAG 序列化:类型错误:类型"V1Pod"的对象不可 JSON 序列化



"当使用KubernetesExecutor时,Airflow提供了在每个任务的基础上覆盖系统默认值的能力。为了利用这个功能,我们可以创建一个KubernetesV1pod对象并填充所需的覆盖。">

我正试图用以下操作符触发DAG(来自官方核心文档的示例):

...
volume_task = PythonOperator(
task_id="task_with_volume",
python_callable=test_volume_mount,
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/foo/", name="example-kubernetes-test-volume"
)
],
)
],
volumes=[
k8s.V1Volume(
name="example-kubernetes-test-volume",
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
)
],
)
),
},
)
...

当我在UI中点击我的DAG时,我得到了以下错误:


____/ (  (    )   )  ___
/( (  (  )   _    ))  )   )
((     (   )(    )  )   (   )  )
((/  ( _(   )   (   _) ) (  () )  )
( (  ( (_)   ((    (   )  .((_ ) .  )_
( (  )    (      (  )    )   ) . ) (   )
(  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
( (  (   ) (  )   (  ))     ) _)(   )  )  )
( (  (  ) (    (_  ( ) ( )  )   ) )  )) ( )
(  (   (  (   (_ ( ) ( _    )  ) (  )  )   )
( (  ( (  (  )     (_  )  ) )  _)   ) _( ( )
((  (   )(    (     _    )   _) _(_ (  (_ )
(_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
((__)        \||lll|l||///          _))
(   /(/ (  )  ) )   )
(    ( ( ( | | ) ) )   )
(   /(| / ( )) ) ) )) )
(     ( ((((_(|)_)))))     )
(      ||(|(|)|/||     )
(        |(||(||)||||        )
(     //|/l|||)|\      )
(/ / //  /|//||||\      _)
-------------------------------------------------------------------------------
Node: 9c9de21b5ea0
-------------------------------------------------------------------------------
Traceback (most recent call last):
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/opt/python3.6/lib/python3.6/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/opt/python3.6/lib/python3.6/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner
return self._run_view(f, *args, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view
return fn(self, *args, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/flask_login/utils.py", line 258, in decorated_view
return func(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/www/utils.py", line 386, in view_func
return f(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/www/utils.py", line 292, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/airflow/airflow/www/views.py", line 1706, in tree
show_external_logs=bool(external_logs))
File "/usr/local/lib/airflow/airflow/www/views.py", line 425, in render
return super(AirflowViewMixin, self).render(template, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/base.py", line 308, in render
return render_template(template, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/flask/templating.py", line 140, in render_template
ctx.app,
File "/opt/python3.6/lib/python3.6/site-packages/flask/templating.py", line 120, in _render
rv = template.render(context)
File "/opt/python3.6/lib/python3.6/site-packages/jinja2/environment.py", line 1090, in render
self.environment.handle_exception()
File "/opt/python3.6/lib/python3.6/site-packages/jinja2/environment.py", line 832, in handle_exception
reraise(*rewrite_traceback_stack(source=source))
File "/opt/python3.6/lib/python3.6/site-packages/jinja2/_compat.py", line 28, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/airflow/airflow/www/templates/airflow/tree.html", line 20, in top-level template code
{% extends "airflow/dag.html" %}
File "/usr/local/lib/airflow/airflow/www/templates/airflow/dag.html", line 21, in top-level template code
{% import 'admin/lib.html' as lib with context %}
File "/usr/local/lib/airflow/airflow/www/templates/airflow/master.html", line 20, in top-level template code
{% extends "admin/master.html" %}
File "/usr/local/lib/airflow/airflow/www/templates/admin/master.html", line 20, in top-level template code
{% extends 'admin/base.html' %}
File "/opt/python3.6/lib/python3.6/site-packages/flask_admin/templates/bootstrap3/admin/base.html", line 95, in top-level template code
{% block tail %}
File "/usr/local/lib/airflow/airflow/www/templates/airflow/tree.html", line 85, in block "tail"
var data = {{ data|tojson }};
File "/opt/python3.6/lib/python3.6/site-packages/flask/json/__init__.py", line 376, in tojson_filter
return Markup(htmlsafe_dumps(obj, **kwargs))
File "/opt/python3.6/lib/python3.6/site-packages/flask/json/__init__.py", line 290, in htmlsafe_dumps
dumps(obj, **kwargs)
File "/opt/python3.6/lib/python3.6/site-packages/flask/json/__init__.py", line 211, in dumps
rv = _json.dumps(obj, **kwargs)
File "/opt/python3.6/lib/python3.6/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/opt/python3.6/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/opt/python3.6/lib/python3.6/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/opt/python3.6/lib/python3.6/site-packages/flask/json/__init__.py", line 100, in default
return _json.JSONEncoder.default(self, o)
File "/opt/python3.6/lib/python3.6/json/encoder.py", line 180, in default
o.__class__.__name__)
TypeError: Object of type 'V1Pod' is not JSON serializable

你能告诉我发生了什么事,我该如何解决这个问题吗?由于

这是一个bug,在气流2.0.0(见PR)修复

最新更新