连续执行多个独立作业



我有一组相互独立的作业。因此,这些作业中的每一个都可以使用goroutines并发运行。请注意,一旦单个作业完成,它应该等待几秒钟,然后再次启动(适用于所有作业(,这将在一个循环中进行,直到Go API服务停止。还要注意,所有这些作业都执行相同的goroutine(进行REST调用(。在Go中实现这一点的最佳模式是什么。请注意,在我的服务关闭之前,我还想等待当前正在执行的作业完成。

如果我说得对,你正在寻找这样的东西这是一个带有使用者池的服务,用于同时执行作业。当一项作业完成后,它将在一段时间后再次重复,直到您停止服务。

type job struct {
id     int
result chan error
}
func newJob(id int) job {
return job{
id:     id,
result: make(chan error, 1),
}
}
type service struct {
pending chan job
consumerLimit  int
repeatInterval time.Duration
isClosed chan struct{}
shutdown chan chan error
}
func newService(repeatInterval time.Duration, consumerLimit int, pendingChannelSize int) *service {
s := &service{
pending:        make(chan job, pendingChannelSize),
consumerLimit:  consumerLimit,
repeatInterval: repeatInterval,
isClosed:       make(chan struct{}, consumerLimit),
shutdown:       make(chan chan error),
}
for i := 0; i < s.consumerLimit; i++ {
go s.consumer()
}
return s
}
func (s *service) do(ctx context.Context, job job) error {
select {
case <-ctx.Done():
return ctx.Err()
case s.pending <- job:
return <-job.result
case <-s.isClosed:
return errors.New("service has been shut down")
}
}
func (s *service) consumer() {
for {
select {
case j := <-s.pending:
//Simulate working process
time.Sleep(time.Duration(rand.Intn(200)) + 200)
j.result <- nil
fmt.Println(fmt.Sprintf("job %v is done", j.id))
go func() {
//Repeat after a time
time.Sleep(s.repeatInterval)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err := s.do(ctx, newJob(j.id)); err != nil {
fmt.Println(fmt.Errorf("failed to send job to repeat: %v", err))
}
}()
case result := <-s.shutdown:
result <- nil
return
}
}
}
func (s *service) close() error {
result := make(chan error, 1)
for i := 0; i < s.consumerLimit; i++ {
s.shutdown <- result
}
close(s.isClosed)
return <-result
}
func main() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
service := newService(time.Second, 5, 1000)
//Assign jobs
for i := 1; i < 10; i++ {
go func(i int) {
if err := service.do(context.Background(), newJob(i)); err != nil {
fmt.Println(fmt.Errorf("failed to send job: %v", err))
}
}(i)
}
select {
case <-interrupt:
switch err := service.close(); err {
case nil:
fmt.Println("service has been shutdown successfully")
default:
fmt.Println(fmt.Errorf("failed to graceful shut down service: %w", err))
}
return
}
}

如果我理解正确,您正在寻找这样的东西。

此代码将在循环中运行worker,worker作为一个组并行运行,直到您发送结束信号退出程序,但请等待当前循环完成后再退出。

func main() {
srv := server{
workers: 5,
}
srv.Run()
}
// inspired by: https://goinbigdata.com/golang-wait-for-all-goroutines-to-finish/#:~:text=A%20WaitGroup%20allows%20to%20wait,until%20all%20goroutines%20have%20finished.
func work(wg *sync.WaitGroup, i int) {
defer wg.Done()
rand.Seed(time.Now().UnixNano())
n := rand.Intn(10)
fmt.Printf("Worker %v: Startedn", i)
time.Sleep(time.Duration(n) * time.Second)
fmt.Printf("Worker %v: Finishedn", i)
}
type server struct {
running bool
workers int
}
func (srv *server) Run() {
done := make(chan bool, 1) // this channel
signalCh := make(chan os.Signal, 1) // this channel will get a signal on system call
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-signalCh
srv.running = false
done <- true
}()
srv.running = true
for srv.running {
var wg sync.WaitGroup
for i := 0; i < srv.workers; i++ {
wg.Add(1)
go work(&wg, i)
}
wg.Wait()
}
<-done
}

您想要实现工作池。以下是创建工作人员池的简单方法。您可以根据自己的需求定制员工方法和工作类型。

package main
import (
"fmt"
"sync"
)
type Jobs struct {
ID string
// or anything you want to add
}
func main() {
jobs := make(Jobs)
var wg sync.WaitGroup
numWorker := 16
for i := 0; i < numWorker; i++ {
wg.Add(1)
go func() {
worker(jobs)
wg.Done()
}()
}
tasks := []Jobs{}
// inset your task here
for _, i := range tasks {
jobs <- i
}
close(jobs)
wg.Wait()
}
func worker(jobs chan Jobs) {
for job := range jobs {
// do whatever you want to do
doSomething(job)
}
}
func doSomething(job Jobs) {
fmt.Println(job)
}

最新更新