How to pass execution_date as a parameter in the SparkKubernetesOperator?



I am trying to find a way to pass execution_date to the SparkKubernetesOperator. Is there any way to pass it through? I need the execution date for the Spark run and for S3 partitioning.

submit_compaction_to_spark = SparkKubernetesOperator(
    task_id="submit_compaction_to_spark",
    application_file="/k8s/compaction_s3.yml",
    namespace=kubernetes_namespace,
    kubernetes_conn_id="kubernetes",
    params={
        "warehouse_path": s3_path,
        "snapshot_expire_time": execution_date,
        "partition_filter": execution_date,
        "k8s_namespace": kubernetes_namespace,
        "docker_image_tag": docker_image_tag,
    },
)

Unfortunately, params only exposes custom values to Jinja; it does not render Jinja templates inside those values.

For example, consider this PythonOperator:

op = PythonOperator(
    task_id="my_operator",
    python_callable=lambda **context: print(context['params']),
    params={
        "date": "{{ execution_date }}"
    },
    dag=dag
)

The value of the date key stays the literal string "{{ execution_date }}" rather than the rendered value:

[2021-03-05 01:24:26,770] {logging_mixin.py:103} INFO - {'date': '{{ execution_date }}'}

The params hook in BaseOperator allows you to pass a dictionary of parameters and/or objects to your templates. Take the time to understand how a parameter such as my_param makes it through to the template.

You can read more about params and Jinja templating in the Airflow documentation.
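By contrast, fields listed in an operator's template_fields do get rendered. As a minimal sketch (my own, not from the question): op_kwargs is one of PythonOperator's template fields, so the same expression renders there:

op = PythonOperator(
    task_id="my_operator_templated",
    python_callable=lambda date: print(date),
    # op_kwargs is in PythonOperator's template_fields, so this IS rendered
    op_kwargs={"date": "{{ execution_date }}"},
    dag=dag
)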


There is, however, another way to make use of execution_date. The SparkKubernetesOperator enables Jinja templating through these settings:

template_fields = ['application_file', 'namespace']  
template_ext = ('yaml', 'yml', 'json')

The SparkKubernetesOperator has two template fields, application_file and namespace, which means you can use Jinja templates as their values. Moreover, if application_file points to a file whose extension is in template_ext, the operator loads that file and renders the Jinja templates inside it.
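Both behaviors can be combined; here is a sketch of my own (the Airflow Variable k8s_namespace is made up for illustration):

submit = SparkKubernetesOperator(
    task_id="submit",
    # .yml is in template_ext, so the file's contents are loaded and rendered
    application_file="/k8s/compaction_s3.yml",
    # namespace itself is a template field, so a Jinja expression works here too
    namespace="{{ var.value.k8s_namespace }}",
    kubernetes_conn_id="kubernetes",
)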

Let's modify the operator you provided:

submit_compaction_to_spark = SparkKubernetesOperator(
    task_id="submit_compaction_to_spark",
    application_file="/k8s/compaction_s3.yml",
    namespace=kubernetes_namespace,
    kubernetes_conn_id="kubernetes",
    params={
        "k8s_namespace": kubernetes_namespace,
        "warehouse_path": s3_path,
    }
)

I'll take a guess at what /k8s/compaction_s3.yml looks like and add some Jinja templates:

---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
  namespace: "{{ params.k8s_namespace }}"
  labels:
    warehouse_path: "{{ params.warehouse_path }}"
    date: "{{ ds }}"
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.4"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 2.4.4
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.4
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"

You can check the result in the task instance's Rendered Template view in the Airflow UI (in Airflow 2 the airflow tasks render CLI command shows the same information).

Please also refer to the example DAG and example application_file in the Airflow documentation.
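For reference, here is a rough sketch modeled on the provider's example DAG (import paths are those of the cncf.kubernetes provider for Airflow 2; task and file names are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

with DAG(
    dag_id="spark_pi",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    submit = SparkKubernetesOperator(
        task_id="spark_pi_submit",
        namespace="default",
        # .yaml is in template_ext, so the file's contents are Jinja-rendered
        application_file="example_spark_kubernetes_spark_pi.yaml",
        kubernetes_conn_id="kubernetes_default",
        do_xcom_push=True,
    )
    monitor = SparkKubernetesSensor(
        task_id="spark_pi_monitor",
        namespace="default",
        # pick up the created SparkApplication's name from XCom
        application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
        kubernetes_conn_id="kubernetes_default",
    )
    submit >> monitor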


Here is an application_file that worked for me; note the templated, unique application name:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "sparkapp-test-{{ ts_nodash|lower }}-{{ task_instance.try_number }}"
  namespace: "default"
spec:
  type: Python
  pythonVersion: "3"
  ...
  timeToLiveSeconds: 3600  # delete the SparkApplication after an hour

A unique SparkApplication name is required, and Kubernetes object names must be lowercase, so I added |lower to lowercase the 'T' in ts_nodash.

$ k get pod | grep sparkapp-test
# 1    sparkapp-test-20211122t070418-1-driver                    1/1     Running     0          15s
$ k get pod | grep sparkapp-test
# 2-1 sparkapp-test-20211122t070418-1-a882cc7d46760183-exec-1   1/1     Running     0          4s
# 2-2 sparkapp-test-20211122t070418-1-a882cc7d46760183-exec-2   1/1     Running     0          4s
# 2-3 sparkapp-test-20211122t070418-1-a882cc7d46760183-exec-3   1/1     Running     0          4s
