Process pool is unreliable: it either hangs or runs fine

Hi, and thanks for your help :)

The program I'm writing basically disassembles executables. I just wanted to see whether I could make it faster by using pathos. The problem is that it doesn't run reliably; I'll explain below what I mean by that.

The program is run like this:

from ControlFlow import Disassembler, DisassemblerWorker
from ControlFlow import FlowGraph
import networkx as nx
import time
file_path = "/vagrant/SimpleTestBinaries/example3-x64"
start = time.time()
flow = Disassembler(file_path)
graph = FlowGraph(flow)
end = time.time()
print("Finished in: ", end - start, " seconds")

Usually it responds with:

Finished in: 0.8992343274389473 seconds

But sometimes it seems to get stuck. As you can see above, the whole thing should take less than a second, so I went ahead and killed it, and it spat out a bunch of errors that might hint at where it got stuck.

Process ForkPoolWorker-11:
Process ForkPoolWorker-13:
Process ForkPoolWorker-10:
Traceback (most recent call last):
Traceback (most recent call last):
Process ForkPoolWorker-12:
File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 258, in _bootstrap
self.run()
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 258, in _bootstrap
self.run()
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 258, in _bootstrap
self.run()
File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py", line 108, in worker
task = get()
File "/usr/local/lib/python3.6/dist-packages/multiprocess/queues.py", line 337, in get
with self._rlock:
File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 258, in _bootstrap
self.run()
File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py", line 108, in worker
task = get()
File "/usr/local/lib/python3.6/dist-packages/multiprocess/synchronize.py", line 101, in __enter__
return self._semlock.__enter__()
File "/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py", line 108, in worker
task = get()
File "/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py", line 108, in worker
task = get()
File "/usr/local/lib/python3.6/dist-packages/multiprocess/queues.py", line 337, in get
with self._rlock:
File "/usr/local/lib/python3.6/dist-packages/multiprocess/synchronize.py", line 101, in __enter__
return self._semlock.__enter__()
File "/usr/local/lib/python3.6/dist-packages/multiprocess/queues.py", line 337, in get
with self._rlock:
KeyboardInterrupt
File "/usr/local/lib/python3.6/dist-packages/multiprocess/queues.py", line 338, in get
res = self._reader.recv_bytes()
File "/usr/local/lib/python3.6/dist-packages/multiprocess/synchronize.py", line 101, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
File "/usr/local/lib/python3.6/dist-packages/multiprocess/connection.py", line 219, in recv_bytes
buf = self._recv_bytes(maxlength)
KeyboardInterrupt
File "/usr/local/lib/python3.6/dist-packages/multiprocess/connection.py", line 410, in _recv_bytes
buf = self._recv(4)
File "/usr/local/lib/python3.6/dist-packages/multiprocess/connection.py", line 382, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
in 
6 file_path = "/vagrant/SimpleTestBinaries/example3-x64"
7 start = time.time()
----> 8 flow = Disassembler(file_path)
9 graph = FlowGraph(flow)
10 end = time.time()
/vagrant/BinaryResearch/ControlFlow.py in __init__(self, path)
34         self.regs_write_map = {}
35         self.section_map = {}
---> 36         self._analyze_flow()
37 
38     def disassembler_setup(self, architecture, details=True):
/vagrant/BinaryResearch/ControlFlow.py in _analyze_flow(self)
77             jumps = p.amap(worker.get_jump_map, imagebase)
78             returns = p.amap(worker.get_return_map, imagebase)
---> 79             p.close(), p.join()
80 
81             call_results, jump_results, return_results = calls.get()[0], jumps.get()[0], returns.get()[0]
/usr/local/lib/python3.6/dist-packages/pathos/multiprocessing.py in join(self)
206         _pool = __STATE.get(self._id, None)
207         if _pool and self.__nodes == _pool.__nodes:
--> 208             _pool.join()
209         return
210     # interface
/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py in join(self)
544         util.debug('joining pool')
545         assert self._state in (CLOSE, TERMINATE)
--> 546         self._worker_handler.join()
547         self._task_handler.join()
548         self._result_handler.join()
/usr/lib/python3.6/threading.py in join(self, timeout)
1054 
1055         if timeout is None:
-> 1056             self._wait_for_tstate_lock()
1057         else:
1058             # the behavior of a negative timeout isn't documented, but
/usr/lib/python3.6/threading.py in _wait_for_tstate_lock(self, block, timeout)
1070         if lock is None:  # already determined that the C code is done
1071             assert self._is_stopped
-> 1072         elif lock.acquire(block, timeout):
1073             lock.release()
1074             self._stop()
KeyboardInterrupt: 

So I went and looked at the part of the code it references. I don't know whether this means it's getting stuck between p.close() and p.join(). Here's the snippet from ControlFlow.py that it points to:

from pathos.multiprocessing import ProcessPool
# More code ...
p = ProcessPool()
for section in available_sections:
    worker = DisassemblerWorker(self.architecture, section.content, section.virtual_address)
    p.clear()
    calls = p.amap(worker.get_call_map, imagebase)
    jumps = p.amap(worker.get_jump_map, imagebase)
    returns = p.amap(worker.get_return_map, imagebase)
    p.close(), p.join()
    call_results, jump_results, return_results = calls.get()[0], jumps.get()[0], returns.get()[0]
    # More code ...
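In case the shape of the loop itself matters, here is a minimal, self-contained sketch of the same pattern; fake_work and the hard-coded base addresses are just stand-ins for my real DisassemblerWorker methods and section data:

from pathos.multiprocessing import ProcessPool

# Stand-in for the real worker methods (get_call_map, get_jump_map,
# get_return_map): takes an image base and returns a small dict.
def fake_work(imagebase):
    return {imagebase: "done"}

p = ProcessPool()
for base in [0x1000, 0x2000, 0x3000]:  # stand-ins for the section image bases
    p.clear()
    calls = p.amap(fake_work, [base])
    jumps = p.amap(fake_work, [base])
    returns = p.amap(fake_work, [base])
    p.close(), p.join()
    print(calls.get()[0], jumps.get()[0], returns.get()[0])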

So I really don't know what's making it unreliable. I know this sounds crazy, but once the program "succeeds" for the first time, it seems to run fine from then on. Also, I should mention that I'm running this in a Jupyter notebook. I've read that multiprocessing doesn't play well with notebooks, which is why I'm using multiprocess.

Any ideas?

Thanks again!

I've run into the same problem with the underlying Python multiprocessing library (Pool): it works fine most of the time, but sometimes it seems to hit this same issue.

The documentation points out that the functions you hand to the pool need to be defined outside of the calling code, so that the worker processes can import them: https://docs.python.org/2/library/multiprocessing.html

In practice, it seems the function needs to be defined outside of the if __name__ == '__main__': block. I don't know why this behavior appears inconsistent, but following that guideline has worked for me in some cases.
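For illustration, this is the layout I mean; square is just a toy function to show the placement:

import multiprocessing as mp

# Defined at module level, outside the __main__ guard,
# so the worker processes can import it.
def square(x):
    return x * x

if __name__ == '__main__':
    with mp.Pool(2) as pool:
        print(pool.map(square, [1, 2, 3]))  # [1, 4, 9]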

I've found other cases where even that doesn't work, and the solution that finally stuck was to put the parallelized function in a separate file. I picked up that last point from the question below, and it solved the problem for me: Jupyter notebook never finishes processing using multiprocessing (Python 3)

Here's an example that puts the function f in a separate file, outside of main.

**file 1: my_methods.py**

def f(x):
    return x.count()

**file 2: main.py or your Jupyter notebook, in the same directory**

import multiprocessing as mp
import numpy as np
import pandas as pd
from my_methods import f

def parallelize(df, func, n_cores=4):
    # Split the dataframe into one chunk per core and map the chunks over the pool
    df_split = np.array_split(df, n_cores)
    pool = mp.Pool(n_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

if __name__ == '__main__':
    output = parallelize(
        df=chosen_input_dataframe,  # whatever DataFrame you want to process
        func=f,
        n_cores=4,
    )
    print(output)
