我正试图传递一个BigQuery表名作为apachebeam管道模板的值提供程序。根据他们的文档和StackOverflow的回答,可以将值提供程序传递给apache_beam.io.gcp.bigquery.ReadFromBigQuery
。
这是我的管道的代码
class UserOptions(PipelineOptions):
"""Define runtime argument"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--input', type=str)
parser.add_value_provider_argument('--output', type=str)
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
user_options = pipeline_options.view_as(UserOptions)
(p | 'Read from BQ Table' >> beam.io.gcp.bigquery.ReadFromBigQuery(
user_options.input
)
当我在本地运行代码时,命令行传递user_options.input
的值为--input projectid.dataset_id.table
然而,我有一个错误:
ValueError: A BigQuery table or a query must be specified
我试过了:
通过
projectid:dataset_id.table
使用
bigquery.TableReference
->不可能使用
f'
{user_options.input}'
传递查询->在本地运行时工作,但在GCP上调用模板时不工作。错误声明:
缺少数据集,而请求中未设置默认数据集&"错误":[{"消息":"表名"RuntimeValueProvider(选项:input,类型:str,default_value:None("缺少数据集,而请求中未设置默认数据集。","域":"全局","原因":"无效"}],";状态":"INVALID_ARGUMENT"}}>
我缺少什么?
table
参数必须按名称传递给ReadFromBigQuery
。
BigQuerySource
(已弃用(接受table
作为第一个参数,因此您可以按位置(docs(传入一个参数。但是ReadFromBigQuery
期望gcs_location
作为第一个参数(文档(。因此,如果您正在将代码从使用BigQuerySource
移植到使用ReadFromBigQuery
,并且没有按名称显式传递表,那么它将失败,并显示您收到的错误。
以下是两个工作示例和一个不起作用的示例:
import apache_beam as beam
project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'
if __name__ == "__main__":
args = [
'--temp_location=gs://my_temp_bucket',
]
# This works:
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(table=f"{project_id}:{dataset_id}.{table_id}")
)
# So does this:
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(table=f"{dataset_id}.{table_id}", project=project_id)
)
# But this doesn't work becuase the table argument is not passed in by name.
# The f"{project_id}:{dataset_id}.{table_id}" string is interpreted as the gcs_location.
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(f"{project_id}:{dataset_id}.{table_id}")
)