Go channels with Kafka consumer

I'm new to Go and just starting to learn about channels. I'm using the Confluent Kafka consumer to build a function-based consumer. What I want to accomplish is to send the messages into a buffered channel (2,000)... and then write the messages in the channel to Redis using a pipeline. I got it working by doing a println of the messages one by one until it reached the end of the offsets, but when I try to add the channel, it seems to hit the default: case in the switch and then just freeze.

It also looks like I'm not filling the channel correctly? The fmt.Println("count is: ", len(redisChnl)) always prints 0.

This is what I have so far:

// Example function-based high-level Apache Kafka consumer
package main

import (
    "encoding/binary"
    "encoding/json"
    "fmt"
    "os"
    "os/signal"
    "regexp"
    "syscall"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "github.com/go-redis/redis"
)

var client *redis.Client

func init() {
    client = redis.NewClient(&redis.Options{
        Addr:         ":6379",
        DialTimeout:  10 * time.Second,
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        PoolSize:     10,
        PoolTimeout:  30 * time.Second,
    })
    client.FlushDB()
}

type MessageFormat struct {
    MetricValueNumber float64   `json:"metric_value_number"`
    Path              string    `json:"path"`
    Cluster           string    `json:"cluster"`
    Timestamp         time.Time `json:"@timestamp"`
    Version           string    `json:"@version"`
    Host              string    `json:"host"`
    MetricPath        string    `json:"metric_path"`
    Type              string    `json:"string"`
    Region            string    `json:"region"`
}

//func redis_pipeline(ky string, vl string) {
//  pipe := client.Pipeline()
//
//  exec := pipe.Set(ky, vl, time.Hour)
//
//  incr := pipe.Incr("pipeline_counter")
//  pipe.Expire("pipeline_counter", time.Hour)
//
//  // Execute
//  //
//  //     INCR pipeline_counter
//  //     EXPIRE pipeline_counts 3600
//  //
//  // using one client-server roundtrip.
//  _, err := pipe.Exec()
//  fmt.Println(incr.Val(), err)
//  // Output: 1 <nil>
//}

func main() {

    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":        "kafka.com:9093",
        "group.id":                 "testehb",
        "security.protocol":        "ssl",
        "ssl.key.location":         "/Users/key.key",
        "ssl.certificate.location": "/Users/cert.cert",
        "ssl.ca.location":          "/Users/ca.pem",
    })
    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }
    fmt.Printf("Created Consumer %v\n", c)

    err = c.SubscribeTopics([]string{"jmx"}, nil)

    redisMap := make(map[string]string)
    redisChnl := make(chan []byte, 2000)

    run := true
    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := c.Poll(100)
            if ev == nil {
                continue
            }
            switch e := ev.(type) {
            case *kafka.Message:
                //fmt.Printf("%% Message on %s:\n%s\n",
                //  e.TopicPartition, string(e.Value))
                if e.Headers != nil {
                    fmt.Printf("%% Headers: %v\n", e.Headers)
                }
                str := e.Value
                res := MessageFormat{}
                json.Unmarshal([]byte(str), &res)

                fmt.Println("size", binary.Size([]byte(str)))

                host := regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)
                redisMap[host] = string(str)
                fmt.Println("count is: ", len(redisChnl)) //this always prints "count is:  0"
                redisChnl <- e.Value //I think this is the right way to put the messages in the channel?
            case kafka.PartitionEOF:
                fmt.Printf("%% Reached %v\n", e)
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                run = false
            default:
                fmt.Printf("Ignored %v\n", e)
            }
            <-redisChnl // I thought I could just empty the channel like this once the buffer is full?
        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()
}

------- EDIT -------

OK, I think I got it working by moving the <- redisChnl inside the default:, but now I see that count before read and count after read inside the default: always print 2,000... I would have thought that first count before read = 2,000 and then count after read = 0, since the channel would be empty??

select {
case sig := <-sigchan:
    fmt.Printf("Caught signal %v: terminating\n", sig)
    run = false
default:
    ev := c.Poll(100)
    if ev == nil {
        continue
    }
    switch e := ev.(type) {
    case *kafka.Message:
        //fmt.Printf("%% Message on %s:\n%s\n",
        //  e.TopicPartition, string(e.Value))
        if e.Headers != nil {
            fmt.Printf("%% Headers: %v\n", e.Headers)
        }
        str := e.Value
        res := MessageFormat{}
        json.Unmarshal([]byte(str), &res)

        //fmt.Println("size", binary.Size([]byte(str)))

        host := regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)
        redisMap[host] = string(str)
        go func() {
            redisChnl <- e.Value
        }()

    case kafka.PartitionEOF:
        fmt.Printf("%% Reached %v\n", e)
    case kafka.Error:
        fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
        run = false
    default:
        fmt.Println("count before read: ", len(redisChnl))
        fmt.Printf("Ignored %v\n", e)
        <-redisChnl
        fmt.Println("count after read: ", len(redisChnl)) //would've expected this to be 0
    }
}

I think the bigger change that would simplify this code is to split the pipeline into multiple goroutines.

The advantage of channels is that multiple goroutines can write to and read from them at the same time. In this example, that could mean having one goroutine enqueue onto the channel and another pull from the channel and put the contents into Redis.

Something like this:

c := make(chan Message, bufferLen)
go pollKafka(c)
go pushToRedis(c)
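
As a rough sketch of what pushToRedis could look like (pollKafka and pushToRedis are just placeholder names from the snippet above, not library functions; this version assumes the channel carries raw []byte messages like the question's redisChnl, and reuses the MessageFormat struct, imports, and go-redis client from the question's file):

// Sketch of the consuming side of the pipeline: pull messages off the
// channel and write each one to Redis, keyed the same way the question
// does (first dotted segment of metric_path).
func pushToRedis(client *redis.Client, in <-chan []byte) {
    keyRe := regexp.MustCompile(`^([^.]+)`)
    for msg := range in { // blocks while the channel is empty; ends when it is closed
        res := MessageFormat{}
        if err := json.Unmarshal(msg, &res); err != nil {
            continue // skip messages that fail to parse
        }
        host := keyRe.FindString(res.MetricPath)
        client.Set(host, string(msg), time.Hour)
    }
}

Because range blocks on an empty channel instead of spinning, the main loop no longer needs the bare <- redisChnl drain that was freezing when the channel was empty.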

If you want to add batching, you can add a middle stage that polls from the kafka channel and appends to a slice until the slice is full, then enqueues that slice (or its contents) for redis.
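
A minimal sketch of that batching stage, under the same assumptions (batchToRedis and batchSize are illustrative names; pipe.Exec sends all queued commands in one client-server roundtrip, which is what the commented-out redis_pipeline in the question was doing for a single key):

// Sketch of a batching stage: collect messages into a slice, then flush
// the whole batch to Redis in one pipeline roundtrip.
func batchToRedis(client *redis.Client, in <-chan []byte, batchSize int) {
    keyRe := regexp.MustCompile(`^([^.]+)`)
    batch := make([][]byte, 0, batchSize)

    flush := func() {
        if len(batch) == 0 {
            return
        }
        pipe := client.Pipeline()
        for _, msg := range batch {
            res := MessageFormat{}
            if err := json.Unmarshal(msg, &res); err != nil {
                continue
            }
            pipe.Set(keyRe.FindString(res.MetricPath), string(msg), time.Hour)
        }
        pipe.Exec() // whole batch in one roundtrip
        batch = batch[:0]
    }

    for msg := range in {
        batch = append(batch, msg)
        if len(batch) == batchSize {
            flush()
        }
    }
    flush() // write any leftover messages once the channel is closed
}

The extra flush after the loop catches a partial batch when the channel is closed, so no messages are dropped on shutdown.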

If concurrency like this is not the goal, it may be easier to replace the channel in your code with a slice. If only one goroutine ever acts on an object, trying to use a channel is not a good idea.
