Python 多处理作业提交基于所有正在运行的作业的聚合条件



基于Python多处理作业提交的聚合标准,适用于所有正在运行的作业

我有一份工作需要在 Teradata 数据库上做一些工作,并将数据库会话数作为参数。数据库的最大数据库会话数限制为 60。我是否可以使用多处理有条件地处理作业,以便所有活动子进程 <= max_num_db_sessions 中的 sum(num_db_sessions( ?

我只是在下面粘贴一些伪代码:

import multiprocessing as mp
import time
def dbworker(db_object, num_db_sessions):
# do work on db_object #####
# The sum(num_db_sessions) <= max_num_db_sessions 
print (db_object, num_db_sessions)
# The db_objs with larger num_db_sessions take longer to finish
time.sleep(num_db_sessions)
return
if __name__ == "__main__":
max_num_db_sessions = 60
# JobsList (db_object,num_db_sessions)
jobs_list = [('A', 15), ('B', 15), ('C', 15), ('D', 15)
, ('E', 1), ('F', 1), ('G', 1), ('H', 1)
, ('I', 1), ('J', 1), ('K', 1), ('L', 1)
, ('M', 2), ('N', 1), ('O', 1), ('P', 1)
, ('Q', 2), ('R', 2), ('S', 2), ('T', 2)
, ('U', 2), ('V', 2), ('W', 2), ('X', 2)
, ('Y', 2), ('Z', 2)]
## Submit jobs_list to mutltiprocessing ####
for db_object,num_db_sessions in jobs_list:
dbworker(db_object,num_db_sessions) ## -->>> sum(num_db_sessions) <=  max_num_db_sessions
## Is this possible ??

我已经想通了。下面的代码执行此操作。关键要素是:

1( 运行单独的守护进程以将任务放入队列中。此目标函数执行业务流程

2( 将计数器实现为多处理值,用于跟踪当前正在运行的会话数。计数器的实现取自 https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

3( 实现 multiprocessing.manager((.list(( 来跟踪未提交的作业。

4(使用毒丸通过发送无*number_of_child_processes来破坏工作进程,如毒丸方法中实现的那样。这是从 https://pymotw.com/3/multiprocessing/communication.html

worker 函数使用 time.sleep(num_db_sessions( 作为模拟工作负载的方式(处理时间更长(

这是代码。

import multiprocessing
import time
class Counter(object):
def __init__(self, initval=0):
self.val = multiprocessing.Value('i', initval)
self.lock = multiprocessing.Lock()
def increment(self,val):
with self.lock:
self.val.value += val
def value(self):
with self.lock:
return self.val.value
def queue_manager(tasks,results,jobs_list,counter,max_num_db_sessions,num_consumers):
proc_name = multiprocessing.current_process().name
while len(jobs_list) > 0:
current_counter = counter.value()
available_sessions = max_num_db_sessions - current_counter
if available_sessions > 0:
prop_list = [(p,s) for p,s in jobs_list if s <= available_sessions]
if (len(prop_list)) > 0:
with multiprocessing.Lock():
print(prop_list[0])
tasks.put(prop_list[0][0])
jobs_list.remove(prop_list[0])
counter.increment(prop_list[0][1])
print("Process: {} -- submitted:{} Counter is:{} Sessions:{}".format(proc_name
    , prop_list[0][0]
    , current_counter
    , available_sessions)
)
else:
print("Process: {} -- Sleeping:{} Counter is:{} Sessions:{}".format(proc_name
           , str(5)
           , current_counter
           , available_sessions)
)
time.sleep(5)
else:
for i in range(num_consumers):
tasks.put(None)
def worker(tasks,counter,proc_list):
proc_name = multiprocessing.current_process().name
while True:
obj = tasks.get()
if obj is None:
break
name,age = [(name,sess) for name,sess in proc_list if name == obj][0]
print("Process: {} -- Processing:{} Sleeping for:{} Counter is:{}".format(proc_name
        ,name
        ,age
        ,counter.value())
)
time.sleep(age)
counter.increment(-age)
print("Process: {} -- Exiting:{} Sleeping for:{} Counter is:{}".format(proc_name
        ,name
        ,age
        ,counter.value())
)
if __name__ == '__main__':
max_num_db_sessions = 60
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue() # This will be unused now. But will use it.
mpmanager = multiprocessing.Manager()
proc_list = [('A', 15), ('B', 15), ('C', 15), ('D', 15)
, ('E', 1), ('F', 1), ('G', 1), ('H', 1)
, ('I', 1), ('J', 1), ('K', 1), ('L', 1)
, ('M', 2), ('N', 1), ('O', 1), ('P', 1)
, ('Q', 2), ('R', 2), ('S', 2), ('T', 2)
, ('U', 2), ('V', 2), ('W', 2), ('X', 2)
, ('Y', 2), ('Z', 2)]
jobs_list = mpmanager.list(proc_list)
counter = Counter(0)
num_cpu = 3
d = multiprocessing.Process(name='Queue_manager_proc'
,target=queue_manager
,args=(tasks, results, jobs_list, counter
, max_num_db_sessions, num_cpu)
)
d.daemon = True
d.start()
jobs = []
for i in range(num_cpu):
p = multiprocessing.Process(name="Worker_proc_{}".format(str(i+1))
,target=worker
,args=(tasks,counter,proc_list)
)
jobs.append(p)
p.start()
for job in jobs:
job.join()
d.join()

最新更新