是交易,我的数据库中有某些条目。我正在打电话给django:
variables = Variable.objects.order_by('foo').values('foo')
然后,我有一个用于语句,在每个发现的变量上执行:
for x in variables:
#doing something....
我的问题是"做某事"是一项持续的任务...即不会停止。那么我如何能够在第二个变量上运行循环?
我相信这与合并有关,但这是否意味着我只能一次有4个过程?如果我想说50个单独的进程以每50个变量运行,并且每个过程要等到一定时间或任何时候都不会停止..
,我该怎么办。甚至可以做到这一点。
这是我的多处理代码:
if __name__ == '__main__':
x = Variable.objects.order_by('foo').values('foo')
for t in x:
t = t.values()
foo = "".join(t)
info('Starting...')
p = Process(target=myfunction, args=(foo,))
p.start()
p.join()
myfunction是在无限环上运行的东西...
@samuel:
# globals
my_queue = multiprocessing.Manager().Queue() # queue to store our values
stop_event = multiprocessing.Event() # flag which signals processes to stop
my_pool = None
def my_function(foo):
while not stop_event.is_set():
print("starting %s" % foo)
try:
var = my_queue.get_nowait() # getting value from queue
except Queue.Empty:
print "No more items in queue"
# do you logic here
# Since `t` could have unlimited size but do wan't to limit processes
# we'll put all `t` value in queue
x = Company.objects.order_by('ticker').values('ticker')
for t in x:
foo = t.values()
my_queue.put(foo)
MAX_PROCESSES = len(x)
my_pool = multiprocessing.Pool(MAX_PROCESSES)
for i in range(MAX_PROCESSES):
my_pool.apply_async(my_function, args=(foo,))
my_pool.close()
my_pool.join()
这就是可以使用多处理库实现解决方案的方式。
我们将使用Pool
,apply_async
和Queue
# globals
MAX_PROCESSES = 50
my_queue = multiprocessing.Manager().Queue() # queue to store our values
stop_event = multiprocessing.Event() # flag which signals processes to stop
my_pool = None
def my_function(proc_name, var):
while not stop_event.is_set():
# do you logic here with var variable
def var_scanner_process():
# Since `t` could have unlimited size we'll put all `t` value in queue
while not stop_event.is_set(): # forever scan `values` for new items
x = Variable.objects.order_by('foo').values('foo')
for t in x:
t = t.values()
my_queue.put(t)
time.sleep(10)
try:
var_scanner_process = Process(target=var_scanner)
var_scanner_process.start()
my_pool = multiprocessing.Pool(MAX_PROCESSES)
while not stop_event.is_set():
try: # if queue isn't empty, get value from queue and create new process
var = my_queue.get_nowait() # getting value from queue
p = Process(target=my_function, args=("process-%d" % i, var))
p.start()
exception Queue.Empty:
print "No more items in queue"
except KeyboardInterrupt as stop_test_exception:
print(" CTRL+C pressed. Stopping test....")
stop_event.set()