如何编写一个方法来检查代理是否完成了 Osbrain 中的任务?



我有一个关于如何编写一个正确的函数来检查代理是否在 Osbrain 中完成了他们的任务的问题。我有三个代理,传输代理、节点代理和协调代理。协调代理的主要任务是同步其他代理的操作。协调代理绑定到SYNC_PUB,节点和传输代理 SUB 绑定到协调代理。我的初始实现在第一次时间步/迭代后挂起。我是否错误地实现了status_checker方法?

from osbrain import run_nameserver, run_agent, Agent
import time
SYNCHRONIZER_CHANNEL_1 = 'coordinator1'

class TransportAgent(Agent):
def transportAgent_first_handler(self, message):
# time.sleep(2)
self.log_info(message)
self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')
def process_reply(self, message):
yield 1

class NodeAgent(Agent):
def NodeAgent_first_handler(self, message):
time.sleep(2)
self.log_info(message)
self.send(SYNCHRONIZER_CHANNEL_1, 'is_done', handler='process_reply')
def process_reply(self, message):
yield 1

class SynchronizerCoordinatorAgent(Agent):
def on_init(self):
self.network_agent_addr = self.bind('SYNC_PUB', alias=SYNCHRONIZER_CHANNEL_1, handler='status_handler')
self.status_list = []
def first_synchronization(self, time_step, iteration):
self.send(SYNCHRONIZER_CHANNEL_1, message={'time_step': time_step, 'iteration': iteration},
topic='first_synchronization')
def status_handler(self, message):
yield 'I have added you to the status_list'
self.status_list.append(message)
def status_checker(self):
count = 0
while len(self.status_list) < 2:
count += 1
time.sleep(1)
return
self.status_list.clear()

def init_environment(self):
self.TransportAgent = run_agent('TransportAgent', base=TransportAgent)
self.NodeAgent = run_agent('NodeAgent', base=NodeAgent)
self.TransportAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
handler={'first_synchronization': TransportAgent.transportAgent_first_handler})
self.NodeAgent.connect(self.network_agent_addr, alias=SYNCHRONIZER_CHANNEL_1,
handler={'first_synchronization': NodeAgent.NodeAgent_first_handler})

if __name__ == '__main__':
ns = run_nameserver()
synchronizer_coordinator_agent = run_agent('Synchronizer_CoordinatorAgent',
base=SynchronizerCoordinatorAgent)
synchronizer_coordinator_agent.init_environment()
for iteration in range(1, 2):
for time_step in range(0, 90, 30):
synchronizer_coordinator_agent.first_synchronization(time_step=time_step, iteration=iteration)
synchronizer_coordinator_agent.status_checker()
time.sleep(1)

它打印这个然后挂起

(网络代理(: {'time_step': 0, '迭代': 1}

(RMOAgent(: {'time_step': 0, '迭代': 1}

是的,看来您的status_checker()方法可能被破坏了。我猜您希望该方法阻止,直到status_list中有 2 条消息(一条来自节点代理,另一条来自传输代理(。

因此,您可能正在寻找类似以下内容的内容:

def status_checker(self):
while True:
if len(self.status_list) == 2:
break
time.sleep(1)
self.status_list.clear()

但是,当您从代理调用该方法时:

synchronizer_coordinator_agent.status_checker()

协调器正在阻止执行该调用,因此它不会处理其他传入消息。一个快速而肮脏的解决方法是使用如下所示的unsafe调用:

synchronizer_coordinator_agent.unsafe.status_checker()

我在这里看到的主要问题是您处理来自__main__的状态检查器的方式。您应该将同步/步骤移动到协调器。这意味着:

  • __main__调用协调器上的.start_iterations()方法,以便协调器执行第一步
  • 然后让你的协调器对传入的消息做出反应(一旦它收到 2 条消息,它就会执行下一步(
  • 协调器继续执行步骤,直到完成
  • 从main开始,只需定期监控您的协调代理以查看何时完成
  • 使用ns.shutdown()关闭系统

您的主图可能如下所示:

if __name__ == "__main__":
ns = run_nameserver()
coordinator = run_agent(...)
coordinator.init_environment()
coordinator.start_iterations()
while not coordinator.finished():
time.sleep(0.5)
ns.shutdown()

与此无关,但请注意,您当前的range(1, 2)只会导致一次迭代(尽管这可能是故意的(。如果需要 2 次迭代,可以使用range(2).

最新更新