TL;DR - 使用者进程完成但不加入,没有引发错误,脚本无限运行,卡在join
语句上
?我的目标是加快数据检索过程,但是我不知道可能有多少"任务"(要检索的数据片段(。因此,我制作了毒丸方法的修改版本,以便任务识别何时不再检索信息,并触发毒丸if
语句。
我发布了一个证明,这是我的毒丸方法的一个工作示例,以及一个完整的脚本,顾名思义就是完整的脚本。(两者都应该能够按原样运行(
证明:
import multiprocessing
class Task:
def __init__(self, number):
self.number = number
def __call__(self):
"""Find officer and company data and combine and save it"""
try:
# 'gather some data!'
self.result = self.number*2
print(self.number)
# 'fake' finding no data
if self.result >= 8:
raise NameError
except NameError:
# become poison pill once latest is done
self.result = None
def output(self):
return self.result
class Consumer(multiprocessing.Process):
"""Handle process and re-queue complete tasks"""
def __init__(self, waiting_queue, complete_queue):
multiprocessing.Process.__init__(self)
self.waiting_queue = waiting_queue
self.complete_queue = complete_queue
def run(self):
"""process tasks until queue is empty"""
proc_name = self.name
while True:
current_task = self.waiting_queue.get()
current_task()
if current_task.output() is None:
print('{}: Exiting, poison pill reached'.format(proc_name))
self.waiting_queue.task_done()
break
self.waiting_queue.task_done()
self.complete_queue.put(current_task)
print('{}: complete'.format(proc_name))
class Shepard:
"""Handle life cycle of Consumers, Queues and Tasks"""
def __init__(self):
pass
def __call__(self, start_point):
# initialize queues
todo = multiprocessing.JoinableQueue()
finished = multiprocessing.JoinableQueue()
# start consumers
num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(todo, finished) for i in range(num_consumers)]
for q in consumers:
q.start()
# decide on (max) end limit (make much longer than suspected amount of data to be gathered
start = int(start_point)
max_record_range = 100
end = start + max_record_range
# Enqueue jobs
for i in range(start, end):
todo.put(Task(i))
print('Processes joining')
# wait for processes to join
for p in consumers:
p.join()
print('Processes joined')
# process results - UNFINISHED
pass
# return results - UNFINISHED
return 'results!'
if __name__ == '__main__':
# load start points:
start_points = {'cat1': 1, 'cat2': 3, 'cat3': 4}
master = Shepard()
cat1 = master(start_points['cat1'])
print('cat1 done')
cat2 = master(start_points['cat2'])
print('cat2 done')
cat3 = master(start_points['cat3'])
所以这是完整的脚本:
import time
import requests
import sys
import json
import pandas as pd
import multiprocessing
import queue
class CompaniesHouseRequest:
"""Retreive information from Companies House"""
def __init__(self, company, catagory_url=''):
"""Example URL: '/officers'"""
self.company = str(company)
self.catagory_url = str(catagory_url)
def retrieve(self, key='Rn7RLDV9Tw9v4ShDCotjDtJFBgp1Lr4d-9GRYZMo'):
"""retrieve data from Companies House"""
call = 'https://api.companieshouse.gov.uk/company/' + self.company + self.catagory_url
retrieve_complete = False
while retrieve_complete is False:
resp = requests.get(call, auth=requests.auth.HTTPBasicAuth(key, ''))
code = resp.status_code
if code == 404:
print(resp.status_code)
raise NameError('Company not found')
elif code == 200:
try:
self.data = json.loads(resp.content.decode('UTF8'))
retrieve_complete = True
except json.decoder.JSONDecodeError:
print('Decode Error in Officers!')
else:
print("Error:", sys.exc_info()[0])
print('Retrying')
time.sleep(5)
return self.data
class Company:
"""Retrieve and hold company details"""
def __init__(self, company_number):
self.company_number = company_number
def __call__(self):
"""Create request and process data"""
# make request
req = CompaniesHouseRequest(self.company_number)
data = req.retrieve()
# extract data
try:
line = [self.company_number,
data['company_name'],
data['registered_office_address'].get('premises', ''),
data['registered_office_address'].get('address_line_1', ''),
data['registered_office_address'].get('address_line_2', ''),
data['registered_office_address'].get('country', ''),
data['registered_office_address'].get('locality', ''),
data['registered_office_address'].get('postal_code', ''),
data['registered_office_address'].get('region', '')]
except KeyError:
line = ['' for i in range(0, 9)]
# save as pandas dataframe
return pd.DataFrame([line], columns=['company_number', 'company_name', 'company_address_premises',
'company_address_line_1', 'company_address_line_2',
'company_address_country', 'company_address_locality',
'company_address_postcode', 'company_address_region'])
def name_splitter(name):
split = name.split(', ')
if len(split) > 2:
return [split[2], split[1], split[0]]
else:
return ['', split[1], split[0]]
class Officers:
"""Retrieve and hold officers details"""
def __init__(self, company_number):
self.company_number = company_number
def __call__(self):
"""Create request and process data"""
# make request
req = CompaniesHouseRequest(self.company_number, '/officers')
data = req.retrieve()
# extract data
for officer in data['items']:
if officer['officer_role'] == 'director':
name = name_splitter(officer['name'])
line = [name[0],
name[1],
name[2],
officer.get('occupation'),
officer.get('country_of_residence'),
officer.get('nationality'),
officer.get('appointed_on', ''),
officer['address'].get('premises', ''),
officer['address'].get('address_line_1', ''),
officer['address'].get('address_line_2', ''),
officer['address'].get('country', ''),
officer['address'].get('locality', ''),
officer['address'].get('postal_code', ''),
officer['address'].get('region', '')]
break
director_count = sum(map(lambda x: x['officer_role'] == 'director', data['items']))
if director_count > 1:
line += [True]
elif director_count == 1:
line += [False]
else:
line = ['no directors'] * 3 + [''] * 12
return pd.DataFrame([line], columns=['title', 'first_name', 'surname', 'occupation', 'country_of_residence',
'nationality', 'appointed_on',
'address_premises', 'address_line_1', 'address_line_2',
'address_country', 'address_locality', 'address_postcode',
'address_region', 'multi_director'])
class Task:
def __init__(self, prefix, company_number):
self.prefix = prefix
self.company_number = company_number
def __call__(self):
"""Find officer and company data and combine and save it"""
comp_id = self.prefix + str(self.company_number)
print(comp_id)
try:
# initialise company class
comp = Company(comp_id)
# initialise officer class
off = Officers(comp_id)
# retrieve and concatonate
self.result = pd.concat([comp(), off()], axis=1)
except NameError:
# become poison pill once latest is done
self.result = None
def output(self):
return self.result
class Consumer(multiprocessing.Process):
"""Handle process and re-queue complete tasks"""
def __init__(self, waiting_queue, complete_queue):
multiprocessing.Process.__init__(self)
self.waiting_queue = waiting_queue
self.complete_queue = complete_queue
def run(self):
"""process tasks until queue is empty"""
proc_name = self.name
while True:
current_task = self.waiting_queue.get()
current_task()
if current_task.output() is None:
print('{}: Exiting, poison pill reached'.format(proc_name))
self.waiting_queue.task_done()
break
self.waiting_queue.task_done()
self.complete_queue.put(current_task)
print('{}: complete'.format(proc_name))
class Shepard:
"""Handle life of Consumers, Queues and Tasks"""
def __init__(self):
pass
def __call__(self, prefix, start_point):
# initialize queues
todo = multiprocessing.JoinableQueue()
finished = multiprocessing.JoinableQueue()
# start consumers
num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(todo, finished) for i in range(num_consumers)]
for q in consumers:
q.start()
# decide on (max) end limit
start = int(start_point)
max_record_range = 1000
end = start + max_record_range
# Enqueue jobs
for i in range(start, end):
todo.put(Task(prefix, i))
print('Processes joining')
# wait for processes to join
for p in consumers:
p.join()
print('Processes joined')
# process results - UNFINISHED
pass
# return results - UNFINISHED
return 'results!'
if __name__ == '__main__':
# paths to data
data_directory = r'C:UsershdewintonOneDrive - Advanced Payment SolutionsPythonCorporate DMdata'
base = r'base'
# load start points:
init = {"England": 10926071, "Scotland": 574309, "Ireland": 647561}
# gather data for each catagory
master = Shepard()
ireland = master('NI', init['Ireland'])
scotland = master('SC', init['Scotland'])
england = master('', init['England'])
TL;DR-后果(在消费者无法加入时陷入困境(可以通过更改以下内容来解决:
finished = multiprocessing.JoinableQueue()
对此:
mananger = multiprocessing.Manager()
finished = mananger.Queue()
详细信息-"将对象放入队列时,将酸洗该对象,后台线程稍后会将酸洗的数据刷新到基础管道。这会产生一些令人惊讶的后果,但不应该造成任何实际困难——如果它们真的困扰你,那么你可以使用与经理一起创建的队列。
第二个队列是成品,如果添加了一定数量的任务,则会触发上述令人惊讶的后果之一。低于限制没有问题,超过限制就会发生后果。这不会在虚拟人中发生,因为第二个队列虽然存在,但未使用。限制取决于Task
对象的大小和复杂性,因此我认为这与仅在达到一定数据量后才会刷新腌制数据有关 - 数据量会触发此后果
附录- 实施修复后,还会出现另一个错误:当todo
队列的使用者在队列之前终止时,会发生管道错误 为空,使管道在队列对象中没有要向其发送数据的连接对象。这将触发WinError 232
。不过不用担心,可以通过在退出使用者之前清空队列来修复管道错误。 只需将其添加到消费者类运行方法中:
while not self.waiting_queue.empty():
try:
self.waiting_queue.get(timeout=0.001)
except:
pass
self.waiting_queue.close()
这会从队列中删除每个元素,确保它在主while
循环之后,并且不应发生管道错误,因为使用者将在终止之前清空 will 队列。