如何将多个goroutine同步到选定goroutine的终止(即Thread.join())



我在上一个问题中问过这个问题,但有些人觉得我最初的问题不够详细("你为什么想要一个定时条件等待??"),所以这里有一个更具体的问题。

我有一个goroutine正在运行,称之为服务器。它已经启动,将执行一段时间,并完成它的任务。然后,它将退出,因为它完成了。

在执行过程中,会启动大量其他goroutine。如果你愿意,可以称它们为"客户端"线程。他们运行步骤A和步骤B。然后,他们必须等待"服务器"goroutine在指定的时间内完成,如果"服务器没有完成"则退出并显示状态,如果完成则运行步骤C。

(请不要告诉我如何重组此工作流程。这是假设的,是给定的。无法更改。)

一种正常、合理的方法是让服务器线程用selectAll或Broadcast函数发出条件变量的信号,并让其他线程处于定时等待状态来监视条件变量。

func (s *Server) Join(timeMillis int) error {
s.mux.Lock()
defer s.mux.Unlock()
while !s.isFinished {
err = s.cond.Wait(timeMillis)
if err != nil {
stepC()
}
}
return err
}

其中,服务器将进入isFinished变为true的状态,并相对于互斥对象以原子方式广播条件变量。但这是不可忽略的,因为Go不支持定时条件等待。(但是有一个Broadcast())

那么,"以行动为中心"的方法是什么呢?我已经了解了所有的Go博客和文档,尽管它很明显,但这种模式或其等效模式从未出现,也没有对基本问题进行任何等效的"重构",即IPC风格的通道位于一个例程和另一个例程之间。是的,有扇入/扇出,但请记住,这些线索不断出现和消失。这应该很简单,而且至关重要的是,当mux通道的另一个"分支"(计时器)发出信号/时,成千上万的"等待状态"goroutine会一直等待服务器死亡。

请注意,上面的一些"客户端"可能在服务器goroutine启动之前启动(这是通常创建通道的时候),有些可能在期间出现,有些可能出现在之后。。。在所有情况下,它们都应该运行stepC,当且仅当服务器已经运行并在输入Join()函数后的时间毫秒后退出时。。。

一般来说,当有多个消费者时,渠道设施似乎非常缺乏。"首先建立一个侦听器映射到的通道的注册表"one_answers"有一个非常漂亮的递归数据结构,它通过它作为字段的通道发送自己"都是so.not.ok,作为漂亮、可靠、友好、明显的替代:等待(forSomeTime)

我认为可以通过在单个共享通道上进行选择,然后在完成后让服务器关闭它来完成您想要的操作。

假设我们创建了一个全球性的"退出通道",在所有goroutines中共享。它可以在创建"服务器"goroutine之前创建。重要的是,服务器goroutine从不向通道发送任何内容,而是简单地关闭它

现在客户端goroutines,只需执行以下操作:

select {
case <- ch:
fmt.Println("Channel closed, server is done!")
case <-time.After(time.Second):
fmt.Println("Timed out. do recovery stuff")
}

而服务器goroutine只是这样做:

close(ch)

更完整的示例:

package main
import(
"fmt"
"time"
)

func waiter(ch chan struct{}) {
fmt.Println("Doing stuff")
fmt.Println("Waiting...")
select {
case <- ch:
fmt.Println("Channel closed")
case <-time.After(time.Second):
fmt.Println("Timed out. do recovery stuff")
}
}

func main(){
ch := make(chan struct{})
go waiter(ch)
go waiter(ch)
time.Sleep(100*time.Millisecond)
fmt.Println("Closing channel")
close(ch)
time.Sleep(time.Second)
}

这可以抽象为以下实用程序API:

type TimedCondition struct {
ch chan struct{}
}
func NewTimedCondition()*TimedCondition {
return &TimedCondition {
ch: make(chan struct{}),
}
}
func (c *TimedCondition)Broadcast() {
close(c.ch)
}
func (c *TimedCondition)Wait(t time.Duration) error {
select {
// channel closed, meaning broadcast was called
case <- c.ch:
return nil
case <-time.After(t):
return errors.New("Time out")   
}
}

最新更新