I am trying to find a way to pass execution_date to SparkKubernetesOperator. Is there any way to pass it through? I need the execution date for the Spark run and for S3 partitioning.
submit_compaction_to_spark = SparkKubernetesOperator(
    task_id="submit_compaction_to_spark",
    application_file="/k8s/compaction_s3.yml",
    namespace=kubernetes_namespace,
    kubernetes_conn_id="kubernetes",
    params={
        "warehouse_path": s3_path,
        "snapshot_expire_time": execution_date,
        "partition_filter": execution_date,
        "k8s_namespace": kubernetes_namespace,
        "docker_image_tag": docker_image_tag,
    },
)
Unfortunately, params only exposes custom values to Jinja; the values themselves are not rendered as Jinja templates. For example, consider this PythonOperator:
op = PythonOperator(
    task_id="my_operator",
    python_callable=lambda **context: print(context['params']),
    params={
        "date": "{{ execution_date }}"
    },
    dag=dag,
)
The value of the date key is the literal string "{{ execution_date }}", not the rendered value:
[2021-03-05 01:24:26,770] {logging_mixin.py:103} INFO - {'date': '{{ execution_date }}'}
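The behavior above can be sketched without Airflow at all. FakeOperator below is a hypothetical, heavily simplified stand-in for how Airflow renders an operator: only attributes named in template_fields get rendered against the task context, while everything else, including params, passes through verbatim.

```python
# Minimal stand-in for Jinja rendering (just enough for this sketch):
def render(template: str, context: dict) -> str:
    for key, value in context.items():
        template = template.replace("{{ " + key + " }}", str(value))
    return template

class FakeOperator:
    """Hypothetical sketch of an operator; NOT the real Airflow implementation."""
    template_fields = ("application_file",)

    def __init__(self, application_file, params):
        self.application_file = application_file
        self.params = params

    def render_template_fields(self, context):
        # Only attributes listed in template_fields are rendered.
        for field in self.template_fields:
            setattr(self, field, render(getattr(self, field), context))

op = FakeOperator(
    application_file="compaction-{{ ds }}.yml",
    params={"date": "{{ ds }}"},  # not in template_fields -> left verbatim
)
op.render_template_fields({"ds": "2021-03-05"})
print(op.application_file)  # compaction-2021-03-05.yml
print(op.params)            # {'date': '{{ ds }}'}
```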
The params hook in BaseOperator allows you to pass a dictionary of parameters and/or objects to your templates. Please take the time to understand how the parameter my_param makes it through to the template. You can read more about Jinja-templated params in the Airflow documentation.
There is another way to use execution_date, though: SparkKubernetesOperator enables Jinja templating through these settings.
template_fields = ['application_file', 'namespace']
template_ext = ('yaml', 'yml', 'json')
SparkKubernetesOperator has two template fields, application_file and namespace, which means you can use Jinja templates as their values. Moreover, if you reference a file whose extension is in template_ext, Airflow reads the file and renders the Jinja templates inside it.
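The extension check that decides whether a field value is treated as a file to load and render can be sketched like this (a simplified illustration of the idea, not the real BaseOperator code):

```python
# template_ext as declared by SparkKubernetesOperator:
template_ext = ('yaml', 'yml', 'json')

def is_templated_file(value):
    """True if the field value points at a file Airflow will read and
    render (simplified sketch of the extension check)."""
    return isinstance(value, str) and value.endswith(template_ext)

print(is_templated_file("/k8s/compaction_s3.yml"))  # True
print(is_templated_file("my-namespace"))            # False
```

Because "/k8s/compaction_s3.yml" ends with one of the template extensions, the file's contents are rendered, so Jinja expressions like {{ ds }} inside the YAML resolve at task runtime.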
Let's modify the operator you provided:
submit_compaction_to_spark = SparkKubernetesOperator(
    task_id="submit_compaction_to_spark",
    application_file="/k8s/compaction_s3.yml",
    namespace=kubernetes_namespace,
    kubernetes_conn_id="kubernetes",
    params={
        "k8s_namespace": kubernetes_namespace,
        "warehouse_path": s3_path,
    },
)
I'll take a guess at what /k8s/compaction_s3.yml looks like and add some Jinja templates:
---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "spark-pi-{{ ds }}-{{ task_instance.try_number }}"
  namespace: "{{ params.k8s_namespace }}"
  labels:
    warehouse_path: "{{ params.warehouse_path }}"
    date: "{{ ds }}"
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v2.4.4"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar"
  sparkVersion: "2.4.4"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 2.4.4
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.4
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
You can check the Rendered Template view of the task instance in your DAG. Please also refer to the example DAG and example application_file in the Airflow documentation.
https://stackoverflow.com/questions/66481727/how-to-pass-execution-date-as-parameter-in-sparkkubernetesoperator-operator
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: "sparkapp-test-{{ ts_nodash|lower }}-{{ task_instance.try_number }}"
  namespace: "default"
spec:
  type: Python
  pythonVersion: "3"
  ...
  timeToLiveSeconds: 3600  # delete the SparkApplication after an hour
A unique SparkApplication name is required, so I added |lower to lowercase the "T" that ts_nodash produces (Kubernetes object names must be lowercase).
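What {{ ts_nodash|lower }} ends up producing can be shown with a plain-Python equivalent of the Jinja filter (the ts_nodash value here is taken from the pod names below):

```python
# ts_nodash renders as e.g. "20211122T070418"; Kubernetes object names must
# be lowercase RFC 1123 names, so the uppercase "T" would be rejected
# without the |lower filter.
ts_nodash = "20211122T070418"
try_number = 1
name = f"sparkapp-test-{ts_nodash.lower()}-{try_number}"
print(name)  # sparkapp-test-20211122t070418-1
```

The try_number suffix additionally keeps the name unique across retries of the same task instance.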
$ k get pod | grep sparkapp-test
# 1 sparkapp-test-20211122t070418-1-driver 1/1 Running 0 15s
$ k get pod | grep sparkapp-test
# 2-1 sparkapp-test-20211122t070418-1-a882cc7d46760183-exec-1 1/1 Running 0 4s
# 2-2 sparkapp-test-20211122t070418-1-a882cc7d46760183-exec-2 1/1 Running 0 4s
# 2-3 sparkapp-test-20211122t070418-1-a882cc7d46760183-exec-3 1/1 Running 0 4s