I'm currently writing a Lambda function in Python, using the awswrangler library, to read parquet files that average 100 MB to 200 MB. The idea is to read the file and convert it to CSV:
import urllib.parse
from io import StringIO

import awswrangler as wr
import boto3

print('Loading function')

s3 = boto3.client('s3')
s3_resource = boto3.resource('s3')
dest_bucket = "mydestbucket"


def lambda_handler(event, context):
    # Get the object that triggered the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        response = s3.get_object(Bucket=bucket, Key=key)
        print("CONTENT TYPE: " + response['ContentType'])
        if key.endswith('.parquet'):
            # Read the parquet file in chunks to keep memory usage bounded
            dfs = wr.s3.read_parquet(path=['s3://' + bucket + '/' + key], chunked=True, use_threads=True)
            count = 0
            for df in dfs:
                # Convert each chunk to CSV and upload it to the destination bucket
                csv_buffer = StringIO()
                df.to_csv(csv_buffer)
                s3_resource.Object(dest_bucket, 'dfo_' + str(count) + '.csv').put(Body=csv_buffer.getvalue())
                count += 1
        return "File written"
    except Exception as e:
        print(e)
        raise
The function works fine with small files, but as soon as I try a large one (100 MB) it times out.
I've already allocated 3 GB of memory to the Lambda and set a 10-minute timeout, but that doesn't seem to help.
Do you know any way to improve the performance other than allocating more memory?
Thanks!
I solved this by creating a Lambda layer with fastparquet, which handles memory in a more optimized way than aws wrangler:
from io import StringIO
from datetime import datetime

import boto3
import fastparquet as fp
import s3fs
import urllib.parse

# S3 filesystem initialization (s3fs is what fastparquet uses to open the objects)
s3_fs = s3fs.S3FileSystem()
my_open = s3_fs.open

s3_resource = boto3.resource('s3')

# s3_path, dest_bucket and file_path are built from the S3 event, as in the question
s3fs_path = s3_fs.glob(path=s3_path)

# Read the parquet object using fastparquet
fp_obj = fp.ParquetFile(s3fs_path, open_with=my_open)

# Filter columns and build a pandas df
new_df = fp_obj.to_pandas()

# CSV buffer used for the parquet --> csv transformation
csv_buffer = StringIO()
new_df.to_csv(csv_buffer)

s3_resource.Object(
    dest_bucket,
    f"{file_path}",
).put(Body=csv_buffer.getvalue())
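If memory is still tight for the largest files, fastparquet can also iterate over row groups instead of materializing the whole DataFrame with to_pandas(). Below is a minimal sketch of that variation, assuming the same s3_fs, s3fs_path, dest_bucket and file_path as above; the per-part key naming is just illustrative:

from io import StringIO

import boto3
import fastparquet as fp

s3_resource = boto3.resource('s3')

# Open the parquet file lazily; row groups are then read one at a time
fp_obj = fp.ParquetFile(s3fs_path, open_with=s3_fs.open)

for i, chunk in enumerate(fp_obj.iter_row_groups()):
    # Each chunk is a pandas DataFrame holding a single row group
    csv_buffer = StringIO()
    chunk.to_csv(csv_buffer)
    # Upload each row group as its own CSV part (hypothetical key naming)
    s3_resource.Object(
        dest_bucket,
        f"{file_path}_part{i}.csv",
    ).put(Body=csv_buffer.getvalue())

This keeps the peak memory roughly at the size of one row group plus its CSV buffer, at the cost of producing several output objects per input file.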