我正试图将带有dict的参数从气流xcom_pull推送到PapermillOperator,就像这样:
send_to_jupyter_operator = PapermillOperator(
task_id='send_to_jupyter',
input_nb="./dags/notebooks/input_test.ipynb",
output_nb="./dags/notebooks/{{ execution_date }}-result.ipynb",
parameters={"table_list": "{{ ti.xcom_pull(dag_id='select_data_from_table',task_ids='select_data', key='table_result_dict') }}"} )
Task_id='select_data'的任务-它是一个PythonOperator,它将dict推送到xcom。
内部ti.xcom_pull(dag_id='select_data_from_table', task_ids='select_data', key='table_result_dict')
-dicts的dict(keys-维度名称,values-带key=属性名称的dicts,values–值列表(;
但是用这个语法jupyter笔记本导入字符串,而不是dict,比如:table_list = "{'key1': {'attr1': []}}"
有什么技巧可以解决这个问题吗?
我已经尝试使用:parameters={"table_list": {{ ti.xcom_pull(dag_id='select_data_from_table', task_ids='select_data', key='table_result_dict') }} }
-在这个密钥中,python不知道"ti"实际上是什么。
parameters={"table_list": {{ context['ti'].xcom_pull(dag_id='select_data_from_table', task_ids='select_data', key='table_result_dict') }} }
-在这个密钥中,python不知道"context"实际上是什么。
我用另一种方法解决了这个问题。
只需将此添加到您的jupyter笔记本:
list = json.loads(input_list.replace("'",'"').replace('None', 'null'))