对于下面的用例,我应该使用哪种go并发模式



我有两个go routines

  • NewFileEntryCheck((
  • UploadFromQueueToCloud((

go NewFileEntryCheck()

此函数将以fsnotify.Watcher()作为参数,并在无限循环中监视指定目录中的任何文件更改。

如果发生任何新的file CREATE事件,它会将文件名添加到Queue

go UploadFromQueueToCloud()

这个go例程应该从Queue中获取文件名,从路径中读取文件并将其上传到云。

现在,我的问题是,如何使UploadFromQueueToCloud()函数从队列中获取值并将其上传到云?

由于NewFileEntryCheck()函数将持续检查新文件并将其添加到队列中,因此队列的长度不是固定的。我无法通过len(Queue)循环并通过通道接收数据。

有没有其他方法可以实现这一点?

这只是pub子模型的一个常见示例。您有一个发布者(NewFileEntryCheck)和一个订阅者(UploadFromQueueToCloud)。因为这可以以Pub-Sub模型的形式进行建模,所以您应该进行类似的设计。

你可能已经收到@jimb的一条评论,询问Queue是否是一个频道。这是有原因的,因为golang通道的行为就像一个队列。因此,一个解决方案肯定是使用通道对其进行建模。

使用通道,您可以将一次性例程中的任务/文件名放入。从另一个go例程UploadFromQueueToCloud,您可以使用该通道并异步处理数据。

这是的样本代码

func UploadFromQueueToCloud(channel *chan string, exit *chan bool) {
for {
select {
case file := <- channel:
// Upload the cloud..
// Other business logics
case exit := <- exit:
break;
}
}
}
func NewFileEntryCheck(notifier *fsnotify.Watcher) {
channel := make(chan string)
exit := make(chan bool)
for {
if (some breaking condition) {
exit <- true;
break;
}
// Check for the file notifier
// If file CREATE happened,
// get the file name into FileName
channel <- FileName;
go UploadFromQueueToCloud(channel, exit)
}
} 

注意:您可以通过将通道更改为缓冲通道来提高吞吐量

希望能有所帮助。

我的案例是:

func main() {
allFileCheck := make(chan bool)
fileUpload := make(chan string)
ctx, cancel := context.WithCancel(context.Background())
go NewFileEntryCheck(ctx, fileUpload, allFileCheck)
L:
for {
select {
case file := <- fileUpload:
go UploadFromQueueToCloud(ctx, file)
case <-allFileCheck:
fmt.Println("done")
break L
case <-time.After(5 * time.Second):
cancel()
}
}
}
func UploadFromQueueToCloud(ctx context.Context, file string) {
fmt.Printf("Upload file %vn", file)
}
func NewFileEntryCheck(ctx context.Context, fileUpload chan string, allFileCheck chan bool) {
fmt.Println("Check file")
for i := 0; i < 5; i++ {
fileUpload <- "name " + strconv.Itoa(i)
}
allFileCheck <- true
}

最新更新