僵尸进程,我们再来一次



我在多处理/线程/子处理方面挣扎了很多。我基本上要做的是执行计算机上可用的每个二进制文件,我编写了一个python脚本来执行此操作。但是我一直有僵尸进程("已失效"(,如果我的所有 4 个工人都处于这种状态,最终会陷入僵局。 我尝试了很多不同的东西,但似乎没有什么能:(

以下是体系结构的外观:

|   _ python -m dataset --generate
|       _ worker1
|       |   _ [thread1] firejail bin1
|       _ worker2
|       |   _ [thread1] firejail bin1
|       |   _ [thread2] firejail bin2
|       |   _ [thread3] firejail bin3
|       _ worker3
|       |   _ [thread1] [firejail] <defunct>
|       _ worker4
|       |   _ [thread1] [firejail] <defunct>

我创建了 4 个工人:

# spawn mode prevents deadlocks https://codewithoutrules.com/2018/09/04/python-multiprocessing/
with get_context("spawn").Pool() as pool:
results = []
for binary in binaries:
result = pool.apply_async(legit.analyse, args=(binary,),
callback=_binary_analysis_finished_callback,
error_callback=error_callback)
results.append(result)

(注意我使用"生成"池,但现在我想知道它是否有任何用处......

每个工作线程将创建多个线程,如下所示:

threads = []
executions = []
def thread_wrapper(*args):
flows, output, returncode = _exec_using_firejail(*args)
executions.append(Execution(*args, flows, is_malware=False))
for command_line in potentially_working_command_lines:
thread = Thread(target=thread_wrapper, args=(command_line,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()

并且每个线程都会在消防监狱沙盒中启动一个新进程:

process = subprocess.Popen(FIREJAIL_COMMAND +
["strace", "-o", output_filename, "-ff", "-xx", "-qq", "-s", "1000"] + command_line,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid)
try:
out, errs = process.communicate(timeout=5, input=b"YnYnYnYnYnYnYnYnYnYnYnYnYnYnYnYn")
# print("stdout:", out)
# print("stderr:", errs)
except subprocess.TimeoutExpired:
# print(command_line, "timed out")
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
out, errs = process.communicate()

我使用os.killpg()而不是process.kill()因为出于某些原因,我的 Popen 进程的子进程没有被杀死...... 这要归功于preexec_fn=os.setsid它设置了所有后代的 gid。但即使使用这种方法,某些进程(例如 zsh(也会引发僵尸进程,因为它看起来像 zsh 改变了它的 gid,所以我的os.killpg无法按预期工作......

我正在寻找一种方法来 100% 确定所有进程都将死亡。

如果要为此使用subprocess模块,则应直接使用process对象的.kill方法,而不是使用os模块。 使用communicate是一种阻止操作;所以Python会等到响应。 使用timeout参数会有所帮助,但对于许多进程来说会很慢。

import subprocess
cmd_list = (
FIREJAIL_COMMAND 
+ ["strace", "-o", output_filename, "-ff", "-xx", "-qq", "-s", "1000"] 
+ command_line
) 
proc = subprocess.Popen(
cmd_list,
stdout=subprocess.PIPE, 
stderr=subprocess.PIPE, 
preexec_fn=os.setsid
)
try:
out, errs = proc.communicate(timeout=5, input=b"Yn" * 16)
except subprocess.TimeoutExpired:
proc.kill()
out, errs = None, None
ret_code = process.wait()

如果要在一组进程的非阻塞循环中运行它,那就是使用poll. 下面是一个示例。 这假定您有一个要提供给流程创建的filenames和相应command_lines的列表。

import subprocess
import time
def create_process(output_filename, command_line):
cmd_list = (
FIREJAIL_COMMAND 
+ ["strace", "-o", output_filename, "-ff", "-xx", "-qq", "-s", "1000"] 
+ command_line
) 
proc = subprocess.Popen(
cmd_list,
stdout=subprocess.PIPE, 
stderr=subprocess.PIPE, 
preexec_fn=os.setsid
)
return {proc: (output_filename, command_line)}
processes = [create_process for f, c in zip(filenames, command_lines)]
TIMEOUT = 5
WAIT = 0.25  # how long to wait between checking the processes
finished = []
for _ in range(round(TIMEOUT / WAIT)):
finished_new = []
if not processes:
break
for proc in processes:
if proc.poll():
finished_new.append(proc)
# cleanup
for proc in finished_new:
process.remove(proc)
finished.extend(finished_new)
time.sleep(WAIT)

相关内容

  • 没有找到相关文章

最新更新