

用户将一个新条目添加到配置testers(现在是硬编码的,但它将来自配置文件)一个新条目,该条目返回他需要检查并通过 http 调用并行运行它们的TAP列表。

我需要支持另一个用例,用户还将提供一个function/method/callback该函数将通过 http/curl/websocket/任何他需要的东西实现调用(而不是 check() 函数) 该函数将返回响应,无论它是 200/400/500。

例如,假设用户实现两个函数/回调,此外还有配置水龙头列表,程序将执行与testers列表相同的函数,这些函数将调用其他站点,例如:"http://www.yahoo.com"https://www.bing.com使用 curl 或 http(只是为了演示差异)甚至一些实现的方法检查以返回一些子进程执行结果。


package main
import (
type HT interface {
Name() string
Check() (*testerResponse, error)
type testerResponse struct {
err  error
name string
res  http.Response
url  string
type Tap struct {
url     string
name    string
timeout time.Duration
client  *http.Client
func NewTap(name, url string, timeout time.Duration) *Tap {
return &Tap{
url:    url,
name:   name,
client: &http.Client{Timeout: timeout},
func (p *Tap) Check() testerResponse {
fmt.Printf("Fetching %s %s n", p.name, p.url)
// theres really no need for NewTap
nt := NewTap(p.name, p.url, p.timeout)
res, err := nt.client.Get(p.url)
if err != nil {
return testerResponse{err: err}
// need to close body
return testerResponse{name: p.name, res: *res, url: p.url}
func (p *Tap) Name() string {
return p.name
// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
for _, t := range taps {
jobs <- t
// getResults takes a job from our jobs channel, gets the result, and
// places it on the results channel
func getResults(tr <-chan testerResponse, taps []Tap) {
for range taps {
r := <-tr
status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'n", r.name, r.url, r.res.StatusCode)
if r.err != nil {
status = fmt.Sprintf(r.err.Error())
// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up  the "next" job
func worker(jobs <-chan Tap, results chan<- testerResponse) {
for n := range jobs {
results <- n.Check()
var (
testers = []Tap{
name:    "1",
url:     "http://google.com",
timeout: time.Second * 20,
name:    "3",
url:     "http://stackoverflow.com",
timeout: time.Second * 20,
func main() {
// Make buffered channels
buffer := len(testers)
jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`
resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`
// Create worker pool
// Max workers default is 5
maxWorkers := 5
for i := 0; i < maxWorkers; i++ {
go worker(jobsPipe, resultsPipe)
makeJobs(jobsPipe, testers)
getResults(resultsPipe, testers)


但不确定如何调用custom handlerscheck()方法以并行调用(例如testers配置)从它们获取数据

更新 5(接受答案)





package main
import (
var (
numWorkers = 10
type MyReturnType struct {
Name string
Data interface{}
func wrapJob(rc chan MyReturnType, f func() MyReturnType) func() {
return func() {
rc <- f()
func main() {
// create results chan and worker pool
// should prob make your results channel typed to what you need
jobs := []func() MyReturnType {
func() MyReturnType {
// whatever you want to do here
return MyReturnType{Name: "job1", Data: map[string]string{"Whatever": "You want"}}
func() MyReturnType {
// whatever you want to do here
// do a curl or a kubectl or whatever you want
resultFromCurl := "i am a result"
return MyReturnType{Name: "job2", Data: resultFromCurl}
results := make(chan MyReturnType, len(jobs))
pool := workerpool.New(numWorkers)
for _, job := range jobs {
j := job
pool.Submit(wrapJob(results, j))
// Wait for all jobs to finish
// Close results chan
// Iterate over results, printing to console
for res := range results {
func prettyPrint(i interface{}) {
prettyJSON, err := json.MarshalIndent(i, "", "    ")
if err != nil {
fmt.Printf("Error : %s n", err.Error())
fmt.Printf("MyReturnType %sn", string(prettyJSON))


// MyReturnType {
//     "Name": "job2",
//     "Data": "i am a result"
// }
// MyReturnType {
//     "Name": "job1",
//     "Data": {
//         "Whatever": "You want"
//     }
// }

更新 4



package main
import (
var (
numWorkers = 10
urls       = []string{"yahoo.com", "example.com", "google.com"}
func main() {
// create results chan and worker pool
// should prob make your results channel typed to what you need
results := make(chan interface{}, len(urls))
pool := workerpool.New(numWorkers)
// Create jobs by iterating over urls
for i, u := range urls {
url := u
jobNum := i
// Create job
f := func() {
start := time.Now()
c := &http.Client{}
r, e := c.Get("http://" + url)
if e != nil {
took := time.Since(start).Milliseconds()
o := fmt.Sprintf("completed job '%d' to '%s' in '%dms' with status code '%d'n", jobNum, url, took, r.StatusCode)
results <- o
// Add job to workerpool
// Wait for all jobs to finish
// Close results chan
// Iterate over results, printing to console
for res := range results {


// completed job '1' to 'example.com' in '81ms' with status code '200'
// completed job '2' to 'google.com' in '249ms' with status code '200'
// completed job '0' to 'yahoo.com' in '816ms' with status code '200'

更新 3




pool := New(3)
pool.Job(func() {
c := &http.Client{}
r, e := c.Get("http://google.com")
if e != nil {
fmt.Printf("To google.com %dn", r.StatusCode)
pool.Job(func() {
c := &http.Client{}
r, e := c.Get("http://yahoo.com")
if e != nil {
fmt.Printf("To yahoo.com %dn", r.StatusCode)
pool.Job(func() {
c := &http.Client{}
r, e := c.Get("http://example.com")
if e != nil {
fmt.Printf("To example.com %dn", r.StatusCode)


package puddle
import (
const (
idleTimeout = time.Second * 2
// New creates a new puddle (aka worker pool)
func New(maxWorkers int) Puddle {
// There must be at least one worker
if maxWorkers < 1 {
maxWorkers = 1
p := &puddle{
maxWorkers: maxWorkers,
jobs:       make(chan func(), 1),
workers:    make(chan func()),
killswitch: make(chan struct{}),
// Start accepting/working jobs as they come in
go p.serve()
return p
// Puddle knows how to interact with worker pools
type Puddle interface {
Job(f func())
// puddle is a worker pool that holds workers, tasks, and misc metadata
type puddle struct {
maxWorkers int
jobs       chan func()
workers    chan func()
killswitch chan struct{}
queue      List
once       sync.Once
stopped    int32
waiting    int32
wait       bool
// Job submits a new task to the worker pool
func (p *puddle) Job(f func()) {
if f != nil {
p.jobs <- f
// Seal stops worker pool and waits for queued tasks to complete
func (p *puddle) Seal() {
func (p *puddle) stop(wait bool) {
p.once.Do(func() {
p.wait = wait
// Close task queue and wait for currently running tasks to finish
func (p *puddle) killWorkerIfIdle() bool {
select {
case p.workers <- nil:
// Kill worker
return true
// No ready workers
return false
// process puts new jobs onto the queue, and removes jobs from the queue as workers become available.
// Returns false if puddle is stopped.
func (p *puddle) process() bool {
select {
case task, ok := <-p.jobs:
if !ok {
return false
case p.workers <- p.queue.Front().Value.(func()):
// Give task to ready worker
return true
func (p *puddle) serve() {
defer close(p.killswitch)
timeout := time.NewTimer(idleTimeout)
var workerCount int
var idle bool
for {
if p.queue.Len() != 0 {
if !p.process() {
break Serving
select {
case job, ok := <-p.jobs:
if !ok {
break Serving
// Give a task to our workers
select {
case p.workers <- job:
// If we are not maxed on workers, create a new one
if workerCount < p.maxWorkers {
go startJob(job, p.workers)
} else {
// Place a task on the back of the queue
idle = false
case <-timeout.C:
// Timed out waiting for work to arrive.  Kill a ready worker if
// pool has been idle for a whole timeout.
if idle && workerCount > 0 {
if p.killWorkerIfIdle() {
idle = true
// Allow queued jobs to complete
if p.wait {
// Stop all workers before shutting down
for workerCount > 0 {
p.workers <- nil
// work removes each task from the waiting queue and gives it to
// workers until queue is empty.
func (p *puddle) work() {
for p.queue.Len() != 0 {
// A worker is ready, so give task to worker.
p.workers <- p.queue.PopFront()
// startJob runs initial task, then starts a worker waiting for more.
func startJob(job func(), workerQueue chan func()) {
go worker(workerQueue)
// worker executes tasks and stops when it receives a nil task.
func worker(queue chan func()) {
for job := range queue {
if job == nil {
// List wraps `container/list`
type List struct {
// PopFront removes then returns first element in list as func()
func (l *List) PopFront() func() {
f := l.Front()
return f.Value.(func())

更新 2



  • 辅助角色包
  • 如何使用工作线程包


package worker
import "fmt"
type JobResponse struct {
err  error
name string
res  int
url  string
type Job interface {
Name() string
Callback() JobResponse
func Do(jobs []Job, maxWorkers int) {
jobsPool := make(chan Job, len(jobs))
resultsPool := make(chan JobResponse, len(jobs))
for i := 0; i < maxWorkers; i++ {
go worker(jobsPool, resultsPool)
makeJobs(jobsPool, jobs)
getResults(resultsPool, jobs)
func worker(jobs <-chan Job, response chan<- JobResponse) {
for n := range jobs {
response <- n.Callback()
func makeJobs(jobs chan<- Job, queue []Job) {
for _, t := range queue {
jobs <- t
func getResults(response <-chan JobResponse, queue []Job) {
for range queue {
job := <-response
status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'n", job.name, job.url, job.res)
if job.err != nil {
status = fmt.Sprintf(job.err.Error())


package main
import (
func main() {
jobs := []worker.Job{
AddedByUser{name: "1"},
AddedByUser{name: "2"},
AddedByUser{name: "3"},
AddedByUser{name: "4"},
AddedByUser{name: "5"},
AddedByUser{name: "6"},

worker.Do(jobs, 5)
type AddedByUser struct {
name string
func (a AddedByUser) Name() string {
return a.name
func (a AddedByUser) Callback() worker.JobResponse {
// User added func/callback goes here
return worker.JobResponse{}




package main
import (
func main() {
fmt.Println("Hello, playground")
type JobResponse struct {
err  error
name string
res  int
url  string
type Job interface {
Name() string
Callback() JobResponse
func worker(jobs <-chan Job, response chan<- JobResponse) {
for n := range jobs {
response <- n.Callback()
func makeJobs(jobs chan<- Job, queue []Job) {
for _, t := range queue {
jobs <- t
func getResults(response <-chan JobResponse, queue []Job) {
for range queue {
j := <-response
status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'n", j.name, j.url, j.res)
if j.err != nil {
status = fmt.Sprintf(j.err.Error())


type AddedByUser struct {
name string
func (a AddedByUser) Name() string {
return a.name
func (a AddedByUser) Callback() JobResponse {
// User added func/callback goes here
return JobResponse{}


package main
import (
func main() {
jobsPool := make(chan Job, len(testers))
resultsPool := make(chan JobResponse, len(testers))
maxWorkers := 5
for i := 0; i < maxWorkers; i++ {
go worker(jobsPool, resultsPool)
makeJobs(jobsPool, testers)
getResults(resultsPool, testers)
var testers = []Job{
AddedByUser{name: "abu"}, // Using different types in Job
Tap{name: "tap"},         // Using different types in Job
type AddedByUser struct {
name string
func (a AddedByUser) Name() string {
return a.name
func (a AddedByUser) Callback() JobResponse {
// User added func/callback goes here
return JobResponse{}
type Tap struct {
name string
func (t Tap) Name() string {
return t.name
func (t Tap) Callback() JobResponse {
// User added func/callback goes here
return JobResponse{}
type JobResponse struct {
err  error
name string
res  int
url  string
type Job interface {
Name() string
Callback() JobResponse
func worker(jobs <-chan Job, response chan<- JobResponse) {
for n := range jobs {
response <- n.Callback()
func makeJobs(jobs chan<- Job, queue []Job) {
for _, t := range queue {
jobs <- t
func getResults(response <-chan JobResponse, queue []Job) {
for range queue {
job := <-response
status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'n", job.name, job.url, job.res)
if job.err != nil {
status = fmt.Sprintf(job.err.Error())





  • 需要修改您的HT接口,以便Check(...)签名与每种方法匹配
    • 否则,这些结构(TapTap1Tap2)不满足HT接口,因此不实现HT
  • worker(...)makeJobs(...)getResults(...)中的参数类型从[]Tap更改为[]HT
  • 您没有将所有水龙头聚合到一个切片中
    • 我们可以将所有不同的 Taps 用作 HT 的唯一原因是因为它们都实现了 HT



package main
import (
// "os/exec"
type HT interface {
Name() string
Check() testerResponse
type testerResponse struct {
err  error
name string
//res  http.Response
res int
url string
type Tap struct {
url     string
name    string
timeout time.Duration
client  *http.Client
func (p *Tap) Check() testerResponse {
fmt.Printf("[job][Tap1] Fetching %s %s n", p.name, p.url)
p.client = &http.Client{Timeout: p.timeout}
res, err := p.client.Get(p.url)
if err != nil {
return testerResponse{err: err}
// need to close body
return testerResponse{name: p.name, res: res.StatusCode, url: p.url}
func (p *Tap) Name() string {
return p.name
// ---- CUSTOM CHECKS-------------
// ---- 1. NEW specific function -------------
type Tap2 struct {
url     string
name    string
timeout time.Duration
client  *http.Client
func (p *Tap2) Check() testerResponse {
// Do some request here.....
fmt.Printf("[job][Tap2] Fetching %s %s n", p.name, p.url)
return testerResponse{res: 200, url: p.url, name: p.name}
func (p *Tap2) Name() string {
return "yahoo custom check"
// ---- 2. NEW specific function which is not running http
type Tap3 struct {
url     string
name    string
timeout time.Duration
client  *http.Client
func (p *Tap3) Check() testerResponse {
// Do some request here....
fmt.Printf("[job][Tap3] Fetching %s %s n", p.name, p.url)
return testerResponse{res: 200, url: p.url, name: p.name}
func (p *Tap3) Name() string {
return "custom check2"
// makeJobs fills up our jobs channel
func makeJobs(jch chan<- HT, jobs []HT) {
for _, t := range jobs {
jch <- t
// getResults takes a job from our jobs channel, gets the result, and
// places it on the results channel
func getResults(tr <-chan testerResponse, jobs []HT) []testerResponse {
var rts []testerResponse
var r testerResponse
for range jobs {
r = <-tr
status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'n", r.name, r.url, r.res)
if r.err != nil {
status = fmt.Sprintf(r.err.Error())
rts = append(rts, r)
return rts
// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up  the "next" job
func worker(jobs <-chan HT, results chan<- testerResponse) {
for n := range jobs {
results <- n.Check()
var (
testers1 = []Tap{
name:    "First Tap1",
url:     "http://google.com",
timeout: time.Second * 20,
name:    "Second Tap1",
url:     "http://stackoverflow.com",
timeout: time.Second * 20,
testers2 = []Tap2{
name: "First Tap2",
url:  "http://1.tap2.com",
name: "Second Tap2",
url:  "http://2.tap2.com",
testers3 = []Tap3{
name: "First Tap3",
url:  "http://1.tap3.com",
name: "Second Tap3",
url:  "http://2.tap3.com",
func main() {
// Aggregate all testers into one slice
var testers []HT
for _, t1 := range testers1 {
testers = append(testers, &t1)
for _, t2 := range testers2 {
testers = append(testers, &t2)
for _, t3 := range testers3 {
testers = append(testers, &t3)
// Make buffered channels
buffer := len(testers)
jobsPipe := make(chan HT, buffer)                // Jobs will be of type `HT`
resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`
// Create worker pool
// Max workers default is 5
maxWorkers := 5
for i := 0; i < maxWorkers; i++ {
go worker(jobsPipe, resultsPipe)
makeJobs(jobsPipe, testers)
getResults(resultsPipe, testers)
//fmt.Println("at the end",tr)


// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com 
// [job][Tap2] Fetching Second Tap2 http://2.tap2.com 
// [job][Tap3] Fetching Second Tap3 http://2.tap3.com 
// [job][Tap3] Fetching Second Tap3 http://2.tap3.com 
// [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'
// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'
// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'
// [job][Tap2] Fetching Second Tap2 http://2.tap2.com 
// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com 
// [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'
// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'
// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'



// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- HT, taps []Tap) {
for _, t := range taps {
jobs <- t
// getResults takes a job from our jobs channel, gets the result, and
// places it on the results channel
func getResults(tr <-chan HT, taps []Tap) {
for range taps {
r := <-tr
status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'n", r.name, r.url, r.res.StatusCode)
if r.err != nil {
status = fmt.Sprintf(r.err.Error())
// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up  the "next" job
func worker(jobs <-chan HT, results chan<- testerResponse) {
for n := range jobs {
results <- n.Check()

现在,如果您看到您的作业队列可以接受实现 HT 接口的任何类型,所以如果您想要一个新的作业,请说 Tap2,您只需

type Tap2 struct{...}
func (p *Tap2) Check() testerResponse {...}
func (p *Tap) Name() string {...}

现在,您可以将 Tap 和 Tap2 推送到与作业相同的作业队列队列 队列接受实现 HT 的任何类型
