ValueError:必须使用beam.io.gcp.BigQuery.ReadFromBigQuery指定BigQue



我正试图传递一个BigQuery表名作为apachebeam管道模板的值提供程序。根据他们的文档和StackOverflow的回答,可以将值提供程序传递给apache_beam.io.gcp.bigquery.ReadFromBigQuery

这是我的管道的代码

class UserOptions(PipelineOptions):
"""Define runtime argument"""
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--input', type=str)
parser.add_value_provider_argument('--output', type=str)
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
user_options = pipeline_options.view_as(UserOptions)
(p | 'Read from BQ Table' >> beam.io.gcp.bigquery.ReadFromBigQuery(
user_options.input
)

当我在本地运行代码时,命令行传递user_options.input的值为--input projectid.dataset_id.table

然而,我有一个错误:

ValueError: A BigQuery table or a query must be specified

我试过了:

  • 通过projectid:dataset_id.table

  • 使用bigquery.TableReference->不可能

  • 使用f'{user_options.input}'

  • 传递查询->在本地运行时工作,但在GCP上调用模板时不工作。错误声明:

    缺少数据集,而请求中未设置默认数据集&"错误":[{"消息":"表名"RuntimeValueProvider(选项:input,类型:str,default_value:None("缺少数据集,而请求中未设置默认数据集。","域":"全局","原因":"无效"}],";状态":"INVALID_ARGUMENT"}}>

我缺少什么?

table参数必须按名称传递给ReadFromBigQuery

BigQuerySource(已弃用(接受table作为第一个参数,因此您可以按位置(docs(传入一个参数。但是ReadFromBigQuery期望gcs_location作为第一个参数(文档(。因此,如果您正在将代码从使用BigQuerySource移植到使用ReadFromBigQuery,并且没有按名称显式传递表,那么它将失败,并显示您收到的错误。

以下是两个工作示例和一个不起作用的示例:

import apache_beam as beam
project_id = 'my_project'
dataset_id = 'my_dataset'
table_id = 'my_table'
if __name__ == "__main__":
args = [
'--temp_location=gs://my_temp_bucket',
]
# This works:
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery' 
>> beam.io.ReadFromBigQuery(table=f"{project_id}:{dataset_id}.{table_id}")
)
# So does this:
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery' 
>> beam.io.ReadFromBigQuery(table=f"{dataset_id}.{table_id}", project=project_id)
)
# But this doesn't work becuase the table argument is not passed in by name.
# The f"{project_id}:{dataset_id}.{table_id}" string is interpreted as the gcs_location.
with beam.Pipeline(argv=args) as pipeline:
query_results = (
pipeline
| 'Read from BigQuery'
>> beam.io.ReadFromBigQuery(f"{project_id}:{dataset_id}.{table_id}")
)

相关内容

  • 没有找到相关文章

最新更新