使用了三个多处理器。self.state
标志未更新时current_time == self.scheduled_time
print("Time reached", self.state)
工作时current_time == self.scheduled_time
.但是printer_part()
多处理下的print(f"{self.state}")
不起作用,它总是打印False
import multiprocessing as mp
import time
from datetime import datetime
class test:
def __init__(self,scheduled_time):
self.scheduled_time = scheduled_time
self.state = False
def time_checker(self):
while True:
current_time = (datetime.now()).strftime("%H:%M:%S")
if current_time == self.scheduled_time:
self.state = True
print("Time reached", self.state)
def printer_part(self):
while True:
time.sleep(1)
print(f"{self.state}")
def start_rec(self):
print("Phase 1")
def main_pro(self):
print_proc = mp.Process(target=self.printer_part, args=())
checker_proc = mp.Process(target=self.time_checker, args=())
print_proc.start()
checker_proc.start()
while True:
start_proc = mp.Process(target=self.start_rec, args=())
start_proc.start()
time.sleep(1)
start_proc.terminate()
print_proc.terminate()
checker_proc.terminate()
if __name__ == "__main__":
scheduled_time = "02:17:00"
test_obj= test(scheduled_time)
test_obj.main_pro()
我不得不承认,我不太明白为什么你需要对像示例代码中这样的任务进行多处理。但这可能是由于我对上下文和/或您为使问题变得可呈现所做的简化不够了解。
进程可以通过多种方式相互通信:管道、队列和共享内存对象。在下面的尝试中,我使用了管道,因为这是一种简单的方法:
import multiprocessing as mp, time
from datetime import datetime
class Test:
def __init__(self, scheduled_time):
self.scheduled_time = scheduled_time
def time_checker(self, conn):
while True:
if datetime.now().strftime("%H:%M:%S") == self.scheduled_time:
conn.send(True)
print("time_checker: Time reached!")
break
print("time_checker: Still waiting...")
time.sleep(1)
def printer_part(self, conn):
while True:
if conn.poll():
state = conn.recv()
if state is True:
print("printer_part:", True)
break
print("printer_part:", False)
time.sleep(1)
def main_pro(self):
p_conn, c_conn = mp.Pipe(duplex=False)
print_proc = mp.Process(target=self.printer_part, args=(p_conn,))
checker_proc = mp.Process(target=self.time_checker, args=(c_conn,))
print_proc.start()
checker_proc.start()
print_proc.join()
checker_proc.join()
if __name__ == "__main__":
scheduled_time = "23:22:00"
test_obj = Test(scheduled_time)
test_obj.main_pro()
请注意,代码不是很优雅和高效。那是因为我的重点主要是展示什么是可能的。
也许它有帮助...