Mounting a PersistentVolumeClaim with PythonOperator



I have a simple Airflow DAG with two operators (PythonOperator and KubernetesPodOperator):

with DAG(dag_id="dummy", start_date=datetime(2020, 11, 7), catchup=False) as dag:
    logger = logging.getLogger("airflow.task")
    volume_mount = k8s.v1_volume_mount.V1VolumeMount(name='osm-config',
                                                     mount_path=ROOT_PATH,
                                                     sub_path=None,
                                                     read_only=False)
    pvc = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="osm-config-pv-claim")
    volume = k8s.v1_volume.V1Volume(name="osm-config",
                                    persistent_volume_claim=pvc)

    def do_it():
        logger.debug("do work")

    start = DummyOperator(task_id="start", dag=dag)
    test = PythonOperator(task_id="test",
                          python_callable=do_it,
                          executor_config={
                              "pod_override": k8s.V1Pod(
                                  spec=k8s.V1PodSpec(
                                      containers=[
                                          k8s.V1Container(
                                              name="base",
                                              volume_mounts=[volume_mount]
                                          )
                                      ],
                                      volumes=[volume],
                                  )
                              )
                          },
                          dag=dag)
    download_data = KubernetesPodOperator(task_id="download_data",
                                          namespace="default",
                                          name="openmaptiles_download_data",
                                          image="openmaptiles/openmaptiles-tools",
                                          cmds=["download-osm"],
                                          volumes=[volume],
                                          volume_mounts=[volume_mount],
                                          dag=dag)

    start >> download_data >> test

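The goal is to have a single persistent volume that both operators can use. The KubernetesPodOperator mounts the volume as expected and downloads everything it needs. The PythonOperator, however, stays in the queued state forever.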

Tailing the scheduler pod's logs shows the following error:

Pod in version "v1" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.VolumeMounts: []v1.VolumeMount: readObjectStart: expect { or n, but found ", error found in #10 byte of ...|-data"}

I suspect this happens because the volume or volume mount is not set up correctly, since the generated format looks wrong:

...
"volumeMounts": [
  {
    "mountPath": "/opt/airflow/dags",
    "name": "dags-data"
  },
  {
    "mountPath": "/opt/airflow/logs",
    "name": "logs-data"
  },
  "{'mount_path': '/osm_config',\n 'mount_propagation': None,\n 'name': 'test',\n 'read_only': False,\n 'sub_path': None,\n 'sub_path_expr': None}"
]
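
The symptom in the fragment above is that the last entry of volumeMounts is a string rather than a JSON object, which matches the "expect { ... but found "" part of the error. A minimal sketch for spotting such entries, assuming the pod spec is available as a JSON string; the function name `find_malformed_mounts` and the sample data are hypothetical and only mirror the shape of the output shown here:

```python
import json


def find_malformed_mounts(pod_json: str) -> list:
    """Return every volumeMounts entry that is not a JSON object."""
    pod = json.loads(pod_json)
    malformed = []
    for container in pod.get("spec", {}).get("containers", []):
        for mount in container.get("volumeMounts", []):
            # A valid mount deserializes to a dict; anything else is suspect.
            if not isinstance(mount, dict):
                malformed.append(mount)
    return malformed


# Hypothetical sample shaped like the fragment above.
sample = json.dumps({
    "spec": {
        "containers": [{
            "name": "base",
            "volumeMounts": [
                {"mountPath": "/opt/airflow/dags", "name": "dags-data"},
                {"mountPath": "/opt/airflow/logs", "name": "logs-data"},
                "{'mount_path': '/osm_config', 'name': 'test'}",
            ],
        }]
    }
})

bad = find_malformed_mounts(sample)
print(bad)
```

Any entry this returns was serialized as text instead of as a structured mount, so the API server cannot parse it as a v1.VolumeMount.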

Yet my configuration appears to be consistent with the Airflow documentation.

The problem was the type of the volume objects passed to the PythonOperator.

My original example used k8s.v1_volume.V1Volume and k8s.v1_volume_mount.V1VolumeMount. Switching to k8s.V1Volume and k8s.V1VolumeMount creates a pod with the volume mounted as expected.
