我有一个方案,我可以在其中处理一个要处理的作业列表,例如网页列表从Internet 中爬行)。每个作业都是独立,并且可以按任何顺序处理这些作业。单个工作可能 Fail 或成功可能必须相应地处理(例如,失败 crawl任务的临时数据可能必须删除,并且在下一轮中新闻稿)
我正在尝试使用基于线程的处理在Python中实现它。为了模仿实际任务,可以说我有一个大量的整数数组列表,并且单个作业是计算每个数组的Sum
或Product
。我要做的是使用JobsProcessor
类对象来实例化JobWorker
类对象的线程,这些线程通过为其他类创建对象(在此处创建Sum
和Product
)来执行实际处理。下面提到了同一代码。显示了片段
from queue import Queue, Empty
from threading import Thread
import time
class Product:
def __init__(self,data):
self.data = data
def doOperation(self):
try:
product =self.data[0]
for d in self.data[1:]:
if d>100000:
raise Exception( "Forcefully throwing exception")
product*=d
time.sleep(1)
return product
except:
return "product computation failed"
class Sum:
def __init__(self,data):
self.data = data
def doOperation(self):
try:
sum =0
for d in self.data:
sum+=d
time.sleep(1)
return sum
except:
return "sum computation failed"
class JobWorker(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
try:
jobitem = self.queue.get_nowait()
if jobitem is None:
break
jobdata, optype = jobitem
if optype =='sum':
opobj = Sum(jobdata)
jobresult = opobj.doOperation()
elif optype =='product':
opobj = Product(jobdata)
jobresult = opobj.doOperation()
else:
print ("Invalid op type")
jobresult = 'Failed'
print(" job result", jobresult)
self.queue.task_done()
except Empty:
break
except:
print ("Some exception occured")
#How to pass it to up to the main jobs processor#
class JobsProcessor(object):
def __init__(self, joblist):
self.joblist = joblist
self.job_queue = Queue()
def process_resources(self):
try:
for job in self.joblist:
self.job_queue.put(job)
for i in range(2):
jobthread = JobWorker(self.job_queue)
jobthread.start()
'''
Write code here to monitor current status for all running jobs
'''
self.job_queue.join()
'''I want to write code here to track progress status for all jobs
Some jobs may have failed, not completed and based on that I may
want to take further action such as retry or flag them'''
print("Finished Jobs")
except:
pass
orgjobList = [ ([1,5,9,4],'sum'),
([5,4,5,8],'product'),
([100,45,678,999],'product'),
([3743,34,44324,543],'sum'),
([100001, 100002, 9876, 83989], 'product')]
mainprocessor = JobsProcessor(orgjobList)
mainprocessor.process_resources()
我想在此过程中添加2个功能。
- 合并:当所有作业线程完成时,我想知道所有
JobWorker
对象的状态(例如,如果成功/完成失败,则它们完成了)。失败/异常可能发生在 jobworker 对象中,或者甚至可能是 sum 或 product 对象。失败/成功状态应传播回 jobsprocessor ,我想根据返回的状态执行其他操作,例如Recocess/delete/send_elsewhere等。 - 监视 - 我也想具有
Monitor
功能,该功能可以连续检查当前运行/完成的作业的状态并执行必要的操作,例如 delete 而不是等到结束 consolidation
请建议我如何添加上述功能,如果其中只有一个足以在诸如crawling Pages 之类的情况下。也欢迎其他任何建议。
您可以以两种方式添加代码中的两个功能 -
- 使用全局变量(最简单的方法)
- 在您的班级中使用
getProgress
和getStatus
方法(优雅方法)
您可以创建2个线程,一个线程进行实际工作并更新进度变量。
对于第二种方法,您可以在__init__
类中设置两个VAR,如以下内容。
def __init__(self):
self.progress = 0
self.success = True
self.isDone = False
self.error = "No Error Occurred"
然后,您可以在代码中包含逻辑,如以下 -
def actualWork(self):
self.isDone = 0
try:
for i in range(1000):
self.progress = i
time.sleep(0.01)
self.isDone = True
except Exception as e:
self.success = False
self.error = str(e)
def getProgress(self):
return self.progress
def getError(self):
return self.error