GCP Dataflow: argparse.ArgumentError with DataflowRunner but not DirectRunner



A Dataflow pipeline with runtime parameters runs fine with DirectRunner, but fails with an argument error after switching to DataflowRunner.

  File "/home/user/miniconda3/lib/python3.8/site-packages/apache_beam/options/pipeline_options.py", line 124, in add_value_provider_argument
    self.add_argument(*args, **kwargs)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1386, in add_argument
    return self._add_action(action)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1749, in _add_action
    self._optionals._add_action(action)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1590, in _add_action
    action = super(_ArgumentGroup, self)._add_action(action)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1400, in _add_action
    self._check_conflict(action)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1539, in _check_conflict
    conflict_handler(action, confl_optionals)
  File "/home/user/miniconda3/lib/python3.8/argparse.py", line 1548, in _handle_conflict_error
    raise ArgumentError(action, message % conflict_string)
argparse.ArgumentError: argument --bucket_input: conflicting option string: --bucket_input
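The last line is argparse's standard error for registering the same option string twice on one parser. A minimal reproduction with plain `argparse` (no Beam involved; the option name and default are copied from the question):

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--bucket_input', default='device-file-dev')

message = None
try:
    # Registering the same option string a second time on the same parser
    parser.add_argument('--bucket_input', default='device-file-dev')
except argparse.ArgumentError as exc:
    message = str(exc)

print(message)
```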
Here is how the argument is defined and used:
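import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class CustomPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Runtime (ValueProvider) argument for the input bucket
        parser.add_value_provider_argument(
            '--bucket_input',
            default="device-file-dev",
            help='Raw device file bucket')


# pipeline_options is built elsewhere (not shown in the question);
# CreateGcsPCol is the asker's own DoFn (also not shown).
pipeline = beam.Pipeline(options=pipeline_options)
custom_options = pipeline_options.view_as(CustomPipelineOptions)
_ = (
    pipeline
    | 'Initiate dataflow' >> beam.Create(["Start"])
    | 'Create P collection with file paths' >> beam.ParDo(
        CreateGcsPCol(input_bucket=custom_options.bucket_input)
    )
)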

Note that this only happens with DataflowRunner. Does anyone know how to fix it? Thanks a lot.

Copying the answer from the comments here:

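The error was caused by importing a local Python submodule through a relative path. With DirectRunner the relative path works because everything runs on the local machine. DataflowRunner, however, runs the pipeline on other machines (GCE instances), where the module must be importable by its absolute (package) path. Installing both the Dataflow pipeline module and its submodule as a package, and importing from the installed submodule instead of through the relative path, solved the problem.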
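Why would an import path produce a *conflicting option* error? What follows is a hedged sketch of one plausible mechanism, not something the answer states. As we read `PipelineOptions.get_all_options` in Beam's source, the parser is built by calling `_add_argparse_args` on every `PipelineOptions` subclass, de-duplicated only by `str(cls)` (module name plus class name). If the same options file is loaded under two module names, say once through a relative/`sys.path` import and once as part of a package, there are two distinct classes with different `str()` values, and both register `--bucket_input`. The sketch simulates this with the standard library only; `options`, `mypkg.options`, `CustomOptions` and `add_args` are hypothetical stand-ins for the asker's module and Beam's hook.

```python
import argparse
import importlib.util
import pathlib
import tempfile

# Hypothetical options module: a single class that registers --bucket_input,
# standing in for CustomPipelineOptions._add_argparse_args.
MODULE_SOURCE = '''
class CustomOptions:
    @classmethod
    def add_args(cls, parser):
        parser.add_argument("--bucket_input", default="device-file-dev")
'''


def load_as(module_name, path):
    """Execute the file at `path` as a fresh module named `module_name`."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "options.py"
    path.write_text(MODULE_SOURCE)
    # The same file loaded under two names, e.g. via a relative/sys.path
    # import and via an installed package (hypothetical names).
    first = load_as("options", path)
    second = load_as("mypkg.options", path)

classes = [first.CustomOptions, second.CustomOptions]
same_class = classes[0] is classes[1]  # False: two distinct class objects

# Mimic de-duplication keyed by str(cls) (our reading of Beam's approach).
# The keys differ ("options..." vs "mypkg.options..."), so both classes survive.
unique = {str(cls): cls for cls in classes}

parser = argparse.ArgumentParser()
error = None
for cls in unique.values():
    try:
        cls.add_args(parser)
    except argparse.ArgumentError as exc:
        error = str(exc)

print(same_class, len(unique))
print(error)
```

Under that reading, the answer's fix removes the second module name: install the pipeline and its submodules as a package (for example with a `setup.py` passed through Beam's `--setup_file` option) and import them by package name everywhere, so only one options class exists.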
