Main loop does not wait for the Python multiprocessing pool to finish, and freezes



This is my first Python project in 10 years and my first time using Python multiprocessing, so there may simply be some very basic mistake I'm not seeing.

I'm stuck with Python and a multiprocessing web crawler. My crawler checks a main page for changes, then iterates through the subcategories in parallel, adding items to a list. Those items are then checked in parallel and extracted via Selenium (I couldn't figure out how to do it otherwise, since the content is loaded into the page dynamically when an item is clicked).

Main loop:

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import time
from bs4 import BeautifulSoup
import pickledb
import random
import multiprocessing
import itertools
import config

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

def getAllSubCategories(pageNumber, items):
    # check website and look for subcategories that are "worth" extracting
    url = 'https://www.google.com' + str(pageNumber)
    response = requests.get(url, verify=False, headers=config.headers, cookies=config.cookies)
    pageSoup = BeautifulSoup(response.content, features='html.parser')
    elements = pageSoup.find(...)
    if not elements:  # website not loading properly, retry
        return getAllSubCategories(pageNumber, items)
    for element in elements:
        items.append(element)

def checkAndExtract(item, ignoredItems, itemsToIgnore):
    # check if items are already extracted; if not, extract them if they contain a keyword
    import checker
    import extractor
    if item not in ignoredItems:
        if checker.check(item):
            extractor.extract(item, itemsToIgnore)
        else:
            itemsToIgnore.append(item)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    itemsToIgnore = multiprocessing.Manager().list()
    crawlUrl = 'https://www.google.com/'
    db = pickledb.load('myDB.db', False)
    while True:
        try:
            # check main website for changes
            response = requests.get(crawlUrl, verify=False, headers=config.headers, cookies=config.cookies)
            soup = BeautifulSoup(response.content, features='html.parser')
            mainCondition = soup.find(...)
            if mainCondition:
                numberOfPages = soup.find(...)
                ignoredItems = db.get('ignoredItems')
                if not ignoredItems:
                    db.lcreate('ignoredItems')
                    ignoredItems = db.get('ignoredItems')
                items = multiprocessing.Manager().list()
                # get all items from subcategories
                with multiprocessing.Pool(30) as pool:
                    pool.starmap(getAllSubCategories, zip(range(numberOfPages, 0, -1), itertools.repeat(items)))
                itemsToIgnore[:] = []
                # loop through all items
                with multiprocessing.Pool(30) as pool:
                    pool.starmap(checkAndExtract, zip(items, itertools.repeat(ignoredItems), itertools.repeat(itemsToIgnore)))
                for item in itemsToIgnore:
                    if item not in db.get('ignoredItems'):
                        db.ladd('ignoredItems', item)
                db.dump()
            time.sleep(random.randint(10, 20))
        except KeyboardInterrupt:
            break
        except Exception as e:
            print(e)
            continue

Checker:

import config
def check(item):
    title = item...
    try:
        for keyword in config.keywords:  # just a string array
            if keyword.lower() in title.lower():
                return True
    except Exception as e:
        print(e)
    return False

Extractor:

from selenium import webdriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
import time
import config
def extract(item, itemsToIgnore):
    driver = webdriver.Chrome('./chromedriver')
    driver.implicitly_wait(3)
    driver.get('https://www.google.com')
    for key in config.cookies:
        driver.add_cookie({'name': key, 'value': config.cookies[key], 'domain': '.google.com'})
    try:
        driver.get('https://www.google.com')
        wait = WebDriverWait(driver, 10)
        if driver.title == 'Page Not Found':
            extract(item, itemsToIgnore)
            return
        driver.find_element_by_xpath('...').click()
        time.sleep(1)
        button = wait.until(EC.element_to_be_clickable((By.XPATH, '...')))
        button.click()
        # and some extraction magic
    except:
        extract(item, itemsToIgnore)  # try again

Everything works and some test runs were successful. But sometimes the loop starts again before the pool has finished its work. In the logs I can see the item checker return true, yet the extractor does not even start and the main process begins the next iteration:

2019-12-23 00:21:16,614 [SpawnPoolWorker-6220] [INFO ] check returns true
2019-12-23 00:21:18,142 [MainProcess         ] [DEBUG] starting next iteration
2019-12-23 00:21:39,630 [SpawnPoolWorker-6247] [INFO ] checking subcategory

Also, I guess the pools somehow don't get cleaned up, because I doubt the SpawnPoolWorker-XXXX numbers should be that high. The whole thing also freezes after ~1 hour, which may be related to this problem.

I fixed the looping issue either by switching from Win7 to Win10 or by switching from starmap to starmap_async and calling get() on the result afterwards.
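
For reference, a minimal sketch of that starmap_async variant applied to the first pool job from the main loop (the second job changes the same way; the job variable is just a placeholder name):

with multiprocessing.Pool(30) as pool:
    job = pool.starmap_async(getAllSubCategories, zip(range(numberOfPages, 0, -1), itertools.repeat(items)))
    job.get()  # blocks until every worker has finished and re-raises any worker exception in the main process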

The freezing is most likely caused by calling requests.get() without passing a timeout value.
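
A sketch of that change for the main-page request (the 10-second value is only an illustrative choice, not something from the original code); the requests.get() call in getAllSubCategories would take the same parameter:

response = requests.get(crawlUrl, verify=False, headers=config.headers, cookies=config.cookies, timeout=10)
# raises requests.exceptions.Timeout instead of hanging forever; the loop's except block prints it and retries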

You can try the following for the pool jobs:

poolJob1 = pool.starmap(getAllSubCategories, zip(range(numberOfPages, 0, -1), itertools.repeat(items)))
pool.close()
pool.join()
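
Here close() stops the pool from accepting any further tasks and join() then blocks the main process until every worker has exited, so the next loop iteration cannot start while jobs are still running. Note that exiting a with multiprocessing.Pool(...) block, as in the original main loop, only calls terminate() on the pool; it does not join it.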
