尝试从数据流管道将CSV文件写入谷歌云存储时出错



我正在构建一个数据流管道,该管道从我的云存储存储桶中读取一个CSV文件(包含250000行(,修改每行的值,然后将修改后的内容写入同一存储桶中的新CSV。使用下面的代码,我可以读取和修改原始文件的内容,但当我试图在GCS中写入新文件的内容时,我会出现以下错误:

google.api_core.exceptions.TooManyRequests: 429 POST https://storage.googleapis.com/upload/storage/v1/b/my-bucket/o?uploadType=multipart: {
"error": {
"code": 429,
"message": "The rate of change requests to the object my-bucket/product-codes/URL_test_codes.csv exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
"errors": [
{
"message": "The rate of change requests to the object my-bucket/product-codes/URL_test_codes.csv exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
"domain": "usageLimits",
"reason": "rateLimitExceeded"
}
]
}
}
: ('Request failed with status code', 429, 'Expected one of', <HTTPStatus.OK: 200>) [while running 'Store Output File']

我在数据流中的代码:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import traceback
import sys
import pandas as pd
from cryptography.fernet import Fernet
import google.auth
from google.cloud import storage
fernet_secret = 'aD4t9MlsHLdHyuFKhoyhy9_eLKDfe8eyVSD3tu8KzoP='
bucket = 'my-bucket'
inputFile = f'gs://{bucket}/product-codes/test_codes.csv'
outputFile = 'product-codes/URL_test_codes.csv'
#Pipeline Logic
def product_codes_pipeline(project, env, region='us-central1'):
options = PipelineOptions(
streaming=False,
project=project,
region=region,
staging_location="gs://my-bucket-dataflows/Templates/staging",
temp_location="gs://my-bucket-dataflows/Templates/temp",
template_location="gs://my-bucket-dataflows/Templates/Generate_Product_Codes.py",
subnetwork='https://www.googleapis.com/compute/v1/projects/{}/regions/us-central1/subnetworks/{}-private'.format(project, env)
)

# Transform function
def genURLs(code):
f = Fernet(fernet_secret)
encoded = code.encode()
encrypted = f.encrypt(encoded)
decrypted = f.decrypt(encrypted.decode().encode())
decoded = decrypted.decode()
if code != decoded:
print(f'Error: Code {code} and decoded code {decoded} do not match')
sys.exit(1)
url = 'https://some-url.com/redeem/product-code=' + encrypted.decode()
return url

class WriteCSVFIle(beam.DoFn):
def __init__(self, bucket_name):
self.bucket_name = bucket_name
def start_bundle(self):
self.client = storage.Client()
def process(self, urls):
df = pd.DataFrame([urls], columns=['URL'])
bucket = self.client.get_bucket(self.bucket_name)
bucket.blob(f'{outputFile}').upload_from_string(df.to_csv(index=False), 'text/csv')


# End function
p = beam.Pipeline(options=options)
(p | 'Read Input CSV' >> beam.io.ReadFromText(inputFile, skip_header_lines=1)
| 'Map Codes' >> beam.Map(genURLs)
| 'Store Output File' >> beam.ParDo(WriteCSVFIle(bucket)))
p.run()

代码在我的存储桶中生成URL_test_codes.csv,但文件只包含一行(不包括"URL"头(,这告诉我的代码在处理每一行时正在写入/覆盖文件。有没有一种方法可以批量写入整个文件的内容,而不是发出一系列更新文件的请求?我是Python/Dataflow的新手,所以非常感谢您的帮助。

让我们指出问题:显而易见的问题是GCS方面的配额问题,反映在"429"错误代码中。但正如您所指出的,这源于固有问题,该问题更多地与如何将数据写入blob有关。

由于Beam Pipeline生成元素的Parallel Collection,当您将元素添加到PCollection时,每个管道步骤都将针对每个元素执行,换句话说,您的ParDo函数将尝试为PCollection中的每个元素向输出文件写入一次内容。

因此,WriteCSVFIle函数存在一些问题。例如,为了将您的PCollection写入GCS,最好使用一个单独的管道任务,专注于写入整个PCollection,例如:

首先,您可以导入Apache Beam:中已经包含的此函数

from apache_beam.io import WriteToText

然后,您在管道的末端使用它:

| 'Write PCollection to Bucket' >> WriteToText('gs://{0}/{1}'.format(bucket_name, outputFile))

使用此选项,您不需要创建存储客户端或引用blob,函数只需要接收GCS URI,它将在其中写入最终结果,您可以根据文档中的参数进行调整。

这样,您只需要处理在WriteCSVFIle函数中创建的Dataframe。每个管道步骤都会创建一个新的PCollection,因此,如果Dataframe创建者函数应该从url的PColletion接收一个元素,那么根据您当前的逻辑,由Dataframe函数产生的新PCollection元素每个url将有一个数据帧,但由于考虑到"url"是数据帧中唯一的列,您似乎只想从genURL中写入结果,也许直接从genURL转到WriteToText可以输出您想要的内容。

无论哪种方式,您都可以相应地调整管道,但至少通过WriteToText转换,它可以将整个最终PCollection写入云存储桶。

最新更新