如何将参数从google composer传递到数据流模板



我正试图通过以下方式将参数从googlecomposer传递到数据流模板中,但它不起作用。

# composer code
trigger_dataflow = DataflowTemplateOperator(
task_id="trigger_dataflow",
template="gs://mybucket/my_template",
dag=dag,
job_name='appsflyer_events_daily',
parameters={
"input": f'gs://my_bucket/' + "{{ ds }}" + "/*.gz"
}
)
# template code
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
default='gs://my_bucket/*.gz',
help='path of input file')
def main():
pipeline_options = PipelineOptions()
user_options = pipeline_options.view_as(UserOptions) 
p = beam.Pipeline(options=pipeline_options)
lines = (
p
| MatchFiles(user_options.input)
)

您可以像下面这样传递。

DataflowTemplateOperator(,
task_id="task1",
template=get_variable_value("template"),
on_failure_callback=update_job_message,
parameters={
"fileBucket": get_variable_value("file_bucket"),
"basePath": get_variable_value("path_input"),         
"Day": "{{ json.loads(ti.xcom_pull(key=run_id))['day'] }}",
},
)

我们使用Java,在Dataflow作业中,我们有选项类get和set,如下所示

public interface MyOptions extends CommonOptions {

@Description("The output bucket")
@Validation.Required
ValueProvider<String> getFileBucket();
void setFileBucket(ValueProvider<String> value);

}

我们需要为这个数据流作业创建模板,该模板将由composer dag触发。

从数据流经典模板转移到flex模板解决了这个问题。

最新更新