识别处理器核心或工作线程 ID 并行 Python



>我正在并行运行进程,但需要为每个要写入的 cpu 进程创建一个数据库。我只想要每台服务器上分配的 cpu 一样多的数据库,因此将 100 个作业写入 3 个数据库,之后可以合并。

是否有可以将每个工作人员标识为的工作人员 ID 号或核心 ID?

def workerProcess(job):
  if workerDBexist(r'c:tempdb' + workerid):
    ##processjob into this database
  else:
    makeDB(r'c:tempdb' + workerid)
    ##first time this 'worker/ core' used, make DB then process
import pp
ppservers = ()
ncpus = 3
job_server = pp.Server(ncpus, ppservers=ppservers)
for work in 100WorkItems:
  job_server.submit(workerProcess, (work,))

我所知,pp的API中没有任何这样的功能。

如果你改用stdlib模块,这将使你的生活更轻松 - 例如,multiprocessing.Pool需要一个initializer参数,你可以用它来初始化每个进程的数据库,然后它可以作为每个任务可以使用的变量。

但是,有一个相对简单的解决方法。

每个进程都有一个唯一的(至少在运行时)进程 ID。 在 Python 中,您可以使用 os.getpid() 访问当前进程的进程 ID。因此,在每个任务中,您可以执行以下操作:

dbname = 'database{}'.format(os.getpid())

然后使用 dbname 打开/创建数据库。我不知道"数据库"是指dbm文件、sqlite3文件、MySQL 服务器上的数据库还是什么。例如,您可能需要在父级中创建一个tempfile.TemporaryDirectory,将其传递给所有子项,并让它们将其os.path.join到 dbname(因此,在所有子项完成后,您可以在os.listdir(the_temp_dir)中获取所有内容)。


这样做的问题是,如果pp.Server重新启动其中一个进程,您最终将得到 4 个数据库而不是 3 个。可能没什么大不了的,但你的代码应该处理这种可能性。(IIRC,pp.Server通常不会重新启动进程,除非您通过restart=True,但如果其中一个进程崩溃,它可能会这样做。

但是,如果(看起来是这样)您实际上是在一个全新的流程中运行每个任务,而不是使用 3 个进程池呢?好吧,那么你最终将得到与进程一样多的数据库,这可能不是你想要的。您在这里的真正问题是您没有使用 3 个进程的池,这是您应该修复的。但是,还有其他方法可以得到您想要的吗?或。

例如,假设您创建了三个锁,每个数据库一个锁,可能作为锁文件。然后,每个任务都可以执行以下伪代码:

for i, lockfile in enumerate(lockfiles):
    try:
        with lockfile:
            do stuff with databases[i]
            break
    except AlreadyLockedError:
        pass
else:
    assert False, "oops, couldn't get any of the locks"

如果你实际上可以锁定数据库本身(使用羊群,或者使用相关数据库的一些 API 等),事情就更容易了:只需尝试依次连接到它们,直到其中一个成功。

只要您的代码实际上没有段错误或类似问题,**如果您实际上一次运行超过 3 个任务,则不可能锁定所有 3 个锁定文件,因此您可以保证获得一个。


* 这并不完全正确,但对于您的目的来说已经足够真实了。例如,在Windows上,每个进程都有一个 唯一的HANDLE ,如果您要求pid则如果还没有,则会生成一个。在某些 *nixes 上,每个线程都有一个唯一的线程 ID,进程的pid是第一个线程的线程 ID。等等。但就您的代码而言,您的每个进程都有一个 独特的pid ,这才是最重要的。

** 即使你的代码崩溃了,你也可以处理它,只是更复杂。例如,使用 pid 文件而不是空锁定文件。获取 pid 文件的读锁定,然后尝试升级到写锁定。如果失败,则从文件中读取 pid,并检查是否存在任何此类进程(例如,在 *nix 上,如果os.kill(pid, 0)引发,则没有这样的进程),如果是,则强制断开锁。无论哪种方式,现在您已经有了写锁定,因此请将 pid 写入文件。

最新更新