How to run a GCP Dataflow pipeline with OAuth access token credentials



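I'm trying to create a Dataflow pipeline from the Cloud SDK. I have set the environment variable 'GOOGLE_OAUTH_ACCESS_TOKEN', but I'm not sure how to pass this credential to pipeline.run().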

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    SetupOptions,
    StandardOptions,
)

# DataInput (a PipelineOptions subclass with an --input option),
# read_header_from_filename, project, bucket_name, model_path and
# destination_name are defined elsewhere in the original module.


def run(argv=None):
    pipeline_options = PipelineOptions(flags=argv)
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = 'project name'
    google_cloud_options.region = 'region'
    google_cloud_options.job_name = 'jobname'
    google_cloud_options.staging_location = 'gs://staging-location'
    google_cloud_options.temp_location = 'gs://temp-location'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(SetupOptions).setup_file = "./setup.py"
    user_options = pipeline_options.view_as(DataInput)

    p = beam.Pipeline(options=pipeline_options)

    # Pack the job parameters into one space-separated string for beam.Create.
    input_meta = " ".join(
        [str(user_options.input), project, bucket_name, model_path, destination_name]
    )
    header = (p
              | "CREATE BEAM" >> beam.Create([input_meta])
              | "Inside read header" >> beam.Map(read_header_from_filename))
    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

I get the following error:

INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://staging-location/staging/telco-churn-prediction.1633999477.963380/pickled_main_session...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:oauth2client.client:Failed to retrieve access token: {
"error": "invalid_grant",
"error_description": "reauth related error (invalid_rapt)",
"error_subtype": "invalid_rapt"
}

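When you run workloads on Google Cloud, you don't need a specific access token or a service account key file. It's better to use the standard client libraries with Application Default Credentials (ADC), together with a custom worker service account.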
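To see which credentials ADC would actually pick up on a given machine, here is a simplified sketch of its lookup order. It is an approximation written for illustration, not google-auth's real code (the real logic is in google.auth.default(), which also handles Windows paths and App Engine). The function name find_adc_source is hypothetical. Note that, to my knowledge, ADC does not read GOOGLE_OAUTH_ACCESS_TOKEN at all, so setting that variable has no effect on it.

```python
import os
from pathlib import Path
from typing import Mapping


def find_adc_source(env: Mapping[str, str], home: Path) -> str:
    """Approximate where Application Default Credentials would come from.

    Hypothetical helper: a simplified version of the lookup order used by
    google.auth.default(); Windows paths and App Engine are not covered.
    """
    # 1. An explicit credentials file (for example a service account key).
    key_file = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if key_file:
        return f"file from GOOGLE_APPLICATION_CREDENTIALS: {key_file}"

    # 2. User credentials written by `gcloud auth application-default login`.
    #    CLOUDSDK_CONFIG overrides the default gcloud config directory.
    if "CLOUDSDK_CONFIG" in env:
        config_dir = Path(env["CLOUDSDK_CONFIG"])
    else:
        config_dir = home / ".config" / "gcloud"
    well_known = config_dir / "application_default_credentials.json"
    if well_known.is_file():
        return f"gcloud user credentials: {well_known}"

    # 3. Otherwise ADC falls back to the metadata server, which only
    #    exists when the code runs on Google Cloud (e.g. Dataflow workers).
    return "metadata server (attached service account)"


if __name__ == "__main__":
    print(find_adc_source(os.environ, Path.home()))
```

If this reports gcloud user credentials and you still see invalid_rapt, those stored user credentials are the ones being rejected; a service account avoids depending on them.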

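Use your own service account, grant it the permissions your pipeline needs, and run the pipeline with it. This is simpler and more secure.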
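As a minimal sketch of that advice, the helper below builds the Beam flags for a Dataflow job whose workers run as a custom service account, using the real Beam option --service_account_email. No token is passed anywhere: the launcher authenticates through ADC and the workers use the service account attached to their VMs. The helper name dataflow_flags, the bucket layout, and all example values are hypothetical.

```python
import shlex
from typing import List


def dataflow_flags(project: str, region: str, bucket: str,
                   worker_service_account: str) -> List[str]:
    """Hypothetical helper: Beam flags for a Dataflow job that runs as a
    custom worker service account, relying on ADC instead of a token."""
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        f"--staging_location=gs://{bucket}/staging",
        f"--temp_location=gs://{bucket}/temp",
        # Identity that the Dataflow worker VMs run as.
        f"--service_account_email={worker_service_account}",
    ]


if __name__ == "__main__":
    flags = dataflow_flags(
        project="my-project",  # hypothetical values
        region="us-central1",
        bucket="my-bucket",
        worker_service_account="dataflow-worker@my-project.iam.gserviceaccount.com",
    )
    # Pass these to PipelineOptions(flags=flags), or use them on the command line.
    print("python main.py " + shlex.join(flags))
```

In the question's run(), this would mean calling PipelineOptions(flags=flags) instead of setting the same values one by one. Typically the worker service account needs roles/dataflow.worker plus access to the buckets it reads and writes, and the identity launching the job needs roles/iam.serviceAccountUser on that service account.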
