在使用有限数量的内核/线程的同时在后台执行函数并将额外的执行排队?



我想使用有限数量的线程(最多2)在类中运行一个函数,以便在后台删除磁盘上的一些文件。同一类中的其余代码独立于此后台函数,并且可能会比后台函数执行数十倍。但是,我仍然需要强制执行核心/线程限制。因此,后台作业可能会超过2,我需要将它们排队。请注意,我的后台函数不接受任何参数。

我对多线程和多处理很陌生,但我想我已经做了功课,并查看了Stack Overflow上的许多帖子并尝试了几种方法。但是,这些方法似乎都不适合我。这是我的代码结构:

class myClass(object):
def __init__(self):
#some stuff
def backgroundFunc(self):
# delete some files on disk
def mainFunc(self, elem):
# Do some other things
self.backgroundFunc() #I want to run this in the background

这是我运行代码的方式

import myClass
myClassInstance = myClass()
For element in someList:
myClassInstance.mainFunc(elem=element)

请注意,在mainFunc中的内容开始运行之前,我无法启动后台作业。

这是我第一次尝试在我的类文件中使用threading

from threading import Thread
class myClass(object):
def __init__(self):
#some stuff
def backgroundFunc(self):
# delete some files on disk
def mainFunc(self, elem):
# Do some other things
thr = Thread(target=self.backgroundFunc)
thr.start()

然而,这种方法的问题在于程序在随机时间崩溃;有时在prpgram执行开始时,有时稍后erro消息每次也不同。我想这可能是因为线程不会阻塞一段内存,并且可能正在从这些内存单元中写入/读取内容。或者,不太可能,这可能是因为我在服务器上运行我的代码,并且服务器对分配的资源强制实施了一些限制。此外,我无法设置线程数限制,也无法执行队列,以防mainFunc代码在我已经有两个后台作业运行时执行两次以上。

这是multiprocessing.Process的另一个尝试:

from multiprocessing import Process
class myClass(object):
def __init__(self):
#some stuff
def backgroundFunc(self):
# delete some files on disk
def mainFunc(self, elem):
# Do some other things
p = Process(target=self.backgroundFunc)
p.start()

这种方法的问题在于,Process 将使用我的机器可以处理的尽可能多的线程/内核,并且由于我的其余代码自动并行运行,因此一切都会很快变得非常慢。

我最终到达了multiprocessing.Pool但我仍然对如何有效地使用它感到非常困惑。无论如何,这是我对Pool的尝试:

from multiprocessing import Pool
class myClass(object):
def __init__(self):
#some stuff
self.pool = Pool(processes=2)
def backgroundFunc(self):
# delete some files on disk
print('some stuff')
def mainFunc(self, elem):
# Do some other things
self.pool.apply_async(self.backgroundFunc)

但是,apply_async似乎不起作用。我在backgroundFunc中的任何print语句都不会在屏幕上打印任何内容。我在apply_async后添加了self.pool.close(),但在第二个进程启动后不久就出现了一些错误。我尝试使用self.pool.apply和其他一些东西,但它们似乎需要一个接受有限参数的函数。但我的backgroundFunc不接受任何论据。最后,我不知道如何使用 Pool 执行我之前解释的queuing

另外,我想控制要运行backgroundFunc的次数和时间。此外,mainFunc不应等待所有线程完成运行后再退出。如果发生这种情况,我将不会从多线程中受益,因为后台函数可能需要很长时间才能完成。也许我应该在这个问题上更清楚;对此感到抱歉。

因此,如果有人可以帮助我,我将不胜感激。我很困惑。提前感谢!

程序随机崩溃。我想

一次专注于一个问题,而不用猜测会更容易,那么,崩溃是什么?

以下是基于queue示例的threading测试,可能会激发您的灵感。

#!python3
#coding=utf-8
""" https://stackoverflow.com/q/49081260/ """
import sys, time, threading, queue
print(sys.version)
class myClass:
""" """
def __init__(self):
""" """
self.q = queue.Queue()
self.threads = []
self.num_worker_threads = 2
def backgroundFunc(self):
""" """
print("thread started")
while True:
item = self.q.get()
if item is None:
#self.q.task_done()
break
print("working on ", item)
time.sleep(0.5)
self.q.task_done()
print("thread stopping")
def mainFunc(self):
""" """
print("starting thread(s)")
for i in range(self.num_worker_threads):
t = threading.Thread(target=self.backgroundFunc)
t.start()
self.threads.append(t)
print("giving thread(s) some work")
for item in range(5):
self.q.put(item)
print("giving thread(s) more work")
for item in range(5,10):
self.q.put(item)
# block until all tasks are done
print("waiting for thread(s) to finish")
self.q.join()
# stop workers
print("stopping thread(s)")
for i in range(self.num_worker_threads):
self.q.put(None)
for t in self.threads:
self.q.join()
print("finished")

if __name__ == "__main__":
print("instance")
myClassInstance = myClass()
print("run")
myClassInstance.mainFunc()
print("end.")

它打印

3.6.1 (v3.6.1:69c0db5, Mar 21 2017, 17:54:52) [MSC v.1900 32 bit (Intel)]
instance
run
starting thread(s)
thread started
thread started
giving thread(s) some work
giving thread(s) more work
waiting for thread(s) to finish
working on  0
working on  1
working on  2
working on  3
working on  4
working on  5
working on  6
working on  7
working on  8
working on  9
stopping thread(s)
thread stopping
thread stopping
finished
end.

相关内容

  • 没有找到相关文章

最新更新