I have a simple Airflow DAG with two operators (a PythonOperator and a KubernetesPodOperator):
with DAG(dag_id="dummy", start_date=datetime(2020, 11, 7), catchup=False) as dag:
    logger = logging.getLogger("airflow.task")
    volume_mount = k8s.v1_volume_mount.V1VolumeMount(name='osm-config',
                                                     mount_path=ROOT_PATH,
                                                     sub_path=None,
                                                     read_only=False)
    pvc = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="osm-config-pv-claim")
    volume = k8s.v1_volume.V1Volume(name="osm-config",
                                    persistent_volume_claim=pvc)

    def do_it():
        logger.debug("do work")

    start = DummyOperator(task_id="start", dag=dag)
    test = PythonOperator(task_id="test",
                          python_callable=do_it,
                          executor_config={
                              "pod_override": k8s.V1Pod(
                                  spec=k8s.V1PodSpec(
                                      containers=[
                                          k8s.V1Container(
                                              name="base",
                                              volume_mounts=[volume_mount]
                                          )
                                      ],
                                      volumes=[volume],
                                  )
                              )
                          },
                          dag=dag)
    download_data = KubernetesPodOperator(task_id="download_data",
                                          namespace="default",
                                          name="openmaptiles_download_data",
                                          image="openmaptiles/openmaptiles-tools",
                                          cmds=["download-osm"],
                                          volumes=[volume],
                                          volume_mounts=[volume_mount],
                                          dag=dag)
    start >> download_data >> test
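The goal is to have a persistent volume that both operators can use. The KubernetesPodOperator mounts the volume as expected and downloads everything it needs. The PythonOperator, however, stays in the queued state forever.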
Tailing the scheduler pod's logs shows the following error:
Pod in version "v1" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.VolumeMounts: []v1.VolumeMount: readObjectStart: expect { or n, but found ", error found in #10 byte of ...|-data"}
I suspect the volume or volume mount is not set up correctly, because the serialized format looks wrong:
...
"volumeMounts": [
    {
        "mountPath": "/opt/airflow/dags",
        "name": "dags-data"
    },
    {
        "mountPath": "/opt/airflow/logs",
        "name": "logs-data"
    },
    "{'mount_path': '/osm_config',\n 'mount_propagation': None,\n 'name': 'test',\n 'read_only': False,\n 'sub_path': None,\n 'sub_path_expr': None}"
]
Yet my configuration seems consistent with the Airflow documentation.
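The decode error is fairly specific: for an entry in volumeMounts, the API server expected a JSON object ("{") or null ("n") but found a string ('"'), which matches the third entry in the spec above, a Python repr serialized as a string. A minimal, stdlib-only sketch for spotting such entries, assuming the rendered pod spec is available as a JSON string; find_bad_volume_mounts is a hypothetical helper, not part of Airflow or the Kubernetes client:

```python
import json
from typing import Any, List, Tuple


def find_bad_volume_mounts(pod_json: str) -> List[Tuple[str, int, Any]]:
    """Return (container name, index, entry) for every volumeMounts entry
    that is not a JSON object, i.e. the kind of entry the API server rejects."""
    pod = json.loads(pod_json)
    bad = []
    for container in pod.get("spec", {}).get("containers", []):
        for i, mount in enumerate(container.get("volumeMounts") or []):
            if not isinstance(mount, dict):
                bad.append((container.get("name", "?"), i, mount))
    return bad


# Same shape as the spec in the question: the third mount was serialized
# as a Python repr string instead of an object.
example = json.dumps({
    "spec": {
        "containers": [{
            "name": "base",
            "volumeMounts": [
                {"mountPath": "/opt/airflow/dags", "name": "dags-data"},
                {"mountPath": "/opt/airflow/logs", "name": "logs-data"},
                "{'mount_path': '/osm_config', 'name': 'test'}",
            ],
        }]
    }
})

for name, index, entry in find_bad_volume_mounts(example):
    print(f"container {name!r}: volumeMounts[{index}] is a {type(entry).__name__}: {entry}")
```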
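The problem turned out to be the type of the volume objects passed to the PythonOperator.
My initial example used k8s.v1_volume.V1Volume and k8s.v1_volume_mount.V1VolumeMount, but switching to k8s.V1Volume and k8s.V1VolumeMount creates a pod with the volumes mounted as expected.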