I am building a Beam pipeline on Google Cloud Dataflow.
I am getting an error that Dataflow does not have permission to write to the template directory (no storage.objects.create access). This is the error I get.
I have already granted the service account the Storage Admin and Viewer roles. I have also tried removing the Storage Admin role from the service account and re-adding it.
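One quick way to isolate whether the key itself can write to the bucket is a direct write with the storage client. A minimal sanity-check sketch, assuming the google-cloud-storage library and the bucket name from the pipeline defaults (the key path and object name are placeholders):

# Sanity check (sketch): can these credentials create an object in this bucket?
# Assumes google-cloud-storage; the key path and object name are placeholders.
from google.cloud import storage

client = storage.Client.from_service_account_json("<apikey.json>")
bucket = client.bucket("dataflow_marketing_datamart")
# Raises google.api_core.exceptions.Forbidden (403) if the account behind the
# key lacks storage.objects.create on this bucket.
bucket.blob("template/_permission_check").upload_from_string("ok")
print("storage.objects.create works for this key")

If this write succeeds, the credentials used to create the job are not the problem.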
Pipeline file
from __future__ import annotations

import json
import os
from typing import TYPE_CHECKING

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

if TYPE_CHECKING:
    from apache_beam.options.pipeline_options import _BeamArgumentParser

# Placeholder path to the service-account key used to create the job.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<apikey.json>"


class ArgumentParser(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser: _BeamArgumentParser) -> None:
        parser.add_value_provider_argument(
            '--input_file',
            help='Path to the file to ingest data from',
            default='gs://dataflow_marketing_datamart/json_to_bq_test/input_data.jsonl',
            type=str
        )
        parser.add_value_provider_argument(
            '--bq_table',
            help='Output BigQuery table in the form of <PROJECT>:<DATASET>.<TABLE>',
            default='marketing-datamart:dataflow_testing.custom_template_test',
            type=str
        )
        parser.add_value_provider_argument(
            '--bq_schema',
            help='JSON string of the BigQuery table schema',
            default="""
            {
                "fields": [
                    {
                        "description": "Name of the fruit",
                        "name": "fruit",
                        "type": "STRING",
                        "mode": "REQUIRED"
                    },
                    {
                        "description": "Quantity of the fruit",
                        "name": "quantity",
                        "type": "INTEGER",
                        "mode": "NULLABLE"
                    },
                    {
                        "description": "Color of the fruit",
                        "name": "color",
                        "type": "STRING",
                        "mode": "NULLABLE"
                    }
                ]
            }""",
            type=str
        )


class FormatInputText(beam.DoFn):
    """beam.io.WriteToBigQuery expects a list of one dictionary, but the raw output from
    beam.io.ReadFromText is a string. This converts each line to the required format."""

    def process(self, line):
        return [json.loads(line)]


def main(argv=None, save_main_session=True):
    """Main entry point"""
    pipeline_args = []
    pipeline_args.extend([
        '--runner=DataflowRunner',
        '--project=$PROJECT',
        '--region=asia-southeast1',
        '--staging_location=$BUCKET/staging',
        '--temp_location=$BUCKET/temp',
        '--job_name=custom-job-test',
        '--template_location=$BUCKET/template/trial3_template'
    ])
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    template_options = pipeline_options.view_as(ArgumentParser)
    template_options.view_as(SetupOptions).save_main_session = save_main_session

    with beam.Pipeline(options=pipeline_options) as p:
        input_lines = (p
                       | "Read input file" >> beam.io.ReadFromText(template_options.input_file)
                       | "Format lines" >> beam.ParDo(FormatInputText()))
        bq_write = input_lines | "Write to BigQuery" >> beam.io.WriteToBigQuery(
            table=lambda x: f"{template_options.bq_table.get()}",
            schema=lambda x: json.loads(template_options.bq_schema.get()),
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )


if __name__ == '__main__':
    main()
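Note that because template_location is set, a successful run of this script only stages a classic template at $BUCKET/template/trial3_template; it does not execute the job. A launch sketch for the staged template, assuming the google-api-python-client library (the project ID and job name are placeholders):

# Launch sketch (assumption: google-api-python-client is installed and the
# template was staged successfully; values in <> or $ are placeholders).
from googleapiclient.discovery import build

dataflow = build("dataflow", "v1b3")
response = dataflow.projects().locations().templates().launch(
    projectId="<PROJECT>",
    location="asia-southeast1",
    gcsPath="$BUCKET/template/trial3_template",
    body={"jobName": "custom-job-test-run", "parameters": {}},
).execute()
print(response["job"]["id"])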
Answer

I used Dataflow recently and ran into some similar confusion around service accounts. For me, the problem was that I had not realized Dataflow jobs are launched under a separate "worker service account", not the service account that created the job.

You will notice the service account name is <project-number>-compute@developer.gserviceaccount.com, which is the default worker service account and is probably not the service account that created the Dataflow job.

I recommend reading this section of the Dataflow permissions documentation.

You can either create a user-managed service account for the job (this is configurable when creating the job), or you must make sure you assign the IAM permissions to the default worker service account.
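If you go the user-managed route, the worker service account can be set directly in the pipeline options via --service_account_email. A minimal sketch, assuming a service account you have already created and granted the necessary Storage/BigQuery roles (the account email is a placeholder):

# Sketch: set a user-managed worker service account for the Dataflow job.
# The email is a placeholder; the account must already hold the needed roles.
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--service_account_email=my-worker-sa@<PROJECT>.iam.gserviceaccount.com',
])
print(options.view_as(GoogleCloudOptions).service_account_email)

Alternatively, grant the permission to the default worker account from the CLI, e.g. gcloud projects add-iam-policy-binding <PROJECT> --member="serviceAccount:<project-number>-compute@developer.gserviceaccount.com" --role="roles/storage.objectAdmin".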