Python-多线程帮助-读取多个文件-ETL到SQL Server中



我正在开发一个程序,该程序从本地驱动器读取DBF文件并将数据加载到sql server表中。我对Python非常熟悉,我发现了一些关于多线程的细节,其中大部分都令人困惑。读取和插入的性能很慢,从我的CPU使用情况来看,我有足够的容量。我也在运行SSD。

此代码将扩展为从大约400个zip中的大约20个DBF文件中读取。所以我们讨论的是总共8000个DBF文件。

我做这件事很难。你能提供指针吗?

这是我的代码(有点乱,但我稍后会清理),

import os, pyodbc, datetime, shutil
from dbfread import DBF
from zipfile import ZipFile
# SQL Server Connection Test
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhosttest;DATABASE=TEST_DBFIMPORT;UID=test;PWD=test')
cursor = cnxn.cursor()
dr = 'e:\Backups\dbf\'
work = 'e:\Backups\work\'
archive = 'e:\Backups\archive\'

for r in os.listdir(dr):
curdate = datetime.datetime.now()
filepath = dr + r
process = work + r
arc = archive + r
pth = r.replace(".sss","")
zipfolder = work + pth
filedateunix = os.path.getctime(filepath)
filedateconverted=datetime.datetime.fromtimestamp(int(filedateunix)
).strftime('%Y-%m-%d %H:%M:%S')
shutil.move(filepath,process)
with ZipFile(process) as zf:
zf.extractall(zipfolder)

cursor.execute(
"insert into tblBackups(backupname, filedate, dateadded) values(?,?,?)",
pth, filedateconverted, curdate)
cnxn.commit()
for dirpath, subdirs, files in os.walk (zipfolder):
for file in files:
dateadded = datetime.datetime.now()
if file.endswith(('.dbf','.DBF')):
dbflocation = os.path.abspath(os.path.join(dirpath,file)).lower()
if dbflocation.__contains__("\bk.dbf"):
table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
for record in table.records:
rec1 = str(record['code'])
rec2 = str(record['name'])
rec3 = str(record['addr1'])
rec4 = str(record['addr2'])
rec5 = str(record['city'])
rec6 = str(record['state'])
rec7 = str(record['zip'])
rec8 = str(record['tel'])
rec9 = str(record['fax'])
cursor.execute(
"insert into tblbk(code,name,addr1,addr2,city,state,zip,tel,fax) values(?,?,?,?,?,?,?,?,?)",
rec1, rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9, rec10, rec11, rec12, rec13)
cnxn.commit()

if dbflocation.__contains__("\cr.dbf"):
table = DBF(dbflocation, lowernames=True, char_decode_errors='ignore')
for record in table.records:
rec2 = str(record['cal_desc'])
rec3 = str(record['b_date'])
rec4 = str(record['b_time'])
rec5 = str(record['e_time'])
rec6 = str(record['with_desc'])
rec7 = str(record['recuruntil'])
rec8 = record['notes']
rec9 = dateadded
cursor.execute(
"insert into tblcalendar(cal_desc,b_date,b_time,e_time,with_desc,recuruntil,notes,dateadded) values(?,?,?,?,?,?,?,?)",
rec2, rec3, rec4, rec5, rec6, rec7, rec8, rec9)
cnxn.commit() 
shutil.move(process, archive)
shutil.rmtree(zipfolder)

tl;dr:先测量,后修复


请注意,在最常见的Python实现(CPython)中,一次只能有一个线程执行Python字节码。因此,线程不是提高CPU绑定性能的好方法。如果工作是I/O绑定的,它们可以很好地工作。

但您首先应该做的是度量。这再怎么强调也不为过。如果你不知道是什么原因导致性能不足,你就无法修复它!

编写完成这项工作的单线程代码,并在探查器下运行。先试用内置的cProfile。如果这还不能给你足够的信息,那就试着用一个行分析器。

评测应该告诉您哪些步骤消耗的时间最多。一旦你知道了这一点,你就可以开始进步了。

例如,如果将数据填充到SQL server中需要花费最多的时间,那么使用multiprocessing读取DBF文件是没有意义的!这甚至可能会减慢速度,因为几个进程正在争夺SQL服务器的注意力。

如果SQL服务器不是瓶颈,它可以处理多个连接,我会使用multiprocessing,可能是Pool.map()并行读取DBF并将数据填充到SQL服务器中。在这种情况下,应该在DBF文件名列表上Pool.map,以便在辅助进程中打开这些文件。

您可以尝试executemany()方法,而不是在循环中插入一个。以下是一些ETL脚本中插入函数的示例:

def sql_insert(table_name, fields, rows, truncate_table = True):
if len(rows) == 0:
return
cursor = mdwh_connection.cursor()
cursor.fast_executemany = True
values_sql = ('?, ' * (fields.count(',') + 1))[:-2]
if truncate_table:
sql_truncate(table_name, cursor)

insert_sql = 'insert {0} ({1}) values ({2});'.format(table_name, fields, values_sql)
current_row = 0
batch_size = 50000
while current_row < len(rows):
cursor.executemany(insert_sql, rows[current_row:current_row + batch_size])
mdwh_connection.commit()
current_row += batch_size
logging.info(
'{} more records inserted. Total: {}'.format(
min(batch_size,len(rows)-current_row+batch_size),
min(current_row, len(rows))
)
)    

最新更新