我正试图编排一个GCP工作流,首先在Big query中运行一个查询,以获取一些元数据(名称和id(,然后将这些元数据传递到工作流中的另一个步骤,该步骤在给定这些参数作为输入的情况下启动数据流作业。
所以一步一步地我想要这样的东西:
- Result=Query("SELECT ID&name from biq Query table"(
- 启动数据流作业:输入(结果(
这可能吗?或者有更好的解决方案吗?
我向您提出了两个解决方案,希望它能有所帮助。
-解决方案1:如果在Cloud Composer
:中有类似Airflow
的编排器
- 在
Airflow
中使用带有BigQueryInsertJobOperator
的任务,此运算符允许执行对Bigquery
的查询 - 通过
xcom
将结果传递给第二个操作员 - 2秒算子是扩展
BeamRunPythonPipelineOperator
的算子 - 扩展
BeamRunPythonPipelineOperator
时,将覆盖execute
方法。在这种方法中,您可以通过xcom pull
将以前操作员的数据恢复为Dict
- 将此
Dict
作为管道选项传递给扩展BeamRunPythonPipelineOperator
的操作员 BeamRunPythonPipelineOperator
将启动您的Dataflow
作业
一个使用execute
方法的算子示例:
class CustomBeamOperator(BeamRunPythonPipelineOperator):
def __init__(
self,
your_field
...
**kwargs) -> None:
super().__init__(**kwargs)
self.your_field = your_field
...
def execute(self, context):
task_instance = context['task_instance']
your_conf_from_bq = task_instance.xcom_pull('task_id_previous_operator')
operator = BeamRunPythonPipelineOperator(
runner='DataflowRunner',
py_file='your_dataflow_main_file.py',
task_id='launch_dataflow_job',
pipeline_options=your_conf_from_bq,
py_system_site_packages=False,
py_interpreter='python3',
dataflow_config=DataflowConfiguration(
location='your_region'
)
)
operator.execute(context)
-解决方案2:
如果你没有像Airflow
这样的配器
- 您可以使用启动实际
Dataflow
作业的相同虚拟环境,但添加PythonBigquery
客户端作为包:https://cloud.google.com/bigquery/docs/reference/libraries - 创建一个主
Python
文件,通过Bigquery
客户端从Bigquery
表中检索conf作为Dict
- 使用Python生成命令行,以使用从数据库检索的上一个conf启动Dataflow作业,例如使用Python:
python -m folder.your_main_file
--runner=DataflowRunner
--conf1=conf1/
--conf2=conf2
....
--setup_file=./your_setup.py
- 使用
Python
suprocess
启动上一个Python
命令 - 您也可以尝试使用此api来启动
Dataflow
作业:https://pypi.org/project/google-cloud-dataflow-client/我没有试过
我认为Airflow
的解决方案更容易。