我有一个函数,它解析文件并使用SQLAlchemy将数据插入MySQL。我一直在按顺序运行os.listdir()
的结果函数,一切都工作得很好。
因为大部分时间都花在读取文件和写入DB上,所以我想使用多处理来加快速度。下面是我的伪代码,因为实际代码太长了:
def parse_file(filename):
f = open(filename, 'rb')
data = f.read()
f.close()
soup = BeautifulSoup(data,features="lxml", from_encoding='utf-8')
# parse file here
db_record = MyDBRecord(parsed_data)
session.add(db_record)
session.commit()
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
我看到的问题是,脚本挂起,永远不会完成。我通常会把63条记录中的48条录入数据库。有时更多,有时更少。
我试过使用pool.close()
和pool.join()
的组合,似乎都没有帮助。
我如何得到这个脚本完成?我做错了什么?我在Linux系统上使用Python 2.7.8
您需要将所有使用多处理的代码放入其自己的函数中。当多进程在单独的进程中重新导入你的模块时,这会阻止它递归地启动新的池:
def parse_file(filename):
...
def main():
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
if __name__ == '__main__':
main()
请参阅有关确保模块可导入的文档,以及在Windows(tm)上运行的建议
问题是两件事的组合:
- 我的池代码被多次调用(感谢@Peter Wood)
- 我的数据库代码打开了太多的会话(和/或)共享会话
我做了以下更改,现在一切正常:原始文件
def parse_file(filename):
f = open(filename, 'rb')
data = f.read()
f.close()
soup = BeautifulSoup(data,features="lxml", from_encoding='utf-8')
# parse file here
db_record = MyDBRecord(parsed_data)
session = get_session() # see below
session.add(db_record)
session.commit()
pool = mp.Pool(processes=8)
pool.map(parse_file, ['my_dir/' + filename for filename in os.listdir("my_dir")])
DB文件
def get_session():
engine = create_engine('mysql://root:root@localhost/my_db')
Base.metadata.create_all(engine)
Base.metadata.bind = engine
db_session = sessionmaker(bind=engine)
return db_session()