How do I grab larger CSV files from an AWS S3 bucket?



I am trying to go through all of the CSV files I have in an AWS S3 bucket, grab all the data in those CSV files, and put it into a dataframe. The code below first grabs all the CSV file names, then fetches each individual file and runs it through csv.reader to capture the data, appends the rows to a list, and then builds a dataframe from that list. My problem is that the code skips files larger than 100 KB, and some of my files are larger than 300 KB. I am trying to grab every file that has kilobytes of data and put it into a dataframe.

Here is my code:

import csv
import boto3
import pandas as pd

# Set the S3 bucket and directory path where CSV files are stored
aws_access_key_id = 'XXXXXXXXXX'
aws_secret_access_key = 'XXXXXXXXXXXXXX'
s3_bucket_name = 'arcodp'
folder_name = 'lab_data/'

# Get a list of all CSV files in the S3 bucket directory
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=s3_bucket_name, Prefix=folder_name)

csv_files = [obj['Key'] for page in pages for obj in page['Contents'] if obj['Key'].endswith('.csv')]

# Create an empty list to store the rows, and one for the files that fail
df_list = []
ARCID_lst = []
# Read each CSV file and append its rows to df_list
for file in csv_files:
    try:
        response = s3.get_object(Bucket=s3_bucket_name, Key=file)
        data = response['Body'].read().decode('utf-8')

        # Read the CSV file line by line and append each line to a list
        rows_list = []
        csv_reader = csv.reader(data.splitlines(), delimiter='|', quoting=csv.QUOTE_NONE)
        for row in csv_reader:
            rows_list.append(row)
        df_list.extend(rows_list)
    except:
        ARCID_no_hit = file.split('/')[1].split('_')[0]
        ARCID_lst.append(ARCID_no_hit)

# Convert the list of rows into a pandas dataframe
df_par = pd.DataFrame(df_list)
# Print the first 10 rows of the combined dataframe
df_par[0:10]
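
Note: the bare except: in the loop above silently drops any file whose processing raises an error, which is one reason larger files can appear to be "skipped". A minimal diagnostic sketch, reusing the s3 client, s3_bucket_name and csv_files defined above and simply printing each failure, might look like this:

import csv
import traceback

import pandas as pd

# Same loop as above, but every failure is printed instead of being swallowed,
# so the real reason a file is skipped becomes visible.
rows = []
failed_keys = []
for key in csv_files:
    try:
        body = s3.get_object(Bucket=s3_bucket_name, Key=key)['Body']
        text = body.read().decode('utf-8')
        rows.extend(csv.reader(text.splitlines(), delimiter='|', quoting=csv.QUOTE_NONE))
    except Exception:
        print(f"Skipping {key}:")
        traceback.print_exc()
        failed_keys.append(key)

df_debug = pd.DataFrame(rows)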

Is there a keyword argument for csv.reader that makes it read larger files? I haven't found anything online about such an argument. I also tried using dask, but with that code I only get No such file or directory: '/user/user/documents/extract_data/"1000231"|"None"|"20221130". The files are not on my local computer, so I don't know why this is happening. Here is the dask code:

import csv
import boto3
import dask.dataframe as dd
from dask import delayed

# Set the S3 bucket and directory path where CSV files are stored
aws_access_key_id = 'XXXXXXXXXXXXX'
aws_secret_access_key = 'XXXXXXXXXX'
s3_bucket_name = 'arcodp'
folder_name = 'lab_data/'

# Get a list of all CSV files in the S3 bucket directory
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=s3_bucket_name, Prefix=folder_name)

csv_files = [obj['Key'] for page in pages for obj in page['Contents'] if obj['Key'].endswith('.csv')]

# Create an empty list to store the dataframes, and one for the files that fail
df_list = []
ARCID_lst = []
for file in csv_files:
    try:
        response = s3.get_object(Bucket=s3_bucket_name, Key=file)
        data = response['Body'].read().decode('utf-8')

        # Create a delayed Dask dataframe for each CSV file
        df = delayed(dd.read_csv)(data, sep='|', header=None, blocksize=None, quoting=csv.QUOTE_NONE, engine='c')
        df_list.append(df)
    except:
        ARCID_no_hit = file.split('/')[1].split('_')[0]
        ARCID_lst.append(ARCID_no_hit)

# Combine all delayed Dask dataframes into a single Dask dataframe
df_combined = dd.from_delayed(df_list)
# Compute the final pandas dataframe
df_par = df_combined.compute()
# Print the first 5 rows of the combined dataframe
df_par.head()
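
Note: pd.read_csv and dd.read_csv treat a plain string as a path, so in the attempt above the decoded file contents are interpreted as a file name, which would explain why the "No such file or directory" error contains a row of the CSV. If the per-file boto3 download is to be kept, one possible variant (only a sketch, assuming the same s3 client, s3_bucket_name and csv_files as above, and dask's default threaded scheduler) is to wrap the downloaded text in an in-memory buffer and hand it to pandas inside dask.delayed:

import csv
import io

import dask.dataframe as dd
import pandas as pd
from dask import delayed

def load_key(key):
    # Download one object and parse it from memory instead of from a path.
    text = s3.get_object(Bucket=s3_bucket_name, Key=key)['Body'].read().decode('utf-8')
    return pd.read_csv(io.StringIO(text), sep='|', header=None, quoting=csv.QUOTE_NONE)

parts = [delayed(load_key)(key) for key in csv_files]
ddf = dd.from_delayed(parts)   # lazy dask dataframe
df_par = ddf.compute()         # concrete pandas dataframe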

Here is the code to do this using dask:

import dask.dataframe as dd
df = dd.read_csv(f"s3://{s3_bucket_name}/{folder_name}*.csv",
                 storage_options=dict(key='XXXXXXXXXXXXX', secret='XXXXXXXXXX'),
                 sep='|', ...)

That's it.
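
The result is a lazy dask dataframe; if a regular pandas dataframe is wanted at the end, as in the original code, it still has to be computed. A small usage sketch, assuming the df from the call above:

df.head()              # first 5 rows (reads only the first partition)
df_par = df.compute()  # materialize the full pandas DataFrame in memory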
