Multiprocessing issue with BeautifulSoup 4 in Python



I'm having trouble using most or all of my cores to process files faster; it could either read several files at once, or use multiple cores to read a single file.

I would prefer to use multiple cores to read a single file before moving on to the next one.

I tried the code below, but I can't seem to get all the cores used.

The code below basically retrieves *.txt files from the directory; each file contains htmls, in json format.

#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests
import json
import urlparse
import os
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool  # This is a thread-based Pool
from multiprocessing import cpu_count


def crawlTheHtml(htmlsource):
    htmlArray = json.loads(htmlsource)
    for eachHtml in htmlArray:
        soup = BeautifulSoup(eachHtml['result'], 'html.parser')
        if all(['another text to search' not in str(soup),
                'text to search' not in str(soup)]):
            try:
                gd_no = ''
                try:
                    gd_no = soup.find('input', {'id': 'GD_NO'})['value']
                except:
                    pass
                r = requests.post('domain api address', data={
                    'gd_no': gd_no,
                })
            except:
                pass


if __name__ == '__main__':
    pool = Pool(cpu_count() * 2)
    print(cpu_count())
    fileArray = []
    for filename in os.listdir(os.getcwd()):
        if filename.endswith('.txt'):
            fileArray.append(filename)
    for file in fileArray:
        with open(file, 'r') as myfile:
            htmlsource = myfile.read()
            results = pool.map(crawlTheHtml(htmlsource), f)

On top of that, I'm not sure what f stands for.

Question 1:

Am I not utilizing all the cores/threads properly?

Question 2:

Is there a better way than try: except:? Sometimes a value is not on the page, which causes the script to stop. When dealing with multiple variables, I end up with a lot of try & except statements.

To answer question 1, your problem is this line:

from multiprocessing.dummy import Pool  # This is a thread-based Pool

Answer taken from: multiprocessing.dummy in Python is not using 100% cpu

When you use multiprocessing.dummy, you're using threads, not processes:

multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module.

That means you're limited by the Global Interpreter Lock (GIL), and only one thread can actually execute CPU-bound operations at a time. That's going to keep you from fully utilizing your CPUs. If you want to get full parallelism across all available cores, you're going to need to address the pickling issue you're hitting with multiprocessing.Pool.
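In practice that means the function handed to pool.map must be a picklable top-level function, and the second argument is an iterable of inputs (one call per item), which also answers the mystery around f in the question. A minimal sketch of the process-based version; crawl_one and the sample strings are just stand-ins for the real parsing work:

```python
from multiprocessing import Pool


def crawl_one(htmlsource):
    # Stand-in for the real BeautifulSoup work; it must live at module
    # top level so it can be pickled and shipped to worker processes.
    return len(htmlsource)


if __name__ == '__main__':
    sources = ['<html>a</html>', '<html>bb</html>']  # e.g. one entry per file
    with Pool(processes=2) as pool:
        # pool.map takes the function itself (not a call result)
        # plus the iterable of arguments.
        results = pool.map(crawl_one, sources)
    print(results)  # [14, 15]
```

Note that pool.map(crawlTheHtml(htmlsource), f) in the original calls the function once and passes its return value to map, which is not what you want.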

I had this problem. You need to do

from multiprocessing import Pool
from multiprocessing import freeze_support

and at the end of your script you need

if __name__ == '__main__':
    freeze_support()

and then you can continue with your script:

from multiprocessing import Pool, Queue
from os import getpid
from time import sleep
from random import random

MAX_WORKERS = 10


class Testing_mp(object):

    def __init__(self):
        """
        Initiates a queue, a pool and a temporary buffer, used only
        when the queue is full.
        """
        self.q = Queue()
        self.pool = Pool(processes=MAX_WORKERS, initializer=self.worker_main,)
        self.temp_buffer = []

    def add_to_queue(self, msg):
        """
        If the queue is full, put the message in a temporary buffer.
        If the queue is not full, add the message to the queue, and if
        the buffer is not empty, put messages from the buffer back into
        the queue.
        """
        if self.q.full():
            self.temp_buffer.append(msg)
        else:
            self.q.put(msg)
            if len(self.temp_buffer) > 0:
                self.add_to_queue(self.temp_buffer.pop())

    def write_to_queue(self):
        """
        This function writes some messages to the queue.
        """
        for i in range(50):
            self.add_to_queue("First item for loop %d" % i)
            # Not really needed, just to show that some elements can be
            # added to the queue whenever you want!
            sleep(random() * 2)
            self.add_to_queue("Second item for loop %d" % i)
            # Not really needed, just to show that some elements can be
            # added to the queue whenever you want!
            sleep(random() * 2)

    def worker_main(self):
        """
        Waits indefinitely for an item to be written in the queue.
        Finishes when the parent process terminates.
        """
        print("Process {0} started".format(getpid()))
        while True:
            # If the queue is not empty, pop the next element and do the work.
            # If the queue is empty, wait indefinitely until an element
            # gets into the queue.
            item = self.q.get(block=True, timeout=None)
            print("{0} retrieved: {1}".format(getpid(), item))
            # simulate some random length operations
            sleep(random())


# Warning from the Python documentation:
# Functionality within this package requires that the __main__ module be
# importable by the children. This means that some examples, such as the
# multiprocessing.Pool examples, will not work in the interactive interpreter.
if __name__ == '__main__':
    mp_class = Testing_mp()
    mp_class.write_to_queue()
    # Wait a bit for the child processes to do some work,
    # because when the parent exits, children are terminated.
    sleep(5)
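As for question 2: BeautifulSoup's find() returns None when nothing matches, and a matched Tag supports dict-style .get(), so one small helper can replace most of the stacked try/except blocks. A minimal sketch — safe_attr is just an illustrative name, not part of bs4:

```python
def safe_attr(tag, key, default=''):
    # find() returns None when no tag matches; a matched Tag supports
    # .get() like a dict, so both cases collapse into one expression.
    return default if tag is None else tag.get(key, default)


# With the original lookup, the nested try/except becomes:
# gd_no = safe_attr(soup.find('input', {'id': 'GD_NO'}), 'value')
```

A missing tag or a missing attribute then yields the default value instead of raising, so the script keeps going without a try/except per variable.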
