Unable to create a classic template with a value provider parameter in my Dataflow pipeline



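My scenario is to invoke a Dataflow pipeline from a Cloud Function. I created a classic template and invoked it successfully. However, my pipeline contains a GCS input path that is hardcoded in my code.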

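I want to pass that path at runtime as a parameter in the API request body: {input:gcspath}. I searched the docs, which say I need to declare it as a value provider parameter. I did that in my code, but when I try to create the template, no template is created. My code may be wrong, so I am pasting it below in case someone can spot what I am missing. Alternatively, could someone post a small, correct sample snippet?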

------------------My code---------------

import apache_beam as beam
# from apache_beam.io import fileio
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from sys import argv 

PROJECT_ID = 'xxxxx-rnd'
# SCHEMA = 'sr:INTEGER,abv:FLOAT,id:INTEGER,name:STRING,style:STRING,ounces:FLOAT'
SCHEMA = 'High:STRING,Low:STRING,Open:STRING,Close:STRING,Volume:STRING,AdjClose:STRING'
def discard_incomplete(data):
    """Filters out records that are missing information."""
    return len(data['abv']) > 0 and len(data['id']) > 0 and len(data['name']) > 0 and len(data['style']) > 0

def convert_types(data):
    """Converts string values to their appropriate type."""
    data['Date'] = str(data['Date']) if 'Date' in data else None
    data['High'] = str(data['High']) if 'High' in data else None
    data['Low'] = str(data['Low']) if 'Low' in data else None
    data['Open'] = str(data['Open']) if 'Open' in data else None
    data['Close'] = str(data['Close']) if 'Close' in data else None
    data['Volume'] = str(data['Volume']) if 'Volume' in data else None
    data['AdjClose'] = str(data['AdjClose']) if 'AdjClose' in data else None
    # data['Average'] = float(data['Average']) if 'Average' in data else None
    return data

def del_unwanted_cols(data):
    """Delete the unwanted columns"""
    del data['Date']
    # del data['brewery_id']
    return data

class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Use add_value_provider_argument for arguments to be templatable
        # Use add_argument as usual for non-templatable arguments
        parser.add_value_provider_argument(
            '--input',
            default='gs://xxx/convertcsv.csv',
            help='Path of the file to read from')
        parser.add_argument(
            '--output',
            required=True,
            help='Output file to write results to.')

pipeline_options = PipelineOptions(['--output', 'gs://xxx/results/output'],
    runner='DataflowRunner',
    project='rightmechanics-rnd',
    job_name='dataflow-intro',
    temp_location='gs://xxx/valuetemp',
    region='us-central1'
)
p = beam.Pipeline(options=pipeline_options)
wordcount_options = pipeline_options.view_as(WordcountOptions)
(p  | 'ReadData' >> beam.io.ReadFromText(wordcount_options.input, skip_header_lines=1)
    | 'SplitData' >> beam.Map(lambda x: x.split(','))
    | 'FormatToDict' >> beam.Map(lambda x: {"Date": x[0], "High": x[1], "Low": x[2], "Open": x[3], "Close": x[4], "Volume": x[5], "AdjClose": x[6]})
#    | 'DeleteIncompleteData' >> beam.Filter(discard_incomplete)
    | 'ChangeDataType' >> beam.Map(convert_types)
    | 'DeleteUnwantedData' >> beam.Map(del_unwanted_cols)
#    | 'WriteToStorageBucket' >> beam.io.fileio.WriteToFiles(
#        '{0}:beer.beer_data'.format(PROJECT_ID),
#        schema=SCHEMA,
#        write_disposition=beam.io.fileio.WRITE_APPEND
#    )
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        '{0}:beer.beer_data2'.format(PROJECT_ID),
        schema=SCHEMA,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        method='FILE_LOADS'
    )
)
result = p.run()
result.wait_until_finish()

You can find instructions and an example for creating classic templates in the Dataflow documentation.

It says:

After you write your pipeline, you must create and stage your template file.

Could it be that the command you use to create and stage the template file is incorrect?
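As a rough illustration of what the staging step usually needs, here is a minimal sketch of the flags involved. The project, bucket, and template names are placeholders I made up, not values from your setup. The one that matters most is `--template_location`, which does not appear anywhere in the posted options.

```python
def build_template_args(project, region, bucket, template_name):
    """Return command-line flags for staging a classic template.

    All argument values passed in below are hypothetical placeholders.
    """
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--staging_location=gs://{bucket}/staging",
        f"--temp_location=gs://{bucket}/temp",
        # With this flag the pipeline is written to GCS as a template
        # file instead of being executed right away.
        f"--template_location=gs://{bucket}/templates/{template_name}",
    ]


flags = build_template_args("my-project", "us-central1", "my-bucket", "my_template")
# In the pipeline script these flags could be passed as PipelineOptions(flags).
print(" ".join(flags))
```

If the template file does not show up under the `--template_location` path after running the script, that would suggest the staging step itself is failing rather than the value provider argument.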

After you create and stage your template, your next step is to execute the template.
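For the execution step, the runtime value goes under `parameters` in the body of the `templates:launch` request. The sketch below only builds the URL and body, and sending it from the Cloud Function still needs an authenticated HTTP client. The project, bucket, and job names are hypothetical, and the key `input` is assumed to match the `--input` value provider argument in your options class.

```python
import json
from urllib.parse import urlencode


def build_launch_request(project, region, template_gcs_path, job_name, input_path):
    """Build the URL and JSON body for a classic template launch request."""
    query = urlencode({"gcsPath": template_gcs_path})
    url = (
        f"https://dataflow.googleapis.com/v1b3/projects/{project}"
        f"/locations/{region}/templates:launch?{query}"
    )
    body = {
        "jobName": job_name,
        # Keys here must match the names of the value provider arguments.
        "parameters": {"input": input_path},
    }
    return url, body


url, body = build_launch_request(
    "my-project",
    "us-central1",
    "gs://my-bucket/templates/my_template",
    "dataflow-intro",
    "gs://my-bucket/convertcsv.csv",
)
print(url)
print(json.dumps(body, indent=2))
```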
