从xcom气流向PapermillOperator推送dict参数时出现问题



我正试图将带有dict的参数从气流xcom_pull推送到PapermillOperator,就像这样:

send_to_jupyter_operator = PapermillOperator(
task_id='send_to_jupyter',     
input_nb="./dags/notebooks/input_test.ipynb",     
output_nb="./dags/notebooks/{{ execution_date }}-result.ipynb",     
parameters={"table_list": "{{ ti.xcom_pull(dag_id='select_data_from_table',task_ids='select_data', key='table_result_dict') }}"} )

Task_id='select_data'的任务-它是一个PythonOperator,它将dict推送到xcom。

内部ti.xcom_pull(dag_id='select_data_from_table', task_ids='select_data', key='table_result_dict')-dicts的dict(keys-维度名称,values-带key=属性名称的dicts,values–值列表(;

但是用这个语法jupyter笔记本导入字符串,而不是dict,比如:table_list = "{'key1': {'attr1': []}}"

有什么技巧可以解决这个问题吗?

我已经尝试使用:parameters={"table_list": {{ ti.xcom_pull(dag_id='select_data_from_table', task_ids='select_data', key='table_result_dict') }} }-在这个密钥中,python不知道"ti"实际上是什么。

parameters={"table_list": {{ context['ti'].xcom_pull(dag_id='select_data_from_table', task_ids='select_data', key='table_result_dict') }} }-在这个密钥中,python不知道"context"实际上是什么。

我用另一种方法解决了这个问题。

只需将此添加到您的jupyter笔记本:

list = json.loads(input_list.replace("'",'"').replace('None', 'null'))

相关内容

  • 没有找到相关文章

最新更新