Switching SQLAlchemy to Python multiprocessing



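I am currently developing a web crawler. It works well, but I want to make the most of my resources, so I am trying to switch to multiprocessing. The moment I tried, though, I hit a wall of tracebacks that I can't piece together to figure out what I am doing wrong, since I am still new to both SQLAlchemy and Python multiprocessing.

Here is what the parent loop looks like: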

...
def crawler(url=False):
    ...
    while url:
        crawl(url.id)
        url = get_new_url()

I am trying to turn it into a parallel-processing function where I don't have to wait for the previous crawl/scrape to finish:

from multiprocessing import Process
...
def crawler(url=False):
    while url:
        p = Process(target=crawl, args=(url.id,))
        p.start()
        url = get_new_url()

Here is how I connect to the database:

engine = create_engine('mysql://user:password@domain:3306/mdb01?charset=utf8mb4', pool_recycle=3600)
Session = sessionmaker(bind=engine, autoflush=True)
Base = declarative_base()

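Here is the module that does the database interaction for the crawl and imports the database factory (I removed most of it, since I think the problem is how I interact with SQLAlchemy rather than the rest of the code):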

from news_models.base import Base, Session, engine

database = Session()

def crawl(urlid):
    url = database.query(Url).filter_by(id=urlid).first()
    print(f"Starting to work on {url.id}: {url.url}")
    ... scrape page ....
    scrape = scrape_url(url)
    ... running Beautiful Soup ...
    # Retrieve all of the anchor tags
    tags = soup('a')
    for tag in tags:
        ... validation ...
        make_url(url)

def make_url(url):
    ...
    # domain = e.g. abc.com
    domain = database.query(Domain).filter_by(domain=domain).first()
    database.add(Url(url, domain, vetted))
    database.commit()

def scrape_url(url):
    scrape = Scrape(page = html, url = url)
    database.add(scrape)
    database.commit()
    return scrape

Here is the output:

Starting to work on 179226: https://bbc.co.uk/sport/football/53891604
Starting to work on 110232: https://theweathernetwork.com/ca/weather/saskatchewan/carragana
Starting to work on 152054: https://ca.images.search.yahoo.com/search/images?p=barack+obama&fr=fp-tts&th=110.1&tw=162.6&imgurl=https%3a%2f%2fimage.cnbcfm.com%2fapi%2fv1%2fimage%2f105055178-gettyimages-680143744rr.jpg%3fv%3d1576513702%26w%3d1400%26h%3d950&rurl=https%3a%2f%2fwww.cnbc.com%2f2019%2f12%2f16%2fbarack-obama-how-women-are-better-leaders-than-men.html&size=123kb&name=barack+obama%3a+how+women+are+better+leaders+than+men&oid=1&h=950&w=1400&turl=https%3a%2f%2ftse1.mm.bing.net%2fth%3fid%3doip.btjoweh9kdcuxxcdksvoiwhafb%26amp%3bpid%3dapi%26rs%3d1%26c%3d1%26qlt%3d95%26w%3d162%26h%3d110&tt=barack+obama%3a+how+women+are+better+leaders+than+men&sigr=4nejz_6_wyyo&sigit=.iypm9cqprc9&sigi=9sv3ee5szhdl&sign=eqzxpc3ps9fm&sigt=eqzxpc3ps9fm
Exception during reset or similar
Traceback (most recent call last):
File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 321, in scrape_url
database.add(scrape)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2008, in add
self._save_or_update_state(state)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2021, in _save_or_update_state
self._save_or_update_impl(state)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2371, in _save_or_update_impl
self._save_impl(state)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2324, in _save_impl
to_attach = self._before_attach(state, obj)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2441, in _before_attach
raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Object '<Scrape at 0x7f4f7e1975b0>' is already attached to session '3' (this is '2')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 697, in _finalize_fairy
fairy._reset(pool)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 893, in _reset
pool._dialect.do_rollback(self)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2475, in do_rollback
dbapi_connection.rollback()
MySQLdb._exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
Traceback (most recent call last):
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
Process Process-3:
Process Process-1:
self.dialect.do_execute(
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/connections.py", line 259, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (2013, 'Lost connection to MySQL server during query')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "./crawler.py", line 138, in <module>
main()
File "./crawler.py", line 49, in main
crawler(url=url)
File "./crawler.py", line 135, in crawler
url = get_new_url()
File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 482, in get_new_url
url = database.query(Url).filter_by(scrape=None, error=False).order_by(sqlalchemy.func.rand()).first()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3402, in first
ret = list(self[0:1])
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3176, in __getitem__
Traceback (most recent call last):
File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 321, in scrape_url
database.add(scrape)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2008, in add
self._save_or_update_state(state)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2021, in _save_or_update_state
self._save_or_update_impl(state)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2371, in _save_or_update_impl
self._save_impl(state)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2324, in _save_impl
to_attach = self._before_attach(state, obj)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2441, in _before_attach
raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Object '<Scrape at 0x7f4f7e1e3790>' is already attached to session '3' (this is '2')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 749, in _rollback_impl
self.engine.dialect.do_rollback(self.connection)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2475, in do_rollback
dbapi_connection.rollback()
MySQLdb._exceptions.OperationalError: (2013, 'Lost connection to MySQL server during query')
return list(res)
The above exception was the direct cause of the following exception:
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3508, in __iter__
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 62, in crawl
soup = scrape_and_soup(url)
File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 331, in scrape_and_soup
scrape = scrape_url(url)
File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 325, in scrape_url
database.rollback()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1006, in rollback
self.transaction.rollback()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 574, in rollback
util.raise_(rollback_err[1], with_traceback=rollback_err[2])
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 534, in rollback
t[1].rollback()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1753, in rollback
self._do_rollback()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1791, in _do_rollback
self.connection._rollback_impl()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 751, in _rollback_impl
self._handle_dbapi_exception(e, None, None, None, None)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 749, in _rollback_impl
self.engine.dialect.do_rollback(self.connection)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2475, in do_rollback
dbapi_connection.rollback()
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (2013, 'Lost connection to MySQL server during query')
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Traceback (most recent call last):
File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 321, in scrape_url
database.add(scrape)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2008, in add
self._save_or_update_state(state)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2021, in _save_or_update_state
self._save_or_update_impl(state)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2371, in _save_or_update_impl
self._save_impl(state)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2324, in _save_impl
to_attach = self._before_attach(state, obj)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 2441, in _before_attach
raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Object '<Scrape at 0x7f4f7e1e3a60>' is already attached to session '3' (this is '2')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 749, in _rollback_impl
self.engine.dialect.do_rollback(self.connection)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2475, in do_rollback
dbapi_connection.rollback()
MySQLdb._exceptions.OperationalError: (2013, 'Lost connection to MySQL server during query')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 62, in crawl
soup = scrape_and_soup(url)
File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 331, in scrape_and_soup
scrape = scrape_url(url)
File "/home/fabrice/workbench/news/news_crawler/crawl_tools.py", line 325, in scrape_url
database.rollback()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1006, in rollback
self.transaction.rollback()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 574, in rollback
util.raise_(rollback_err[1], with_traceback=rollback_err[2])
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 534, in rollback
t[1].rollback()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1753, in rollback
self._do_rollback()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1791, in _do_rollback
self.connection._rollback_impl()
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 751, in _rollback_impl
self._handle_dbapi_exception(e, None, None, None, None)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 749, in _rollback_impl
self.engine.dialect.do_rollback(self.connection)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/dialects/mysql/base.py", line 2475, in do_rollback
dbapi_connection.rollback()
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (2013, 'Lost connection to MySQL server during query')
(Background on this error at: http://sqlalche.me/e/13/e3q8)
return self._execute_and_instances(context)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3533, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
ret = self._execute_context(
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
self._handle_dbapi_exception(
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/home/fabrice/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/home/fabrice/.local/lib/python3.8/site-packages/MySQLdb/connections.py", line 259, in query
_mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (2013, 'Lost connection to MySQL server during query')
[SQL: SELECT urls.id AS urls_id, urls.url AS urls_url, urls.error AS urls_error, urls.vetted AS urls_vetted, urls.useful AS urls_useful, urls.date_discovered AS urls_date_discovered, urls.last_parse AS urls_last_parse, urls.domain_id AS urls_domain_id, urls.publisher_id AS urls_publisher_id 
FROM urls 
WHERE NOT (EXISTS (SELECT 1 
FROM scrapes 
WHERE urls.id = scrapes.url_id)) AND urls.error = false ORDER BY rand() 
LIMIT %s]
[parameters: (1,)]
(Background on this error at: http://sqlalche.me/e/13/e3q8)

I have tried tweaking create_engine, adding pool_size=20, max_overflow=0, or autoflush=True/False, but none of it worked.

Can anyone point out what I am doing wrong?

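The solution is to create a new database session in each process, at the start of the crawl function (and then either pass it as a separate argument to make_url and scrape_url, or make them all methods of one object). As written, database = Session() is created once at import time, so the child processes started from the parent inherit it and can end up sharing its MySQL connection, which is consistent with the "Commands out of sync" and "Lost connection" errors above. You should use a with closing(...) statement to make sure the session is closed when crawl finishes.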
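A minimal sketch of the per-process session described above, assuming a SQLite file in place of the MySQL database so that it runs standalone. The scrapes table layout, the db_url parameter, and the setup_database and count_scrapes helpers are hypothetical and not taken from the question's code. As an extra precaution beyond what the answer states, the engine is also created inside the child process, so no pooled connection is inherited from the parent.

```python
from contextlib import closing
from multiprocessing import Process
import os
import tempfile

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker


def setup_database(db_url):
    """Hypothetical helper: create the demo table once, from the parent."""
    engine = create_engine(db_url)
    with engine.begin() as conn:
        conn.execute(text(
            "CREATE TABLE IF NOT EXISTS scrapes (url_id INTEGER, page TEXT)"
        ))
    engine.dispose()


def scrape_url(session, url_id):
    """Takes the session as an argument instead of using a module-level global."""
    session.execute(
        text("INSERT INTO scrapes (url_id, page) VALUES (:url_id, :page)"),
        {"url_id": url_id, "page": "<html>...</html>"},
    )
    session.commit()


def crawl(db_url, url_id):
    """Process target: build a private engine and session, then close both."""
    engine = create_engine(db_url)
    Session = sessionmaker(bind=engine)
    # closing() calls session.close() even if scrape_url raises.
    with closing(Session()) as session:
        scrape_url(session, url_id)
    engine.dispose()


def count_scrapes(db_url):
    """Hypothetical helper: number of rows written so far."""
    engine = create_engine(db_url)
    with engine.connect() as conn:
        total = conn.execute(text("SELECT COUNT(*) FROM scrapes")).scalar()
    engine.dispose()
    return total


if __name__ == "__main__":
    db_url = "sqlite:///" + os.path.join(tempfile.mkdtemp(), "crawl_demo.db")
    setup_database(db_url)
    workers = [Process(target=crawl, args=(db_url, url_id)) for url_id in (1, 2, 3)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print(count_scrapes(db_url))
```

Here scrape_url receives the session as a parameter, which is the first of the two options mentioned above; wrapping the functions in a class that holds the session would be the other.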

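There is another problem in the code: the while url loop also needs to wait for all scrapers to finish, in case one of them discovers further URLs that need scraping as well.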
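One way the waiting could look, as a rough sketch rather than the original crawler: the loop only stops when no URL is pending and no worker is still running. It assumes a get_new_url callable that returns None when nothing is pending and that hands out each URL only once; start_process and the two-argument crawler signature are hypothetical names introduced for illustration.

```python
from multiprocessing import Process


def start_process(target, url_id):
    """Start one crawl in its own process and return the process handle."""
    process = Process(target=target, args=(url_id,))
    process.start()
    return process


def crawler(get_new_url, start_worker):
    """Dispatch URLs until none are pending and no worker is still running.

    get_new_url() is assumed to return a URL id, or None if nothing is pending.
    start_worker(url_id) must return an already started object with join().
    """
    running = []
    started = 0
    while True:
        url_id = get_new_url()
        if url_id is not None:
            running.append(start_worker(url_id))
            started += 1
            continue
        if not running:
            # Nothing pending and nobody left who could add more URLs.
            break
        # Nothing pending right now, but running workers may still insert
        # new URLs: wait for them, then ask again.
        for worker in running:
            worker.join()
        running = []
    return started
```

With the question's code this might be called as crawler(get_new_url, lambda url_id: start_process(crawl, url_id)). Note that the sketch still starts one process per URL without any upper bound.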

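As a suggested improvement, you can use multiprocessing.Pool instead of using Process directly; this lets you control the number of scrapers running in parallel, which is probably what you want in the end (to avoid overloading CPU, RAM, the network, and/or the database). In that case you can still use a separate database session for each crawl call, or one database session per pool worker process.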
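A sketch of the "one session per pool worker" variant, again assuming a SQLite file so that it runs without MySQL. The Pool initializer runs once in every worker process and stores that worker's session in a module-level variable; init_worker, create_schema, count_rows, run, and the scrapes table are hypothetical names, not part of the question's code.

```python
from multiprocessing import Pool
import os
import tempfile

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

_session = None  # one session per worker process, set by init_worker()


def create_schema(db_url):
    """Hypothetical helper: create the demo table from the parent process."""
    engine = create_engine(db_url)
    with engine.begin() as conn:
        conn.execute(text("CREATE TABLE IF NOT EXISTS scrapes (url_id INTEGER)"))
    engine.dispose()


def init_worker(db_url):
    """Runs once in each pool worker: build that worker's own engine and session."""
    global _session
    engine = create_engine(db_url)
    _session = sessionmaker(bind=engine)()


def crawl(url_id):
    """Pool task: reuse the worker's session and roll back on failure."""
    try:
        _session.execute(
            text("INSERT INTO scrapes (url_id) VALUES (:url_id)"),
            {"url_id": url_id},
        )
        _session.commit()
    except Exception:
        _session.rollback()
        raise
    return url_id


def count_rows(db_url):
    """Hypothetical helper: number of rows written so far."""
    engine = create_engine(db_url)
    with engine.connect() as conn:
        total = conn.execute(text("SELECT COUNT(*) FROM scrapes")).scalar()
    engine.dispose()
    return total


def run(db_url, url_ids, workers=4):
    """Crawl url_ids with at most `workers` processes running at once."""
    with Pool(processes=workers, initializer=init_worker, initargs=(db_url,)) as pool:
        return sorted(pool.map(crawl, url_ids))


if __name__ == "__main__":
    db_url = "sqlite:///" + os.path.join(tempfile.mkdtemp(), "pool_demo.db")
    create_schema(db_url)
    print(run(db_url, [1, 2, 3, 4, 5]))
```

The workers argument is what bounds the load on CPU, network, and database. A long-lived session per worker saves setup cost per task, at the price of having to roll back explicitly when a task fails so that the next task starts from a clean transaction.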
