在单独的进程上运行Goroutines(多处理)



我目前有一个MQTT代码,它可以订阅一个主题,打印出收到的消息,然后将进一步的指令发布到一个新的主题。订阅/打印在一个Goroutine中完成,发布则在另一个Gorroutine中完成。这是我的代码:

var wg, pg sync.WaitGroup
// All messages are handled here - printing published messages and publishing new messages
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
wg.Add(1)
pg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("%sn", msg.Payload())
//fmt.Println(os.Getpid())
}()
go func(){
defer pg.Done()
message := ""
//Changing configurations
if strings.Contains(string(msg.Payload()), "arduinoLED") == true {
message = fmt.Sprintf("change configuration")
}
if  strings.Contains(string(msg.Payload()), "NAME CHANGED") == true{
message = fmt.Sprintf("change back")
}
// Publish further instructions to "sensor/instruction"
token := client.Publish("sensor/instruction", 0, false, message)
//fmt.Println(os.Getpid())
token.Wait()
}()
}
func main() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
opts := MQTT.NewClientOptions().AddBroker("tcp://test.mosquitto.org:1883")
opts.SetDefaultPublishHandler(f)
// Topic to subscribe to for sensor data
topic := "sensor/data"
opts.OnConnect = func(c MQTT.Client) {
if token := c.Subscribe(topic, 0, f); token.Wait() && token.Error() != nil {
panic(token.Error())
}
}
// Creating new client
client := MQTT.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
} else {
fmt.Printf("Connected to servern")
}
wg.Wait()
pg.Wait()
<-c
}

注释掉的os.Getpid()行是为了检查我在哪个进程上运行Goroutine。现在它们都显示相同的数字(这意味着它们都在同一个进程上运行?)。

我的问题是:如何在独立的进程上运行这两个Goroutine?有办法吗?

编辑:如果无法完成,我想使用通道编写此代码。这是我的代码:

var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
sensorData := make(chan []byte)
wg.Add(1)
pg.Add(1)
go func() {
defer wg.Done()
//fmt.Printf("%sn", msg.Payload())
sensorData <- string(msg.Payload())
fmt.Println(<-sensorData) //currently not printing anything
}()
go func(){
defer pg.Done()
message := ""
//Changing configurations
if strings.Contains(<-sensorData, "arduinoLED") == true{
message = fmt.Sprintf("change configuration")
}
if strings.Contains(<-sensorData, "NAME CHANGED") == true{
message = fmt.Sprintf("change back")
}
// Publish further instructions to "sensor/instruction"
token := client.Publish("sensor/instruction", 0, false, message)
token.Wait()
}()
}

但是,我无法使用通道打印出任何数据。我做错了什么?

您可能来自Python,对吧?;-)

它的模块名为multiprocessing在它的stdlib中,这可能很好地解释了您为什么使用你问题标题中的这个名字,以及你为什么明显很难理解@JimB说的意思

如果你需要一个单独的过程,你需要自己执行

";"多处理";在Python中

问题是,Python的multiprocessing是一个相当高级的藏在引擎盖下的东西很多。当您生成multiprocessing.Process并使其运行时一个函数,真正发生的是:

  1. Python解释器创建另一个操作系统的过程(使用类Unix系统上的fork(2)或Windows上的CreateProcess)并进行排列它也可以执行Python interter。

    关键的一点是,您现在将有两个过程运行两个Python interpers。

  2. 在子进程有一种与Python通信的方式父进程中的解释器。

    这个";通信链路">必然涉及某种形式提到的IPC@JimB。根本没有其他方式来传达数据和行动之间的独立过程正是因为商品现代操作系统提供了严格的过程分离。

  3. 当您在进程之间交换Python对象时解释器串行化反序列化在通过IPC链路发送之前和接收之后相应地,从那里开始。这是使用pickle模块实现的。

返回页首

Go没有任何直接的解决方案匹配Python的multiprocessing,我真的怀疑它是否能已合理实施。

这主要是因为Go比Python低得多,因此它没有拥有Python的奢侈,可以对它管理的价值观类型,它也努力拥有在其结构中尽可能少的隐藏成本。

Go还努力避开";框架风格";方法以解决问题;图书馆风格";解决方案可能的("框架与图书馆"的简要介绍;例如,在这里给出。)Go的标准库中有所有要实现的内容类似于Python的multiprocessing,但没有现成的框架式解决方案。

因此,你可以做的是沿着以下路线进行:

  1. 使用os/exec运行您自己进程的另一个副本。

    • 确保派生的进程";知道";它开始了在特殊的";从";模式——采取相应的行动
    • 使用任何形式的IPC与新流程进行通信。通过标准I/O流交换数据儿童过程的最简单的滚动方式(除非需要交换打开了文件,但这是一个更难的主题,所以我们不要离题)
  2. 使用encoding/层次结构中任何合适的包(如binarygobxml)进行序列化并在交换时对数据进行反序列化。

    ";转到";解决方案应该是encoding/gob但是CCD_ 16也会做得很好。

  3. 发明并实现一个简单的协议来告诉子进程要做什么以及使用哪些数据,以及如何将结果传达回master。

真的值得麻烦吗

我想说不,它没有——原因有很多:

  • Go没有什么比可怕的GIL更好的了,因此,没有必要回避它来实现真正的并行性当它是自然可能的时候。

  • 记忆安全掌握在你手中,实现它当你尽职尽责地遵守原则时,就没那么难了通过通道发送的内容现在归所有接收器换句话说,通过通道发送值也是这些价值的所有权转移。

  • Go工具链集成了种族检测器,因此您可以使用-race标志运行测试套件并创建评估使用go build -race构建程序目的:当以这种方式插入的程序运行时,种族检测器一检测到任何不同步的读/写存储器访问。该崩溃导致的打印输出包括关于什么、哪里出错的解释性消息,具有堆栈跟踪。

  • IPC是缓慢的,所以收益很可能被损失所抵消。

总而言之,除非你在写一个类似于电子邮件处理服务器的东西这一概念自然产生。

Channel用于goroutine之间的通信,您不应该在同一goroutine中使用它,如以下代码:

sensorData <- string(msg.Payload())
fmt.Println(<-sensorData) //currently not printing anything

如果你喜欢按通道测试打印,你可以在同一个goroutine中使用缓冲通道来避免阻塞,比如:

sensorData := make(chan []byte, 1)

干杯

好的,这里有两件事。当您使用goroutines时,go运行时会根据程序的需要对其进行调度。在您的示例程序中,不需要在不同的进程上调度goroutine,因此go运行时不使用单独的进程。

如果同时发送和接收更多的事件,那么go运行时最终可能会使用单独的进程。

goroutines抽象的意义在于,作为开发人员,您不必担心流程。此外,goroutines的实现比进程甚至线程要轻得多。

尽管如此,你的问题的解决方案并不涉及过程。

如果您通过值而不是通过引用传递数据,go将复制该值,以防止程序的两个部分编辑同一内存。

https://go.dev/tour/methods/4

假设您使用eclipse/paho.mqt.golang库,则消息是一个结构,有效负载是一个字节数组(https://github.com/eclipse/paho.mqtt.golang/blob/master/message.go#L41)

为了防止编辑有效负载,您可以对其进行复制或将其强制转换为字符串。

这应该不那么复杂,而且比生成额外的过程更容易理解。

相关内容

  • 没有找到相关文章