将输出从一个工作流步骤传递到GCP中的另一个步骤



我正试图编排一个GCP工作流,首先在Big query中运行一个查询,以获取一些元数据(名称和id(,然后将这些元数据传递到工作流中的另一个步骤,该步骤在给定这些参数作为输入的情况下启动数据流作业。

所以一步一步地我想要这样的东西:

  1. Result=Query("SELECT ID&name from biq Query table"(
  2. 启动数据流作业:输入(结果(

这可能吗?或者有更好的解决方案吗?

我向您提出了两个解决方案,希望它能有所帮助。

-解决方案1:如果在Cloud Composer:中有类似Airflow的编排器

  • Airflow中使用带有BigQueryInsertJobOperator的任务,此运算符允许执行对Bigquery的查询
  • 通过xcom将结果传递给第二个操作员
  • 2秒算子是扩展BeamRunPythonPipelineOperator的算子
  • 扩展BeamRunPythonPipelineOperator时,将覆盖execute方法。在这种方法中,您可以通过xcom pull将以前操作员的数据恢复为Dict
  • 将此Dict作为管道选项传递给扩展BeamRunPythonPipelineOperator的操作员
  • BeamRunPythonPipelineOperator将启动您的Dataflow作业

一个使用execute方法的算子示例:

class CustomBeamOperator(BeamRunPythonPipelineOperator):
def __init__(
self,
your_field
...
**kwargs) -> None:
super().__init__(**kwargs)
self.your_field = your_field
...
def execute(self, context):
task_instance = context['task_instance']
your_conf_from_bq =  task_instance.xcom_pull('task_id_previous_operator')

operator = BeamRunPythonPipelineOperator(
runner='DataflowRunner',
py_file='your_dataflow_main_file.py',
task_id='launch_dataflow_job',
pipeline_options=your_conf_from_bq,
py_system_site_packages=False,
py_interpreter='python3',
dataflow_config=DataflowConfiguration(
location='your_region'
)
)

operator.execute(context)

-解决方案2:

如果你没有像Airflow这样的配器

  • 您可以使用启动实际Dataflow作业的相同虚拟环境,但添加PythonBigquery客户端作为包:https://cloud.google.com/bigquery/docs/reference/libraries
  • 创建一个主Python文件,通过Bigquery客户端从Bigquery表中检索conf作为Dict
  • 使用Python生成命令行,以使用从数据库检索的上一个conf启动Dataflow作业,例如使用Python:
python -m folder.your_main_file 
--runner=DataflowRunner 
--conf1=conf1/ 
--conf2=conf2
....
--setup_file=./your_setup.py 
  • 使用Pythonsuprocess启动上一个Python命令
  • 您也可以尝试使用此api来启动Dataflow作业:https://pypi.org/project/google-cloud-dataflow-client/我没有试过

我认为Airflow的解决方案更容易。

相关内容

最新更新