添加回调,而不是使用默认实现



我正在使用以下按预期工作的代码。

用户将一个新条目添加到配置testers(现在是硬编码的,但它将来自配置文件)一个新条目,该条目返回他需要检查并通过 http 调用并行运行它们的TAP列表。

我需要支持另一个用例,用户还将提供一个function/method/callback该函数将通过 http/curl/websocket/任何他需要的东西实现调用(而不是 check() 函数) 该函数将返回响应,无论它是 200/400/500。

例如,假设用户实现两个函数/回调,此外还有配置水龙头列表,程序将执行与testers列表相同的函数,这些函数将调用其他站点,例如:"http://www.yahoo.com"https://www.bing.com使用 curl 或 http(只是为了演示差异)甚至一些实现的方法检查以返回一些子进程执行结果。

我怎样才能以干净的方式做到这一点?

package main
import (
"fmt"
"net/http"
"time"
)
type HT interface {
Name() string
Check() (*testerResponse, error)
}
type testerResponse struct {
err  error
name string
res  http.Response
url  string
}
type Tap struct {
url     string
name    string
timeout time.Duration
client  *http.Client
}
func NewTap(name, url string, timeout time.Duration) *Tap {
return &Tap{
url:    url,
name:   name,
client: &http.Client{Timeout: timeout},
}
}
func (p *Tap) Check() testerResponse {
fmt.Printf("Fetching %s %s n", p.name, p.url)
// theres really no need for NewTap
nt := NewTap(p.name, p.url, p.timeout)
res, err := nt.client.Get(p.url)
if err != nil {
return testerResponse{err: err}
}
// need to close body
res.Body.Close()
return testerResponse{name: p.name, res: *res, url: p.url}
}
func (p *Tap) Name() string {
return p.name
}
// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- Tap, taps []Tap) {
for _, t := range taps {
jobs <- t
}
}
// getResults takes a job from our jobs channel, gets the result, and
// places it on the results channel
func getResults(tr <-chan testerResponse, taps []Tap) {
for range taps {
r := <-tr
status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'n", r.name, r.url, r.res.StatusCode)
if r.err != nil {
status = fmt.Sprintf(r.err.Error())
}
fmt.Printf(status)
}
}
// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up  the "next" job
func worker(jobs <-chan Tap, results chan<- testerResponse) {
for n := range jobs {
results <- n.Check()
}
}
var (
testers = []Tap{
{
name:    "1",
url:     "http://google.com",
timeout: time.Second * 20,
},
{
name:    "3",
url:     "http://stackoverflow.com",
timeout: time.Second * 20,
},
}
)
func main() {
// Make buffered channels
buffer := len(testers)
jobsPipe := make(chan Tap, buffer)               // Jobs will be of type `Tap`
resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`
// Create worker pool
// Max workers default is 5
maxWorkers := 5
for i := 0; i < maxWorkers; i++ {
go worker(jobsPipe, resultsPipe)
}
makeJobs(jobsPipe, testers)
getResults(resultsPipe, testers)
}

我尝试过以下更新https://play.golang.org/p/cRPPzke27dZ

但不确定如何调用custom handlerscheck()方法以并行调用(例如testers配置)从它们获取数据

更新 5(接受答案)


*既然你对这个问题感兴趣,你可能也对这个问题感兴趣。有关如何使用自动取消超时运行每个作业的更多信息,请参阅此处。*


要回答您的问题,您将如何添加随机函数。

我不知道你想返回什么类型,但你可以做任何你想做的事情。

大约有一百万种不同的方法可以做到这一点,这只是一个例子:

package main
import (
"encoding/json"
"fmt"
"github.com/gammazero/workerpool"
)
var (
numWorkers = 10
)
type MyReturnType struct {
Name string
Data interface{}
}
func wrapJob(rc chan MyReturnType, f func() MyReturnType) func() {
return func() {
rc <- f()
}
}
func main() {
// create results chan and worker pool
// should prob make your results channel typed to what you need
jobs := []func() MyReturnType {
func() MyReturnType {
// whatever you want to do here
return MyReturnType{Name: "job1", Data: map[string]string{"Whatever": "You want"}}
},
func() MyReturnType {
// whatever you want to do here
// do a curl or a kubectl or whatever you want
resultFromCurl := "i am a result"
return MyReturnType{Name: "job2", Data: resultFromCurl}
},
}
results := make(chan MyReturnType, len(jobs))
pool := workerpool.New(numWorkers)
for _, job := range jobs {
j := job
pool.Submit(wrapJob(results, j))
}
// Wait for all jobs to finish
pool.StopWait()
// Close results chan
close(results)
// Iterate over results, printing to console
for res := range results {
prettyPrint(res)
}
}
func prettyPrint(i interface{}) {
prettyJSON, err := json.MarshalIndent(i, "", "    ")
if err != nil {
fmt.Printf("Error : %s n", err.Error())
}
fmt.Printf("MyReturnType %sn", string(prettyJSON))
}

其中返回:

// MyReturnType {
//     "Name": "job2",
//     "Data": "i am a result"
// }
// MyReturnType {
//     "Name": "job1",
//     "Data": {
//         "Whatever": "You want"
//     }
// }

更新 4

在深入研究了几个小时之后,我建议使用类似的东西workerpool,你可以在这里找到。老实说,使用workerpool在这里似乎最有意义。它看起来已准备好生产,并且被少数相当大的名称使用(请参阅其存储库中的自述文件)。

我写了一个小例子来展示如何使用workerpool

package main
import (
"fmt"
"net/http"
"time"
"github.com/gammazero/workerpool"
)
var (
numWorkers = 10
urls       = []string{"yahoo.com", "example.com", "google.com"}
)
func main() {
// create results chan and worker pool
// should prob make your results channel typed to what you need
results := make(chan interface{}, len(urls))
pool := workerpool.New(numWorkers)
// Create jobs by iterating over urls
for i, u := range urls {
url := u
jobNum := i
// Create job
f := func() {
start := time.Now()
c := &http.Client{}
r, e := c.Get("http://" + url)
if e != nil {
fmt.Println(e.Error())
}
took := time.Since(start).Milliseconds()
o := fmt.Sprintf("completed job '%d' to '%s' in '%dms' with status code '%d'n", jobNum, url, took, r.StatusCode)
results <- o
}
// Add job to workerpool
pool.Submit(f)
}
// Wait for all jobs to finish
pool.StopWait()
// Close results chan
close(results)
// Iterate over results, printing to console
for res := range results {
fmt.Printf(res.(string))
}
}

哪些输出:

// completed job '1' to 'example.com' in '81ms' with status code '200'
// completed job '2' to 'google.com' in '249ms' with status code '200'
// completed job '0' to 'yahoo.com' in '816ms' with status code '200'

更新 3

我继续编写了一个工作线程池库(在workerpool的帮助下),因为我也想更深入地研究通道和并发设计。

您可以在此处找到存储库,并在下面找到代码。

如何使用:

pool := New(3)
pool.Job(func() {
c := &http.Client{}
r, e := c.Get("http://google.com")
if e != nil {
panic(e.Error())
}
fmt.Printf("To google.com %dn", r.StatusCode)
})
pool.Job(func() {
c := &http.Client{}
r, e := c.Get("http://yahoo.com")
if e != nil {
panic(e.Error())
}
fmt.Printf("To yahoo.com %dn", r.StatusCode)
})
pool.Job(func() {
c := &http.Client{}
r, e := c.Get("http://example.com")
if e != nil {
panic(e.Error())
}
fmt.Printf("To example.com %dn", r.StatusCode)
})
pool.Seal()

工作线程池(水坑)的代码

package puddle
import (
"container/list"
"fmt"
"net/http"
"sync"
"time"
)
const (
idleTimeout = time.Second * 2
)
// New creates a new puddle (aka worker pool)
func New(maxWorkers int) Puddle {
// There must be at least one worker
if maxWorkers < 1 {
maxWorkers = 1
}
p := &puddle{
maxWorkers: maxWorkers,
jobs:       make(chan func(), 1),
workers:    make(chan func()),
killswitch: make(chan struct{}),
}
// Start accepting/working jobs as they come in
go p.serve()
return p
}
// Puddle knows how to interact with worker pools
type Puddle interface {
Job(f func())
Seal()
}
// puddle is a worker pool that holds workers, tasks, and misc metadata
type puddle struct {
maxWorkers int
jobs       chan func()
workers    chan func()
killswitch chan struct{}
queue      List
once       sync.Once
stopped    int32
waiting    int32
wait       bool
}
// Job submits a new task to the worker pool
func (p *puddle) Job(f func()) {
if f != nil {
p.jobs <- f
}
}
// Seal stops worker pool and waits for queued tasks to complete
func (p *puddle) Seal() {
p.stop(true)
}
func (p *puddle) stop(wait bool) {
p.once.Do(func() {
p.wait = wait
// Close task queue and wait for currently running tasks to finish
close(p.jobs)
})
<-p.killswitch
}
func (p *puddle) killWorkerIfIdle() bool {
select {
case p.workers <- nil:
// Kill worker
return true
default:
// No ready workers
return false
}
}
// process puts new jobs onto the queue, and removes jobs from the queue as workers become available.
// Returns false if puddle is stopped.
func (p *puddle) process() bool {
select {
case task, ok := <-p.jobs:
if !ok {
return false
}
p.queue.PushBack(task)
case p.workers <- p.queue.Front().Value.(func()):
// Give task to ready worker
p.queue.PopFront()
}
return true
}
func (p *puddle) serve() {
defer close(p.killswitch)
timeout := time.NewTimer(idleTimeout)
var workerCount int
var idle bool
Serving:
for {
if p.queue.Len() != 0 {
if !p.process() {
break Serving
}
continue
}
select {
case job, ok := <-p.jobs:
if !ok {
break Serving
}
// Give a task to our workers
select {
case p.workers <- job:
default:
// If we are not maxed on workers, create a new one
if workerCount < p.maxWorkers {
go startJob(job, p.workers)
workerCount++
} else {
// Place a task on the back of the queue
p.queue.PushBack(job)
}
}
idle = false
case <-timeout.C:
// Timed out waiting for work to arrive.  Kill a ready worker if
// pool has been idle for a whole timeout.
if idle && workerCount > 0 {
if p.killWorkerIfIdle() {
workerCount--
}
}
idle = true
timeout.Reset(idleTimeout)
}
}
// Allow queued jobs to complete
if p.wait {
p.work()
}
// Stop all workers before shutting down
for workerCount > 0 {
p.workers <- nil
workerCount--
}
timeout.Stop()
}
// work removes each task from the waiting queue and gives it to
// workers until queue is empty.
func (p *puddle) work() {
for p.queue.Len() != 0 {
// A worker is ready, so give task to worker.
p.workers <- p.queue.PopFront()
}
}
// startJob runs initial task, then starts a worker waiting for more.
func startJob(job func(), workerQueue chan func()) {
job()
go worker(workerQueue)
}
// worker executes tasks and stops when it receives a nil task.
func worker(queue chan func()) {
for job := range queue {
if job == nil {
return
}
job()
}
}
// List wraps `container/list`
type List struct {
list.List
}
// PopFront removes then returns first element in list as func()
func (l *List) PopFront() func() {
f := l.Front()
l.Remove(f)
return f.Value.(func())
}

更新 2

既然您询问如何使用代码,这就是您将如何这样做。

我把worker变成了它自己的包,并写了另一个回购来展示如何使用那个包。

  • 辅助角色包
  • 如何使用工作线程包

worker

package worker
import "fmt"
type JobResponse struct {
err  error
name string
res  int
url  string
}
type Job interface {
Name() string
Callback() JobResponse
}
func Do(jobs []Job, maxWorkers int) {
jobsPool := make(chan Job, len(jobs))
resultsPool := make(chan JobResponse, len(jobs))
for i := 0; i < maxWorkers; i++ {
go worker(jobsPool, resultsPool)
}
makeJobs(jobsPool, jobs)
getResults(resultsPool, jobs)
}
func worker(jobs <-chan Job, response chan<- JobResponse) {
for n := range jobs {
response <- n.Callback()
}
}
func makeJobs(jobs chan<- Job, queue []Job) {
for _, t := range queue {
jobs <- t
}
}
func getResults(response <-chan JobResponse, queue []Job) {
for range queue {
job := <-response
status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'n", job.name, job.url, job.res)
if job.err != nil {
status = fmt.Sprintf(job.err.Error())
}
fmt.Printf(status)
}
}

如何使用工作线程包

package main
import (
"github.com/oze4/worker"
)
func main() {
jobs := []worker.Job{
AddedByUser{name: "1"},
AddedByUser{name: "2"},
AddedByUser{name: "3"},
AddedByUser{name: "4"},
AddedByUser{name: "5"},
AddedByUser{name: "6"},
}

worker.Do(jobs, 5)
}
type AddedByUser struct {
name string
}
func (a AddedByUser) Name() string {
return a.name
}
func (a AddedByUser) Callback() worker.JobResponse {
// User added func/callback goes here
return worker.JobResponse{}
}

更新

我重命名了一些东西,希望能帮助它更清楚一点。

这是您需要的基础知识:

package main
import (
"fmt"
)
func main() {
fmt.Println("Hello, playground")
}
type JobResponse struct {
err  error
name string
res  int
url  string
}
type Job interface {
Name() string
Callback() JobResponse
}
func worker(jobs <-chan Job, response chan<- JobResponse) {
for n := range jobs {
response <- n.Callback()
}
}
func makeJobs(jobs chan<- Job, queue []Job) {
for _, t := range queue {
jobs <- t
}
}
func getResults(response <-chan JobResponse, queue []Job) {
for range queue {
j := <-response
status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'n", j.name, j.url, j.res)
if j.err != nil {
status = fmt.Sprintf(j.err.Error())
}
fmt.Printf(status)
}
}

只要我满足Job接口,我就可以将其传递给worker,makeJobs和getResults:

type AddedByUser struct {
name string
}
func (a AddedByUser) Name() string {
return a.name
}
func (a AddedByUser) Callback() JobResponse {
// User added func/callback goes here
return JobResponse{}
}

这样:

package main
import (
"fmt"
)
func main() {
jobsPool := make(chan Job, len(testers))
resultsPool := make(chan JobResponse, len(testers))
maxWorkers := 5
for i := 0; i < maxWorkers; i++ {
go worker(jobsPool, resultsPool)
}
makeJobs(jobsPool, testers)
getResults(resultsPool, testers)
}
var testers = []Job{
AddedByUser{name: "abu"}, // Using different types in Job
Tap{name: "tap"},         // Using different types in Job
}
type AddedByUser struct {
name string
}
func (a AddedByUser) Name() string {
return a.name
}
func (a AddedByUser) Callback() JobResponse {
// User added func/callback goes here
return JobResponse{}
}
type Tap struct {
name string
}
func (t Tap) Name() string {
return t.name
}
func (t Tap) Callback() JobResponse {
// User added func/callback goes here
return JobResponse{}
}
type JobResponse struct {
err  error
name string
res  int
url  string
}
type Job interface {
Name() string
Callback() JobResponse
}
func worker(jobs <-chan Job, response chan<- JobResponse) {
for n := range jobs {
response <- n.Callback()
}
}
func makeJobs(jobs chan<- Job, queue []Job) {
for _, t := range queue {
jobs <- t
}
}
func getResults(response <-chan JobResponse, queue []Job) {
for range queue {
job := <-response
status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'n", job.name, job.url, job.res)
if job.err != nil {
status = fmt.Sprintf(job.err.Error())
}
fmt.Printf(status)
}
}

原始答案

[添加此答案是因为OP和我一直在这个线程之外谈论]

你的代码中有几个错误,但归根结底,你所要做的就是接受人们给你的建议。你只需要连接这些点。我建议对代码进行故障排除并尝试完全了解问题所在。老实说,这是唯一的学习方式。

我记得的最大问题是:

  • 需要修改您的HT接口,以便Check(...)签名与每种方法匹配
    • 否则,这些结构(TapTap1Tap2)不满足HT接口,因此不实现HT
  • worker(...)makeJobs(...)getResults(...)中的参数类型从[]Tap更改为[]HT
  • 您没有将所有水龙头聚合到一个切片中
    • 我们可以将所有不同的 Taps 用作 HT 的唯一原因是因为它们都实现了 HT

像这样的东西是你要找的吗?

https://play.golang.org/p/zLmKOKAnX4C

package main
import (
"fmt"
"net/http"
// "os/exec"
"time"
)
type HT interface {
Name() string
Check() testerResponse
}
type testerResponse struct {
err  error
name string
//res  http.Response
res int
url string
}
type Tap struct {
url     string
name    string
timeout time.Duration
client  *http.Client
}
func (p *Tap) Check() testerResponse {
fmt.Printf("[job][Tap1] Fetching %s %s n", p.name, p.url)
p.client = &http.Client{Timeout: p.timeout}
res, err := p.client.Get(p.url)
if err != nil {
return testerResponse{err: err}
}
// need to close body
res.Body.Close()
return testerResponse{name: p.name, res: res.StatusCode, url: p.url}
}
func (p *Tap) Name() string {
return p.name
}
// ---- CUSTOM CHECKS-------------
// ---- 1. NEW specific function -------------
type Tap2 struct {
url     string
name    string
timeout time.Duration
client  *http.Client
}
func (p *Tap2) Check() testerResponse {
// Do some request here.....
fmt.Printf("[job][Tap2] Fetching %s %s n", p.name, p.url)
return testerResponse{res: 200, url: p.url, name: p.name}
}
func (p *Tap2) Name() string {
return "yahoo custom check"
}
// ---- 2. NEW specific function which is not running http
type Tap3 struct {
url     string
name    string
timeout time.Duration
client  *http.Client
}
func (p *Tap3) Check() testerResponse {
// Do some request here....
fmt.Printf("[job][Tap3] Fetching %s %s n", p.name, p.url)
return testerResponse{res: 200, url: p.url, name: p.name}
}
func (p *Tap3) Name() string {
return "custom check2"
}
// makeJobs fills up our jobs channel
func makeJobs(jch chan<- HT, jobs []HT) {
for _, t := range jobs {
jch <- t
}
}
// getResults takes a job from our jobs channel, gets the result, and
// places it on the results channel
func getResults(tr <-chan testerResponse, jobs []HT) []testerResponse {
var rts []testerResponse
var r testerResponse
for range jobs {
r = <-tr
status := fmt.Sprintf("[result] '%s' to '%s' was fetched with status '%d'n", r.name, r.url, r.res)
if r.err != nil {
status = fmt.Sprintf(r.err.Error())
}
fmt.Printf(status)
rts = append(rts, r)
}
return rts
}
// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up  the "next" job
func worker(jobs <-chan HT, results chan<- testerResponse) {
for n := range jobs {
results <- n.Check()
}
}
var (
testers1 = []Tap{
{
name:    "First Tap1",
url:     "http://google.com",
timeout: time.Second * 20,
},
{
name:    "Second Tap1",
url:     "http://stackoverflow.com",
timeout: time.Second * 20,
},
}
testers2 = []Tap2{
{
name: "First Tap2",
url:  "http://1.tap2.com",
},
{
name: "Second Tap2",
url:  "http://2.tap2.com",
},
}
testers3 = []Tap3{
{
name: "First Tap3",
url:  "http://1.tap3.com",
},
{
name: "Second Tap3",
url:  "http://2.tap3.com",
},
}
)
func main() {
// Aggregate all testers into one slice
var testers []HT
for _, t1 := range testers1 {
testers = append(testers, &t1)
}
for _, t2 := range testers2 {
testers = append(testers, &t2)
}
for _, t3 := range testers3 {
testers = append(testers, &t3)
}
// Make buffered channels
buffer := len(testers)
jobsPipe := make(chan HT, buffer)                // Jobs will be of type `HT`
resultsPipe := make(chan testerResponse, buffer) // Results will be of type `testerResponse`
// Create worker pool
// Max workers default is 5
maxWorkers := 5
for i := 0; i < maxWorkers; i++ {
go worker(jobsPipe, resultsPipe)
}
makeJobs(jobsPipe, testers)
getResults(resultsPipe, testers)
//fmt.Println("at the end",tr)
}

哪些输出:

// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com 
// [job][Tap2] Fetching Second Tap2 http://2.tap2.com 
// [job][Tap3] Fetching Second Tap3 http://2.tap3.com 
// [job][Tap3] Fetching Second Tap3 http://2.tap3.com 
// [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'
// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'
// [result] 'Second Tap3' to 'http://2.tap3.com' was fetched with status '200'
// [job][Tap2] Fetching Second Tap2 http://2.tap2.com 
// [job][Tap1] Fetching Second Tap1 http://stackoverflow.com 
// [result] 'Second Tap2' to 'http://2.tap2.com' was fetched with status '200'
// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'
// [result] 'Second Tap1' to 'http://stackoverflow.com' was fetched with status '200'

据我了解,您希望您的员工接受其他测试人员

查看代码后,您似乎将所有部分都放在正确的位置,并且需要在此处进行一些小的更改

// makeJobs fills up our jobs channel
func makeJobs(jobs chan<- HT, taps []Tap) {
for _, t := range taps {
jobs <- t
}
}
// getResults takes a job from our jobs channel, gets the result, and
// places it on the results channel
func getResults(tr <-chan HT, taps []Tap) {
for range taps {
r := <-tr
status := fmt.Sprintf("'%s' to '%s' was fetched with status '%d'n", r.name, r.url, r.res.StatusCode)
if r.err != nil {
status = fmt.Sprintf(r.err.Error())
}
fmt.Printf(status)
}
}
// worker defines our worker func. as long as there is a job in the
// "queue" we continue to pick up  the "next" job
func worker(jobs <-chan HT, results chan<- testerResponse) {
for n := range jobs {
results <- n.Check()
}
}

现在,如果您看到您的作业队列可以接受实现 HT 接口的任何类型,所以如果您想要一个新的作业,请说 Tap2,您只需

type Tap2 struct{...}
func (p *Tap2) Check() testerResponse {...}
func (p *Tap) Name() string {...}

现在,您可以将 Tap 和 Tap2 推送到与作业相同的作业队列队列 队列接受实现 HT 的任何类型

最新更新