我有三个节点(1个主节点,2个从节点(,每个节点运行第N次迭代的循环,例如100。一旦主节点运行了它的第一次迭代,从节点运行了第一次迭代。我们有一个名为sum_of_all的变量,它基本上是一个计数器。由于每个循环有100次迭代,我们希望在从节点和主节点之间来回发送计数器值,这意味着主节点只有在从节点接收到确认或变量值时才会启动下一次迭代,而从节点只有在主节点完成了第二次迭代并更新并发送了新的sum_of_all值时才能运行第二次循环。
编程语言:Python、Bash
我试着在grpc和apache Kafka中探索这些功能,但我不确定哪一个能很好地工作,我已经用grpc制作了一个原型,但它太bug了,因为我没有见过其他应用程序用grpc这样。
tl;dr-我认为集中式数据库会更容易。
如果你想用Kafka来做这件事;"领导者";迭代完成时。两个";追随者";可以使用唯一的CCD_ 1值来单独跟踪该事件。当他们使用这个时,你在每个"中运行另一个迭代循环;"跟随者";,然后它们有自己的事件。
您将希望在";"领导者";使得";追随者";可以做出适当的反应。
例如,您至少需要3个主题
领导者事件
uuid1 - done # sent by the leader after iterations
跟随者状态事件
uuid1 - {"id":1, "state":"waiting"} # send when follower starts-up
uuid1 - {"id":1, "state":"processing"} # send when iteration starts
uuid1 - {"id":1, "state":"done"} # after first follower iterations
...
uuid1 - {"id":2, "state":"done"} # after second follower iterations
然后,您需要另一个使用者为匹配的uuid1
记录关键字创建一个基于N个state=="done"
事件的聚合主题。
您可以使用Kafka Streams/ksqlDB对数据执行reduce()
/aggregate()
函数,并为每个UUID构建最终状态的KTable。
聚合事件(制作压缩主题(
uuid1 - 1 # after first follower is finished
uuid1 - 2 # after second follower is finished
你的领导者将使用这个聚合主题,并检查计数器以了解何时开始下一个迭代循环,然后它将生成一个新的ID并再次启动流程。
如果你想要一个全球性的";全部总和";计数器,然后可以在aggregate-events
主题上运行max
函数。