我有两个go routines
。
- NewFileEntryCheck((
- UploadFromQueueToCloud((
go NewFileEntryCheck()
此函数将以fsnotify.Watcher()
作为参数,并在无限循环中监视指定目录中的任何文件更改。
如果发生任何新的file CREATE
事件,它会将文件名添加到Queue
go UploadFromQueueToCloud()
这个go例程应该从Queue
中获取文件名,从路径中读取文件并将其上传到云。
现在,我的问题是,如何使UploadFromQueueToCloud()
函数从队列中获取值并将其上传到云?
由于NewFileEntryCheck()
函数将持续检查新文件并将其添加到队列中,因此队列的长度不是固定的。我无法通过len(Queue)
循环并通过通道接收数据。
有没有其他方法可以实现这一点?
这只是pub子模型的一个常见示例。您有一个发布者(NewFileEntryCheck)
和一个订阅者(UploadFromQueueToCloud)
。因为这可以以Pub-Sub模型的形式进行建模,所以您应该进行类似的设计。
你可能已经收到@jimb的一条评论,询问Queue
是否是一个频道。这是有原因的,因为golang通道的行为就像一个队列。因此,一个解决方案肯定是使用通道对其进行建模。
使用通道,您可以将一次性例程中的任务/文件名放入。从另一个go例程UploadFromQueueToCloud
,您可以使用该通道并异步处理数据。
这是的样本代码
func UploadFromQueueToCloud(channel *chan string, exit *chan bool) {
for {
select {
case file := <- channel:
// Upload the cloud..
// Other business logics
case exit := <- exit:
break;
}
}
}
func NewFileEntryCheck(notifier *fsnotify.Watcher) {
channel := make(chan string)
exit := make(chan bool)
for {
if (some breaking condition) {
exit <- true;
break;
}
// Check for the file notifier
// If file CREATE happened,
// get the file name into FileName
channel <- FileName;
go UploadFromQueueToCloud(channel, exit)
}
}
注意:您可以通过将通道更改为缓冲通道来提高吞吐量
希望能有所帮助。
我的案例是:
func main() {
allFileCheck := make(chan bool)
fileUpload := make(chan string)
ctx, cancel := context.WithCancel(context.Background())
go NewFileEntryCheck(ctx, fileUpload, allFileCheck)
L:
for {
select {
case file := <- fileUpload:
go UploadFromQueueToCloud(ctx, file)
case <-allFileCheck:
fmt.Println("done")
break L
case <-time.After(5 * time.Second):
cancel()
}
}
}
func UploadFromQueueToCloud(ctx context.Context, file string) {
fmt.Printf("Upload file %vn", file)
}
func NewFileEntryCheck(ctx context.Context, fileUpload chan string, allFileCheck chan bool) {
fmt.Println("Check file")
for i := 0; i < 5; i++ {
fileUpload <- "name " + strconv.Itoa(i)
}
allFileCheck <- true
}