如何从python中的主脚本退出多处理池进程中的while循环



EDIT 3:请参阅末尾的最后一个示例。

我需要一个while循环,通过USB连接进行连续的发送和返回操作。在这个连续的操作过程中,我需要(在我的主脚本中的其他内容中(在同一个USB连接上进行一些相同且独立的发送/返回操作。这似乎需要多处理和一些调整。

我想对多处理库使用以下解决方法:

  1. 将连续发送/返回操作放在具有池(apply_async(的另一个线程上
  2. 将此过程置于";保持";当我执行隔离的发送/返回操作时(使用clear(((
  3. 在隔离的发送/返回操作之后,立即恢复连续的发送/返送(使用set(((
  4. 当我到达主脚本的末尾时,停止连续的发送/返回(这里我还没有解决方案,应该是x.Stop((或类似的东西,因为terminate((不行(
  5. 从停止的进程中获取一些返回值(使用Get(((

我已经尝试了几件事,但我无法通过主命令退出while循环。

import multiprocessing
import time
def setup(event):
global unpaused
unpaused = event

class func:
def __init__(self):
self.finished = False

def stop(self):
self.finished = True

def myFunction(self, arg):
i = 0
s=[]
while self.finished == False:
unpaused.wait()
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
return s

if __name__ == "__main__":
x=func()
event = multiprocessing.Event() # initially unset, so workers will be paused at first
pool = multiprocessing.Pool(1, setup, (event,))
result = pool.apply_async(x.myFunction, (10,))
print('We unpause for 2 sec')
event.set()   # unpause
time.sleep(2)
print('We pause for 2 sec')
event.clear() # pause
time.sleep(2)
print('We unpause for 2 sec')
event.set()   # unpause
time.sleep(2)
print('Now we try to terminate in 2 sec')
time.sleep(2)
x.stop()
return_val = result.get()
print('get worked with '+str(return_val))

有人能给我指正确的方向吗?正如所看到的,这不会随着x.stop((而停止。全球价值观也不起作用。

提前谢谢。

编辑:

按照建议,我试图将多处理放在一个单独的对象中。这是通过像下面的例子一样把函数放在类中来完成的吗?

import multiprocessing
import time
class func(object):    
def __init__(self):
self.event = multiprocessing.Event() # initially unset, so workers             will be paused at first
self.pool = multiprocessing.Pool(1, self.setup, (self.event,))

def setup(self):
global unpaused
unpaused = self.event
def stop(self):
self.finished = True

def resume(self):
self.event.set() # unpause

def hold(self):
self.event.clear() #pause
def run(self, arg):
self.pool.apply_async(self.myFunction, (arg,))

def myFunction(self, arg):
i = 0
s=[]
self.finished = False
while self.finished == False:
unpaused.wait()
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
return s

if __name__ == "__main__":
x=func()
result = x.run(10)   
print('We unpause for 2 sec')
x.resume()   # unpause
time.sleep(2)
print('We pause for 2 sec')
x.hold() # pause
time.sleep(2)
print('We unpause for 2 sec')
x.resume()   # unpause
time.sleep(2)
print('Now we try to terminate in 2 sec')
time.sleep(2)
x.stop()
return_val = result.get()
print('get worked with '+str(return_val))

我添加了一个保持和恢复函数,并将设置函数放在一个类中。但是下面的示例甚至不再运行该函数。多么复杂的小问题啊。我对此感到困惑。

第2版:我尝试了一种变通方法来解决目前的发现。使用微访问.pool库时遇到了大麻烦。将其与USB连接一起使用并不简单。。。我提出了一个平庸的解决方法:

from multiprocessing.pool import ThreadPool
import time
class switch:
state = 1
s1 = switch()
def myFunction(arg):
i = 0
while s1.state == 1 or s1.state == 2 or s1.state == 3:
if s1.state == 1:
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
elif s1.state == 2:
print('we entered snippet mode (state 2)')
time.sleep(1)
x = s
return x
pool.close()
pool.join()
elif s1.state == 3:
while s1.state == 3:
time.sleep(1) 
print('holding (state 3)')
return s

if __name__ == "__main__":
global s
s=[]

print('we set the state in the class on top to ' +str(s1.state))
pool = ThreadPool(processes=1)
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we switch mode sir, buckle up')
time.sleep(5)
s1.state = 2
print('we switched for a snippet which is')
snippet = async_result.get()
print(str(snippet[-1])+' this snippet comes from main')
time.sleep(1)
print('now we return to see the full list in the end')
s1.state = 1
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we hold it')
time.sleep(5)
s1.state = 3
print('in 5 sec we exit')
time.sleep(5)
s1.state = 0
return_val = async_result.get()
print('Succsses if you see a list of numbers '+ str(return_val))

编辑3:

from multiprocessing.pool import ThreadPool
import time
class switch:
state = 1
s1 = switch()
def myFunction(arg):
i = 0
while s1.state == 1 or s1.state == 2:
if s1.state == 1:
print(arg+i)
s.append(arg+i)
i=i+1
time.sleep(1)
elif s1.state == 2:
print('we entered snippet mode (state 2)')
time.sleep(1)
x = s
return x
pool.close() #These are not relevant i guess.
pool.join() #These are not relevant i guess.
return s

if __name__ == "__main__":
global s
s=[]
print('we set the state in the class on top to ' +str(s1.state))
pool = ThreadPool(processes=1)
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we switch mode sir, buckle up')
time.sleep(5)
s1.state = 2
snippet = async_result.get()
print(str(snippet[-1])+' this snippet comes from main')
time.sleep(1)
print('now we return to see the full list in the end')
s1.state = 1
async_result = pool.apply_async(myFunction, (10,))
print('in 5 sec we exit')
time.sleep(5)
s1.state = 0
return_val = async_result.get()
print('Succsses if you see a list of numbers '+ str(return_val))

好吧,这就是我想到的。。。不伟大也不可怕。也许更可怕的一面(:

我讨厌在获取一段数据后必须调用函数pool.apply_async(myFunction,(10,((。目前只有ThreadingPool可以工作,在我的实际脚本中没有进一步的代码更改!

在我需要一个进程连续运行,同时偶尔做其他事情的情况下,我喜欢使用asyncio。这是我将如何处理的粗略草案

import asyncio

class MyObject:
def __init__(self):
self.mydatastructure = []
self.finished = False
self.loop = None

async def main_loop(self):
while not self.finished:
new_result = self.get_data()
self.mydatastructure.append(new_result)
await asyncio.sleep(0)

async def timed_loop(self):
while not self.finished:
await asyncio.sleep(2)
self.dotimedtask(self.mydatastructure)

async def run(self):
await asyncio.gather(self.main_loop(), self.timed_loop())

asyncio.run(MyObject().run())

一次只运行一个协同程序,定时协同程序每2秒安排一次。它总是获取最近连续执行中传递的数据。你也可以做一些事情,比如保持对象上的连接打开。根据您的要求(是间隔2秒,还是每隔一秒一次,无论需要多长时间(,有一些库包可以使调度更加优雅。

相关内容

  • 没有找到相关文章

最新更新