NodeJS:KafkaJSProtocolError:组成员支持的协议与现有成员的协议不兼容



我正在尝试使用MongoDB debezium连接器从Kafka捕获数据,但是当我尝试使用KafkaJS读取数据时出现错误:

KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members

我正在使用码头工人映像来捕获数据。

以下是步骤,我正在遵循:

  1. 启动动物园管理员

    docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest
    
  2. 启动卡夫卡

    docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest
    
  3. 我已经在复制模式下运行了MongoDB

  4. 启动 debezium Kafka connect

    docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka  debezium/connect:latest
    
  5. 然后发布MongoDB连接器配置

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "mongodb-connector", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "rs0/abc.com:27017", "mongodb.name": "fullfillment", "collection.whitelist": "mongodev.test", "mongodb.user": "kafka", "mongodb.password": "kafka01" } }'
    
  6. 有了这个,如果我运行观察程序 docker 容器,我可以在控制台中以 Json 格式数据

    docker run -it --name watchermongo --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k fullfillment.mongodev.test
    

但我想在应用程序中捕获这些数据,以便我可以操作它、处理它并推送到 ElasticSearch。为此,我正在使用

https://github.com/tulios/kafkajs 

但是当我运行消费者代码时,我收到错误。这是代码示例

//'use strict';



// clientId=connect-1, groupId=1
const { Kafka } = require('kafkajs')

const kafka = new Kafka({
  clientId: 'connect-1',
  brokers: ['localhost:9092', 'localhost:9093']
})

// Consuming
const consumer = kafka.consumer({ groupId: '1' })

var consumeMessage = async () => {

await consumer.connect()
await consumer.subscribe({ topic: 'fullfillment.mongodev.test' })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      value: message.value.toString(),
    })
  },
})

}

consumeMessage();

KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members

您不应该在 Connect 和 KafkaJS 使用者中使用相同的 groupId。如果您这样做,它们将成为同一使用者组的一部分,这意味着消息只会由其中一个或另一个使用,如果它甚至有效的话。

如果你将KafkaJS消费者的groupId更改为唯一的东西,它应该可以工作。

请注意,默认情况下,新的 KafkaJS 使用者组将从最新的偏移量开始使用,因此它不会使用已生成的消息。您可以使用consumer.subscribe调用中的 fromBeginning 标志覆盖此行为。请参阅 https://kafka.js.org/docs/consuming#from-beginning

相关内容

  • 没有找到相关文章

最新更新