Error passing parameters from a REST API (Cloud Function) to a Dataflow job



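Can anyone share Dataflow Python code that accepts arguments? I am facing the same problem when passing arguments through the REST API. My Dataflow code is as follows: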

def run(argv=None):
    parser = argparse.ArgumentParser()
    # Specifically we have the input file in CSV format to read and the output BQ table to write.
    # This is the final stage of the pipeline, where we define the destination
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to read. This can be a local file or '
             'a file in a Google Storage Bucket.',
        # This example file contains a total of only 10 lines.
        # Useful for developing on a small set of data.
        default='gs://intient_output/measurementunit.csv')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to be written. This can be a local file or '
             'a file in a Google Storage Bucket.',
        default='mygcpdataengineerlab:intientpoc.measurementunit'
    )
    # Parse arguments from the command line.
    known_args, pipeline_args = parser.parse_known_args(argv)
    data_ingestion = DataIngestion()
    project = ''
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

Exception stack trace below:

Error-    response = request.execute()
File "/env/local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
return wrapped(*args, **kwargs)
File "/env/local/lib/python3.7/site-packages/googleapiclient/http.py", line 907, in execute
raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://dataflow.googleapis.com/v1b3/projects/mygcpdataengineerlab/templates:launch?gcsPath=gs%3A%2F%2Fgcp_dataflow_csv_bq_code%2Ftemplates&alt=json returned "(9744cfd1809f74a): The workflow could not be created. Causes: (9744cfd1809fa2d): Found unexpected parameters: ['input' (perhaps you meant 'update'), 'output' (perhaps you meant 'job_port')]">
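For reference, a request that triggers this error presumably carries `input` and `output` under the `parameters` field of the launch body. The following is a minimal sketch of how such a body could be assembled; the helper name `build_launch_body` and the job name are hypothetical, and the parameter values are simply the defaults from the code above. The commented-out call at the end shows roughly how the body would be sent with google-api-python-client and is not executed here.

```python
import json


def build_launch_body(job_name, parameters):
    """Build the JSON body for a Dataflow templates.launch request.

    The keys of `parameters` have to match runtime parameters that the
    template itself declares; otherwise the service rejects the request
    with "Found unexpected parameters".
    """
    return {
        "jobName": job_name,
        # Template parameters are sent as strings.
        "parameters": {name: str(value) for name, value in parameters.items()},
    }


body = build_launch_body(
    "csv-to-bq-example",  # hypothetical job name
    {
        "input": "gs://intient_output/measurementunit.csv",
        "output": "mygcpdataengineerlab:intientpoc.measurementunit",
    },
)
print(json.dumps(body, indent=2))

# Sending the request (requires credentials, shown for orientation only):
#   service = googleapiclient.discovery.build("dataflow", "v1b3")
#   service.projects().templates().launch(
#       projectId="mygcpdataengineerlab",
#       gcsPath="gs://gcp_dataflow_csv_bq_code/templates",
#       body=body,
#   ).execute()
```

The request shape itself is fine; the 400 response indicates that the template at that `gcsPath` does not expose `input` and `output` as runtime parameters.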

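I solved it by accepting the argument as a pipeline option:

class MyPipeOpt(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            help='Input file to read. This can be a local file or '
                 'a file in a Google Storage Bucket.')

pipeline_options = PipelineOptions(argv)
my_options = pipeline_options.view_as(MyPipeOpt)
p = beam.Pipeline(options=pipeline_options)

Then use the variable as my_options.input in the created Beam pipeline.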

Thanks,

The problem is with dest='input', because the latest Beam options do not support this keyword argument. To keep it simple, you can do this:

def run(argv=None, save_main_session=True):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output_topic', required=True,
        help=('Output PubSub topic of the form '
              '"projects/<PROJECT>/topics/<TOPIC>".'))
    parser.add_argument(
        '--input_topic',
        help=('Input PubSub topic of the form '
              '"projects/<PROJECT>/topics/<TOPIC>".'))
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
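
To see what `parse_known_args` does with the command line, here is a small standard-library sketch. The helper name `split_args`, the topic name, and the project name are made up for illustration. Options declared on the parser end up in `known_args`, and everything else is passed through untouched in `pipeline_args`, which is what `PipelineOptions` receives.

```python
import argparse


def split_args(argv):
    """Split argv into options argparse knows and the leftover strings."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_topic")
    parser.add_argument("--output_topic", required=True)
    # parse_known_args returns (namespace, list of unrecognized arguments).
    return parser.parse_known_args(argv)


known_args, pipeline_args = split_args([
    "--output_topic=projects/my-project/topics/out",  # hypothetical topic
    "--runner=DataflowRunner",
    "--project=my-project",  # hypothetical project
])

print(known_args.output_topic)  # consumed by argparse
print(pipeline_args)            # left over for PipelineOptions
```

Values parsed this way are read once, while the pipeline is being constructed. For a classic template that appears to be the moment the template is built, so such options are fixed in the template and are not available as launch-time parameters.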
