无法运行KubernetesPodOperator



我正在使用 Airflow 2.4.3,尝试运行 KubernetesPodOperator

下面是代码和错误:-

请帮助我在 Python 中创建 KubernetesPodOperator。GCP 和 Azure 我都试过了。

还添加了kubernetes文档供参考:-

https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/operators/kubernetes_pod/index.html airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator

如果需要,我也可以分享任何其他信息。


#import modules (standard library)
import datetime
import json
import logging
import time
#third-party
from kubernetes.client import models as k8s
#airflow
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
#custom modules
import airflow_API
from spark_API.spark_submit import SparkSubmit

airflow_dag_object = AirflowDagUtilities("aaa_test_airflow_api")

def def_func1(**kwargs):
    """Build a local-mode spark-submit command line and run it via KubernetesPodOperator.

    Called as a PythonOperator callable, so ``kwargs`` is the Airflow task
    context (contains ``task_instance`` etc.).

    Raises:
        Whatever KubernetesPodOperator.execute raises (e.g. pod startup
        timeout, kubernetes ApiException).
    """
    print("In func1")

    namespace = "segmentation-pipeline"
    docker_image = "****:v6"  # image name redacted in the question
    is_delete_operator_pod = True

    # Pull secret for the private registry (name redacted).
    docker_image_creds = [k8s.V1LocalObjectReference("****")]

    submit_command = ["/bin/bash", "-c"]

    max_cores = 60
    driver_memory = "4g"
    executor_memory = "4g"

    # BUG FIX: the original split this assignment over two physical lines
    # without a line continuation, which is a SyntaxError. Parenthesize.
    submit_args = (
        "/usr/local/spark/bin/spark-submit --master local[" + str(max_cores)
        + "] --driver-memory " + driver_memory
        + " --executor-memory " + executor_memory + " "
    )

    # BUG FIX: the original used ''' (empty string literal followed by a stray
    # quote) where a literal double quote '"' was intended — a SyntaxError.
    # NOTE(review): _infra_config is not defined/imported in this snippet —
    # presumably a module-level project object; confirm.
    submit_spark_pipeline_config_conf = (
        '--conf "spark.pipelineConfig='
        + json.dumps(_infra_config.get_infra_config(), separators=(",", ":"))
        + '" '
    )

    submit_spark_broadcast_timeout = '--conf "spark.sql.broadcastTimeout=36000" '
    submit_spark_max_result_size = '--conf "spark.driver.maxResultSize=0" '

    final_dependency_jars = [
        "./resources/mysql_connector_java_5.1.45.jar",
        "./resources/commons_httpclient_3.0.1.jar",
    ]
    # De-duplicate; note set() makes the jar order arbitrary, which is fine
    # for spark.jars.
    dependency_jars_string = ",".join(set(final_dependency_jars))
    submit_spark_dependency_jars = '--conf "spark.jars=' + dependency_jars_string + '" '

    # extra_conf is empty here; the loop is a placeholder for future confs.
    extra_conf = []
    extra_conf_final = []
    for conf in extra_conf:
        # BUG FIX: closing quote was the broken ''' in the original.
        extra_conf_final.append('--conf "' + conf + '" ')
    extra_conf = " ".join(extra_conf_final) + " "

    # NOTE(review): airflow_API.extract_airflow_task_details is a project
    # helper; assumed to return a JSON-serializable dict — confirm.
    airflow_task_settings = airflow_API.extract_airflow_task_details(kwargs["task_instance"])
    submit_spark_airflow_task_details = (
        '--conf "spark.airflowTaskDetails=' + json.dumps(airflow_task_settings) + '" '
    )

    common_submit_args_beginning = (
        submit_args
        + submit_spark_broadcast_timeout
        + submit_spark_max_result_size
        + submit_spark_dependency_jars
        + extra_conf
        + submit_spark_airflow_task_details
    )

    application_resource = "/update_scores.py"
    application_arguments = ["test_args"]

    # json.dumps each argument so strings arrive quoted on the command line.
    string_application_arguments = " "
    for argument in application_arguments:
        string_application_arguments += " " + json.dumps(argument)

    common_submit_args_end = application_resource + string_application_arguments

    # NOTE(review): PlatformUtilities is not imported in this snippet — confirm.
    platform_utilities = PlatformUtilities(_infra_config)
    print("platform_utilities.get_python_modules_path() -> ",
          str(platform_utilities.get_python_modules_path()))

    submit_spark_python_module_path = (
        '--conf "spark.modulePath=' + str(platform_utilities.get_python_modules_path()) + '" '
    )

    # Single-element list: bash -c takes the whole command as one argument.
    submit_spark_args = [
        common_submit_args_beginning
        + submit_spark_pipeline_config_conf
        + submit_spark_python_module_path
        + common_submit_args_end
    ]
    print("submit_spark_args -> ", submit_spark_args)

    submit_in_cluster = True

    submit_spark_pod_affinity = k8s.V1Affinity(
        node_affinity=k8s.V1NodeAffinity(
            k8s.V1NodeSelectorTerm(
                match_expressions=[
                    k8s.V1NodeSelectorRequirement(
                        key="****", operator="In", values=["n2-highmem-8"]
                    ),
                    k8s.V1NodeSelectorRequirement(
                        key="deployment", operator="In", values=["dynamic"]
                    ),
                ]
            )
        )
    )

    submit_spark_pod_tolerations = [
        k8s.V1Toleration(
            key="deployment", operator="Equal", value="dynamic", effect="NoSchedule"
        )
    ]

    application_name = "test_airflow_api_test_task_id"

    container_resources = k8s.V1ResourceRequirements(
        requests={"memory": "10Gi", "cpu": "2"},
        limits={"memory": "50Gi", "cpu": "5"},
    )

    submit_startup_timeout_seconds = 600
    submit_get_logs = True

    kube_submssion = KubernetesPodOperator(
        namespace=namespace,
        image=docker_image,
        is_delete_operator_pod=is_delete_operator_pod,
        image_pull_secrets=docker_image_creds,
        cmds=submit_command,
        arguments=submit_spark_args,
        in_cluster=submit_in_cluster,
        affinity=submit_spark_pod_affinity,
        tolerations=submit_spark_pod_tolerations,
        container_resources=container_resources,
        name=application_name,
        task_id=application_name,
        startup_timeout_seconds=submit_startup_timeout_seconds,
        get_logs=submit_get_logs,
    )

    # BUG FIX: execute() requires a real Airflow context. The provider builds
    # the pod's identifying labels (dag_id / task_id / run_id) from it, and
    # context=None yields a malformed label selector — the HTTP 400
    # "found ',' expected: ..." seen in the traceback. Pass the PythonOperator
    # kwargs (the task context) through instead.
    kube_submssion.execute(context=kwargs)


