>我正在并行运行进程,但需要为每个要写入的 cpu 进程创建一个数据库。我只想要每台服务器上分配的 cpu 一样多的数据库,因此将 100 个作业写入 3 个数据库,之后可以合并。
是否有可以将每个工作人员标识为的工作人员 ID 号或核心 ID?
def workerProcess(job):
if workerDBexist(r'c:tempdb' + workerid):
##processjob into this database
else:
makeDB(r'c:tempdb' + workerid)
##first time this 'worker/ core' used, make DB then process
import pp
ppservers = ()
ncpus = 3
job_server = pp.Server(ncpus, ppservers=ppservers)
for work in 100WorkItems:
job_server.submit(workerProcess, (work,))
我所知,pp
的API中没有任何这样的功能。
如果你改用stdlib模块,这将使你的生活更轻松 - 例如,multiprocessing.Pool
需要一个initializer
参数,你可以用它来初始化每个进程的数据库,然后它可以作为每个任务可以使用的变量。
但是,有一个相对简单的解决方法。
每个进程都有一个唯一的(至少在运行时)进程 ID。 在 Python 中,您可以使用 os.getpid()
访问当前进程的进程 ID。因此,在每个任务中,您可以执行以下操作:
dbname = 'database{}'.format(os.getpid())
然后使用 dbname
打开/创建数据库。我不知道"数据库"是指dbm
文件、sqlite3
文件、MySQL 服务器上的数据库还是什么。例如,您可能需要在父级中创建一个tempfile.TemporaryDirectory
,将其传递给所有子项,并让它们将其os.path.join
到 dbname(因此,在所有子项完成后,您可以在os.listdir(the_temp_dir)
中获取所有内容)。
这样做的问题是,如果pp.Server
重新启动其中一个进程,您最终将得到 4 个数据库而不是 3 个。可能没什么大不了的,但你的代码应该处理这种可能性。(IIRC,pp.Server
通常不会重新启动进程,除非您通过restart=True
,但如果其中一个进程崩溃,它可能会这样做。
但是,如果(看起来是这样)您实际上是在一个全新的流程中运行每个任务,而不是使用 3 个进程池呢?好吧,那么你最终将得到与进程一样多的数据库,这可能不是你想要的。您在这里的真正问题是您没有使用 3 个进程的池,这是您应该修复的。但是,还有其他方法可以得到您想要的吗?或。
例如,假设您创建了三个锁,每个数据库一个锁,可能作为锁文件。然后,每个任务都可以执行以下伪代码:
for i, lockfile in enumerate(lockfiles):
try:
with lockfile:
do stuff with databases[i]
break
except AlreadyLockedError:
pass
else:
assert False, "oops, couldn't get any of the locks"
如果你实际上可以锁定数据库本身(使用羊群,或者使用相关数据库的一些 API 等),事情就更容易了:只需尝试依次连接到它们,直到其中一个成功。
只要您的代码实际上没有段错误或类似问题,**如果您实际上一次运行超过 3 个任务,则不可能锁定所有 3 个锁定文件,因此您可以保证获得一个。
* 这并不完全正确,但对于您的目的来说已经足够真实了。例如,在Windows上,每个进程都有一个 唯一的HANDLE
,如果您要求其pid
则如果还没有,则会生成一个。在某些 *nixes 上,每个线程都有一个唯一的线程 ID,进程的pid
是第一个线程的线程 ID。等等。但就您的代码而言,您的每个进程都有一个 独特的pid
,这才是最重要的。
** 即使你的代码崩溃了,你也可以处理它,只是更复杂。例如,使用 pid 文件而不是空锁定文件。获取 pid 文件的读锁定,然后尝试升级到写锁定。如果失败,则从文件中读取 pid,并检查是否存在任何此类进程(例如,在 *nix 上,如果os.kill(pid, 0)
引发,则没有这样的进程),如果是,则强制断开锁。无论哪种方式,现在您已经有了写锁定,因此请将 pid 写入文件。