防止多进程Python应用中的线程重复



我有一个Python应用程序,它在multiprocessing.Process启动的子进程中运行多个作业。父应用程序还启动一个线程,向数据库报告进度。但是,我注意到,如果任何作业启动它们自己的子进程,它们就会复制这个线程,从而导致数据库中的数据损坏。例如,当一个孙子子进程完成时,它的线程在数据库中将父进程标记为完成,因为它认为它父进程,即使父进程仍在运行。

我如何使用multiprocess.Process,使它不复制任何当前正在运行的线程?最简单的选项是记录原始PID在我的线程,如果"当前"PID不匹配这个值,然后立即退出?

我在去年看到过类似的问题,但它似乎被忽略了。

您对问题的描述表明父进程中的后台线程继续存在并在子进程中执行。这是不可能的;至少在POSIX系统上是不可能的。你的情况是另一回事。我将在下面提供一些推测,然后是如何避免这个问题的建议。依次考虑这些要点……

1。只有一个线程存活。

只有调用fork()的线程分叉后仍然存活。下面是一个小示例,说明其他线程不会在子进程中继续执行:

def output():
    time.sleep(3)
    print "Thread executing in process: %d" % os.getpid()
thread = threading.Thread(target=output)
thread.start()
os.fork()
print "Pid: %d" % os.getpid()

您将看到父线程和子线程都将它们的pid输出到stdout,但是第二个线程将只在父线程中产生输出。

所以让监控线程检查它的pid或其他方式检查它正在运行的进程的条件不会有什么不同;该线程只能在一个进程中执行。

2。在某些情况下,分叉可能会导致类似您所看到的问题。

分叉会以各种方式导致程序状态的损坏。例如:

    由于fork而死亡的线程中引用的对象可能超出作用域,因此会调用它们的终结器。如果这样一个对象表示一个网络资源,并且调用它的del方法导致连接的一端意外关闭,那么这可能会导致问题。
  • 任何缓冲的IO都会导致问题,因为缓冲区在子进程中是重复的。

注意,第二点甚至不需要线程。考虑以下内容:

f = open("testfile", "w", 1024)
f.write("a")
os.fork()

我们写了一个字符到testfile,我们在父节点分叉之前这样做了。但是我们在内容未刷新的情况下进行了分叉,因此:

alp:~ $ wc -c testfile
      2 testfile

文件包含两个字符,因为输出缓冲区被复制到子进程,父进程和子进程最终都刷新了它们的缓冲区。

我怀疑你的问题是由第二个问题引起的(尽管我很高兴地承认这纯粹是猜测)。

3。重新架构以避免此类问题。

你在评论中提到,你不能启动监视器线程后产卵你的工人,因为你需要反复创建新的工人。为了避免这种情况,重组你正在做的事情可能比你想象的要容易。与其为每个新的工作单元生成一个进程,不如创建一组由控制进程管理的长期工作:控制器向队列提供需要处理的作业的规范;它在闲暇时这样做。每个worker都无限循环,在队列到达时从队列中提取作业并执行它们。(multiprocessing的队列实现将保证每个作业描述仅由一个工人绘制。)因此,您只需要在早期生成一次worker,并且可以在所有分叉完成后创建监视器线程。

下面是这种组织的示意图:

from multiprocessing import Process, Queue
def work(q):
    while True:
        job = q.get()
        if job is None:
            # We've been signaled to stop.
            break
        do_something_with(job)
queue = Queue()
NUM_WORKERS = 3
NUM_JOBS = 20
# Start workers.
for _ in range(NUM_WORKERS):
    p = Process(target=work, args=(queue,))
    p.start()
# Create your monitor thread here.
# Put work in the queue.  This continues as long as you want.
for i in range(NUM_JOBS):
    queue.put(i)
# When there's no more work, put sentinel values in the queue so workers
# know to gracefully exit.
for _ in range(NUM_WORKERS):
    queue.put(None)

最新更新