Multiprocessing inserts into sqlite3 take more time than serial Python 3



What I am trying to achieve is to extract the metadata of the files under a given directory and store it in a .db file using Python's sqlite3. I use Python's multiprocessing to take advantage of computing some hashes of each file in parallel (one core per file at a time), yet the serial code always takes less computation time than the parallel one. I have tested this on several machines with 2-8 GB of RAM and 2-4 cores, gathering data from many files (some of them larger than 1 GB), and the result is always the same. Below I present both the serial and the parallel Python scripts; any ideas would be very helpful.

Multiprocessing script:

import itertools
import multiprocessing
from multiprocessing import Pool
import os, sys
import stat
import sqlite3
import time
import hashlib

def worker(filename):
    conn = sqlite3.connect('metadata.db', timeout=30.0)
    c = conn.cursor()   # database cursor
    result = os.stat(filename)  # stat instance to get info about the current file
    print("Gathering metadata for file: " + filename)
    split_filename = filename.split('/')
    path_to_file = '/'.join(split_filename[:-1])
    file_name = split_filename[len(split_filename) - 1]
    # just things to get info about file
    if '.' in file_name:
        file_type = file_name.split('.', 1)
        name = file_type[0]
        file_type = file_type[1]
    else:
        file_type = 'null'
        name = file_name
    hash_md5 = hashlib.md5()
    with open(path_to_file + '/' + file_name, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    md5 = hash_md5.hexdigest()
    hash_sha256 = hashlib.sha256()
    with open(path_to_file + '/' + file_name, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b''):
            hash_sha256.update(chunk)
    sha256 = hash_sha256.hexdigest()

    print('Current process: ' + str(multiprocessing.current_process()))
    # insert into database
    try:
        c.execute("INSERT INTO metadata VALUES (null,?,?,?,?,?,?,?,?,?,?,?)",
                  (name, file_name, result.st_mode, result.st_size,
                   result.st_atime, result.st_mtime, result.st_ctime,
                   path_to_file, file_type, md5, sha256))
    except sqlite3.Error as e:
        print('!!Error sqlite3: ' + str(e))
    conn.commit()

def main():
    conn = sqlite3.connect('metadata.db', timeout=30.0)
    c = conn.cursor()
    # check if table exists otherwise create one
    tb_exists = "SELECT name FROM sqlite_master WHERE type='table' AND name='metadata'"
    if not conn.execute(tb_exists).fetchone():
        c.execute('''CREATE TABLE metadata
                     (unique_id INTEGER PRIMARY KEY AUTOINCREMENT, name text, full_name text, mode text, size real,
                      atime real, mtime real, ctime real, location text, type text, md5 text, sha256 text)''')
    conn.close()
    print('Number of CPUs: ' + str(multiprocessing.cpu_count()))
    pool = Pool(multiprocessing.cpu_count())  # pool of cpu_count processes
    walk = os.walk("/directory/you/want/to/make/extraction")
    fn_gen = itertools.chain.from_iterable((os.path.join(root, file)
                                            for file in files)
                                           for root, dirs, files in walk)
    t1 = time.time()
    results_of_work = pool.map(worker, fn_gen)  # this does the parallel processing
    pool.close()
    pool.join()
    print('Entire Computation took: ' + str(time.time() - t1) + ' seconds')

if __name__ == '__main__':
    main()

Serial script:

import itertools
import multiprocessing
from multiprocessing import Pool
import os, sys
import stat
import sqlite3
import time
import hashlib

def worker(file_list, conn):
    c = conn.cursor()   # database cursor
    for file_name in file_list:
        result = os.stat(file_name)  # stat instance to get info about the current file
        print("Gathering metadata for file: " + file_name)
        split_filename = file_name.split('/')
        path_to_file = '/'.join(split_filename[:-1])
        file_name = split_filename[len(split_filename) - 1]
        # just things to get info about file
        if '.' in file_name:
            file_type = file_name.split('.', 1)
            name = file_type[0]
            file_type = file_type[1]
        else:
            file_type = 'null'
            name = file_name
        hash_md5 = hashlib.md5()
        with open(path_to_file + '/' + file_name, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        md5 = hash_md5.hexdigest()
        hash_sha256 = hashlib.sha256()
        with open(path_to_file + '/' + file_name, 'rb') as f:
            for chunk in iter(lambda: f.read(4096), b''):
                hash_sha256.update(chunk)
        sha256 = hash_sha256.hexdigest()
        # insert into database
        try:
            c.execute("INSERT INTO metadata VALUES (null,?,?,?,?,?,?,?,?,?,?,?)",
                      (name, file_name, result.st_mode, result.st_size,
                       result.st_atime, result.st_mtime, result.st_ctime,
                       path_to_file, file_type, md5, sha256))
        except sqlite3.Error as e:
            print('!!Error sqlite3: ' + str(e))
        conn.commit()

def main():
    conn = sqlite3.connect('metadata_serial.db', timeout=30.0)
    c = conn.cursor()
    # check if table exists otherwise create one
    tb_exists = "SELECT name FROM sqlite_master WHERE type='table' AND name='metadata'"
    if not conn.execute(tb_exists).fetchone():
        c.execute('''CREATE TABLE metadata
                     (unique_id INTEGER PRIMARY KEY AUTOINCREMENT, name text, full_name text, mode text, size real,
                      atime real, mtime real, ctime real, location text, type text, md5 text, sha256 text)''')

    walk = os.walk("/directory/you/want/to/make/extraction")
    fn_gen = itertools.chain.from_iterable((os.path.join(root, file)
                                            for file in files)
                                           for root, dirs, files in walk)
    file_list = list(fn_gen)
    t1 = time.time()
    worker(file_list, conn)
    print('Entire Computation took: ' + str(time.time() - t1) + ' seconds')
    conn.close()

if __name__ == '__main__':
    main()

It looks like the root cause is that sqlite3 does not work well with multiprocessing:

SQLite itself does not welcome highly concurrent transactions, because it locks the file during writes. To get around this limitation, a queuing system should be set up...

Source: How to do multiprocessing/multithreading with SQLite in Python
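
For illustration only, here is a minimal sketch of such a queue-based setup (this is not code from the original question): the pool processes only stat and hash each file, while a single dedicated writer process owns the sqlite3 connection and performs every INSERT. The table schema is simplified here, and helper names such as hash_file and writer are hypothetical.

import hashlib
import multiprocessing
import os
import sqlite3

def hash_file(path):
    # compute md5 and sha256 in 4096-byte chunks, as in the scripts above
    md5, sha256 = hashlib.md5(), hashlib.sha256()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(4096), b''):
            md5.update(chunk)
            sha256.update(chunk)
    return md5.hexdigest(), sha256.hexdigest()

def worker(path):
    # CPU/IO-heavy part only: no database access inside the pool processes
    st = os.stat(path)
    md5, sha256 = hash_file(path)
    return (os.path.basename(path), path, st.st_mode, st.st_size,
            st.st_atime, st.st_mtime, st.st_ctime, md5, sha256)

def writer(queue, db_path='metadata.db'):
    # single process that owns the sqlite3 connection and does all INSERTs
    conn = sqlite3.connect(db_path)
    conn.execute('''CREATE TABLE IF NOT EXISTS metadata
                    (unique_id INTEGER PRIMARY KEY AUTOINCREMENT, name text, full_name text,
                     mode text, size real, atime real, mtime real, ctime real,
                     md5 text, sha256 text)''')
    while True:
        row = queue.get()
        if row is None:          # sentinel: no more rows
            break
        conn.execute("INSERT INTO metadata VALUES (null,?,?,?,?,?,?,?,?,?)", row)
    conn.commit()                # one commit for all rows
    conn.close()

def main(top='/directory/you/want/to/make/extraction'):
    files = [os.path.join(root, f)
             for root, dirs, names in os.walk(top) for f in names]
    queue = multiprocessing.Queue()
    w = multiprocessing.Process(target=writer, args=(queue,))
    w.start()
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    for row in pool.imap_unordered(worker, files):
        queue.put(row)           # parent forwards each result to the single writer
    pool.close()
    pool.join()
    queue.put(None)              # tell the writer to stop
    w.join()

if __name__ == '__main__':
    main()

Because only one process ever touches the database file, SQLite's write lock no longer makes the hashing workers wait on each other, and all rows can be committed in a single transaction at the end.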