def def_func2(**kwargs):
    """Placeholder downstream task: just logs that it ran.

    BUG FIX: the original body had lost its indentation (SyntaxError);
    behavior is otherwise unchanged.
    """
    print("In func2")
# Wire the two callables into the DAG.
# BUG FIX: PythonOperator was used without ever being imported; the import is
# added at the top of the file (airflow.operators.python.PythonOperator).
# NOTE(review): airflow_dag_object comes from AirflowDagUtilities, which is
# also not imported in this snippet — confirm its provenance.
dag_base = airflow_dag_object.get_dag_object()

func1 = PythonOperator(
    task_id="func1",
    provide_context=True,  # no-op on Airflow >= 2.0; context is always passed
    python_callable=def_func1,
    dag=dag_base,
)
func2 = PythonOperator(
    task_id="func2",
    provide_context=True,
    python_callable=def_func2,
    dag=dag_base,
)

# func1 runs before func2.
func1 >> func2

输出错误:-

Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 419, in execute
context=context,
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 387, in get_or_create_pod
pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 371, in find_pod
label_selector=label_selector,
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 15697, in list_namespaced_pod
return self.list_namespaced_pod_with_http_info(namespace, **kwargs)  # noqa: E501
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 15826, in list_namespaced_pod_with_http_info
collection_formats=collection_formats)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
_preload_content, _request_timeout, _host)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
_request_timeout=_request_timeout)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 377, in request
headers=headers)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 244, in GET
query_params=query_params)
File "/home/airflow/.local/lib/python3.7/site-packages/kubernetes/client/rest.py", line 234, in request
raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'Audit-Id': '6ab39ea1-f955-4481-b3eb-7b3abe747a7c', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Kubernetes-Pf-Flowschema-Uid': '8e487991-120d-49d0-940a-ace0b0e64421', 'X-Kubernetes-Pf-Prioritylevel-Uid': '8f6ab0b3-abdf-4782-994c-2f0f247592d2', 'Date': 'Thu, 12 Jan 2023 13:13:20 GMT', 'Content-Length': '169'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"found ',', expected: !, identifier, or 'end of string'","reason":"BadRequest","code":400}

在 Airflow < 2.3 中,KubernetesPodOperator 会自行处理上下文(context)

正如你的问题所提到的

kube_submssion = KubernetesPodOperator(namespace = namespace,
image = docker_image,
is_delete_operator_pod = is_delete_operator_pod,
image_pull_secrets = docker_image_creds,
cmds = submit_command,
arguments = submit_spark_args,
in_cluster = submit_in_cluster,
affinity = submit_spark_pod_affinity,
tolerations = submit_spark_pod_tolerations,
container_resources = container_resources,
name = application_name,
task_id = application_name,
startup_timeout_seconds = submit_startup_timeout_seconds,
get_logs = submit_get_logs
)

kube_submssion.execute(context = None)

execute 方法期望接收文档中所述的 context 参数,参见下面链接中的 KubernetesPodOperator.execute:

https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_modules/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.html

你可以把 **kwargs 作为上下文传递给 execute 方法,即:

kube_submssion.execute(context = kwargs)

相关内容

  • 没有找到相关文章

最新更新