How to automatically upload files from Google Cloud Storage to BigQuery using Python



I am using the Python snippet below, deployed as a Cloud Function, with the goal of automatically uploading CSV data from a storage bucket to a BigQuery table. The function trigger is "whenever a new file is uploaded to the bucket". However, the code crashes. Please let me know if I am doing something wrong.

import gcsfs
import os
import pandas as pd
import re
import numpy as np
from google.cloud import bigquery
from google.cloud import storage
from google.cloud.exceptions import NotFound

# Environment variables
metric = "availability"
table = "availability_daily_2"
bucket = "tintin_bucket"
staging_folder = "profitero/staging/daily/"+metric
processed_folder = "profitero/processed/daily/"+metric
dataset = "tintin_2"

# Returns a list with all blobs in a given bucket
def list_blobs(bucket):
    storage_client = storage.Client()
    blobs = storage_client.list_blobs(bucket)
    blobs_list = []
    for blob in blobs:
        blobs_list.append(blob.name)
    return blobs_list

# Function to process file names into organized data
def processFileNames(list_of_file_names):
    # Define helper functions
    def searchFunction(pattern, x):
        output = re.search(pattern, x)
        if output is None:
            return None
        else:
            return output.group(0)
    def getdates(x): return searchFunction(r"(\d{4}-\d{2}-\d{2})", x)
    def getcountry(x): return searchFunction(r"([A-Z]{2})", x)
    def getmetric(x): return searchFunction(r"(Placement|Pricing|Ratings|Availability|Content|Assortment)", x)
    def getfiletype(x): return searchFunction(r"(zip|csv)", x)
    def isDaily(x): return searchFunction(r"(Daily)", x)
    # Create empty dataframe
    d = {'filename': list_of_file_names}
    df = pd.DataFrame(data=d)
    # Fill dataframe
    df['date'] = df.filename.apply(lambda x: getdates(x))
    df['date'] = pd.to_datetime(df['date'])
    df['country'] = df.filename.apply(lambda x: getcountry(x))
    df['metric'] = df.filename.apply(lambda x: getmetric(x))
    df['filetype'] = df.filename.apply(lambda x: getfiletype(x))
    df['isDaily'] = df.filename.apply(lambda x: isDaily(x))
    df.replace('', np.nan, inplace=True)
    #df.dropna(inplace=True)
    return df

# Function to clean column names
def cleanCols(x):
    #x = re.sub('[^0-9a-zA-Z]+', '', x)
    x = x.replace(" ", "_")
    #x = x.lower()
    x = x.replace("-", "_")
    x = x.replace("#", "no")
    x = x.replace("3p", "third_party")
    x = x.replace("3P", "third_party")
    x = x.replace("&", "and")
    x = x.replace("'", "")
    return x

# Function to move processed blobs into processed folder
def move_blob(bucket, file):
    storage_client = storage.Client()
    source_bucket = storage_client.bucket(bucket)
    source_blob = source_bucket.blob(file)
    destination_bucket = storage_client.bucket(bucket)
    destination_blob_name = "profitero/processed/daily/"+metric+"/"+file.rsplit("/", 1)[1]
    try:
        blob_copy = source_bucket.copy_blob(source_blob, destination_bucket, destination_blob_name)
        blob_delete = source_bucket.delete_blob(file)
        print("Blob {} moved to blob {}.".format(source_blob.name, blob_copy.name))
    except NotFound:
        print("Not found error")
        pass

# Main function - Lists CSVs in bucket, reads them into memory, loads them into BigQuery
def csv_loader(data, context):
    #request_json = request.get_json(silent=True)
    print(data['name'])
    p = re.compile('profitero/staging/daily/'+metric+'/.*csv')
    if p.match(data['name']):
        try:
            df = pd.read_csv("gs://"+bucket+"/"+data['name'])
            print("Read CSV")
            df['event_id'] = context.event_id
            print("Attached event id")
            df['event_timestamp'] = context.timestamp
            print("Attached timestamp")
            df.rename(columns=lambda x: cleanCols(x), inplace=True)
            df['RPC'] = df['RPC'].astype(str)
            print("Cleaned column names")
            df = df[['Date', 'Country', 'Retailer', 'Product_Title', 'Match_Type', 'Availability', 'URL', 'Manufacturer', 'Brand', 'Sub_Brand', 'Account_Category', 'RPC']]
            print("Selected relevant columns")
            df.to_gbq("tintin_2."+table, if_exists="append", project_id="emea-devices-services")
            print("Added to table")
            move_blob(bucket, data['name'])
            print("Moved file")
        except Exception as e:
            print(e)
    else:
        pass
    # Notify of success
    return "Success!"

The csv_loader function in your code, and more specifically the pd.read_csv() call that reads the whole CSV into memory, is the most likely culprit behind the Cloud Function crash, depending on the size of the CSV. Reading the file this way is memory-intensive, and freeing the memory held by the dataframe afterwards can be tricky.
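One way to keep the peak memory footprint down inside the function is to read the CSV in chunks and append each chunk to the table, instead of loading the whole file at once. The snippet below is only a minimal sketch of that idea; the bucket, table, and project names mirror the question's setup, and the chunk size is an arbitrary placeholder you would tune for your data.

import pandas as pd

# Assumed identifiers, copied from the question's setup; adjust to your project.
BUCKET = "tintin_bucket"
TABLE = "tintin_2.availability_daily_2"
PROJECT = "emea-devices-services"

def load_csv_in_chunks(object_name, chunksize=50_000):
    """Read the CSV from GCS chunk by chunk and append each chunk to BigQuery,
    so only one chunk is held in memory at a time."""
    uri = "gs://" + BUCKET + "/" + object_name
    for chunk in pd.read_csv(uri, chunksize=chunksize):
        chunk.to_gbq(TABLE, project_id=PROJECT, if_exists="append")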

Depending on the size of the CSV files you need to process, provision your Cloud Function with enough memory (the default is 256 MB, the maximum is 2048 MB) so that the function does not run into OOM issues and crash.

Alternatively, to avoid this bottleneck in your application, you could consider streaming the data from Cloud Storage into BigQuery, which is explained in detail in the article below. You can find the related repository here.
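The linked article and repository are not reproduced here, but the general streaming idea can be sketched as follows: iterate over the object's rows (here via gcsfs, which the question already imports) and send small batches to BigQuery with the streaming insert API, so the whole file never sits in the function's memory. The table and bucket names are assumptions taken from the question, and this only works as written if the CSV header matches the BigQuery column names.

import csv
import gcsfs
from google.cloud import bigquery

# Assumed identifiers based on the question's setup.
BUCKET = "tintin_bucket"
TABLE_ID = "emea-devices-services.tintin_2.availability_daily_2"

def stream_csv_to_bigquery(object_name, batch_size=500):
    """Stream CSV rows into BigQuery in small batches instead of
    loading the whole file into a dataframe."""
    client = bigquery.Client()
    fs = gcsfs.GCSFileSystem()
    rows = []
    with fs.open(BUCKET + "/" + object_name, "r") as f:
        for row in csv.DictReader(f):
            rows.append(row)
            if len(rows) >= batch_size:
                errors = client.insert_rows_json(TABLE_ID, rows)
                if errors:
                    print(errors)
                rows = []
    if rows:  # flush the last partial batch
        errors = client.insert_rows_json(TABLE_ID, rows)
        if errors:
            print(errors)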

There are several solutions you can look into:

File size < 50 MB: Load a text file (.txt) from Cloud Storage into a BigQuery table

File size > 50 MB: Unable to load a 340 MB file into BigQuery using a Cloud Function

Note: the first solution relies on the Cloud Function's compute power, whereas the second one relies on BigQuery's compute power.
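For the second, BigQuery-side approach, the usual pattern is to let BigQuery load the file directly from Cloud Storage with a load job, so the Cloud Function only issues the request and never reads the CSV itself. A minimal sketch, with the table name from the question assumed and schema autodetection used for simplicity:

from google.cloud import bigquery

# Assumed identifier based on the question's setup.
TABLE_ID = "emea-devices-services.tintin_2.availability_daily_2"

def load_from_gcs(data, context):
    """Cloud Function entry point: start a BigQuery load job for the
    uploaded CSV instead of reading it into the function's memory."""
    uri = "gs://" + data["bucket"] + "/" + data["name"]
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,          # skip the header row
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        autodetect=True,              # infer the schema from the file
    )
    load_job = client.load_table_from_uri(uri, TABLE_ID, job_config=job_config)
    load_job.result()  # wait for the job to finish
    print("Loaded {} into {}".format(data["name"], TABLE_ID))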
