segmentio/kafka-go reader客户端不订阅主题和分区



读取器客户端未开始使用消息。这是间歇性发生的,在大多数情况下,当主题中没有消息时发生。

<<p>卡夫卡版本/strong>
Apache Kafka 3.3.0
<<p>kafka-go版本/strong>
v0.4.38

复制行为的资源:

代码:

func main() {
topic_name := "dev-billing"
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL)
ctx, cancel := context.WithCancel(context.Background())
go func() {
sig := <-signals
fmt.Println("Got signal: ", sig)
cancel()
}()
r := kafka.NewReader(kafka.ReaderConfig{
Brokers:                []string{"0.0.0.0:9092"},
GroupID:                "consumer-group-biller",
GroupTopics:            []string{},
Topic:                  topic_name,
QueueCapacity:          10,
MinBytes:               10e3,
MaxBytes:               10e6,
MaxWait:                3 * time.Second,
PartitionWatchInterval: 5 * time.Second,
WatchPartitionChanges:  true,
StartOffset:            kafka.LastOffset,
ReadBackoffMax:         10 * time.Second,
Logger:                 log.Default(),
OffsetOutOfRangeError:  true,
})
i := 0
// listening for the interrupts in a different channel.
defer func() {
err := r.Close()
if err != nil {
fmt.Println("Error closing consumer: ", err)
return
}
fmt.Println("Consumer closed")
}()
for {
m, err := r.FetchMessage(ctx)
if err != nil {
break
}
msg := m.Value
content := Event{}
json.Unmarshal([]byte(msg), &content)
fmt.Printf("%+vn", content)
if content.StatusCode == 200 {
i++
}
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("failed to commit messages:", err)
}
fmt.Println("Total:", i)
}
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
}

消费者应该在最后一个偏移量启动后立即开始消费主题分区中的消息。

当生产者已经在生产时,消费者无法订阅主题。如果消费者在生产者之前启动,它就会工作。

错误日志:

2022/11/17 14:25:34 entering loop for consumer group, consumer-group-biller
2022/11/17 14:25:37 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 joinGroup succeeded for response, consumer-group-biller.  generationID=44, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff
2022/11/17 14:25:37 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff in generation 44
2022/11/17 14:25:37 received empty assignments for group, consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-98bd8f69-bb6a-4130-a50b-16edf0c2cdff for generation 44
2022/11/17 14:25:37 sync group finished for group, consumer-group-biller
2022/11/17 14:25:37 subscribed to topics and partitions: map[]
2022/11/17 14:25:37 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:25:37 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:25:37 started commit for group consumer-group-biller

工作时:

2022/11/17 14:09:02 entering loop for consumer group, consumer-group-biller
2022/11/17 14:09:04 joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 joinGroup succeeded for response, consumer-group-biller.  generationID=35, memberID=consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273
2022/11/17 14:09:04 Joined group consumer-group-biller as member consumer@localhost.local (github.com/segmentio/kafka-go)-fd0d8373-d966-4600-9cd6-d0dda31a7273 in generation 35
2022/11/17 14:09:04 sync group finished for group, consumer-group-biller
2022/11/17 14:09:04 subscribed to topics and partitions: map[{topic:dev-billing partition:0}:25]
2022/11/17 14:09:04 started heartbeat for group, consumer-group-biller [3s]
2022/11/17 14:09:04 started partition watcher for group, consumer-group-biller, topic dev-billing [5s]
2022/11/17 14:09:04 initializing kafka reader for partition 0 of dev-billing starting at offset 25
2022/11/17 14:09:04 started commit for group consumer-group-biller
2022/11/17 14:09:04 the kafka reader for partition 0 of dev-billing is seeking to offset 25
{RequestID:f9f1971f-3d5b-4ef5-b92b-880fe094887e EventID:0d52de85-8da4-435d-8d19-e267947670c3 Event:example MerchantID:id Status:OK StatusCode:200}
2022/11/17 14:09:04 committed offsets for group consumer-group-biller:

裁判:https://github.com/segmentio/kafka-go

这个库最近出现了问题,我建议暂时降级并在github中报告这个问题。

具体来说,您可以降级到v0.4.35, v0.4.36中引入了一些重构到消费者组,如果您查看问题页面,会导致消费者组出现问题。

相关内容

  • 没有找到相关文章

最新更新