Delete and edit operations are not working with the Elasticsearch sink connector



I am trying to build a simple pipeline between MongoDB and Elasticsearch using Kafka. Inserted data is stored in Elasticsearch successfully, but when I edit or delete a document, another new document just gets stored in Elasticsearch instead. This is my MongoDB source connector:

{ "name": "mongo-source", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongo1:27017", 
"mongodb.name": "fullfillment" } }

The Elasticsearch sink connector:

{ "name": "sink", "config": { "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "connection.url": "http://172.21.0.3:9200", "type.name": "subscriber", "topics": "fullfillment.elasticsearchApp.subscriber","schema.ignore": "true","behavior.on.null.values": "delete","key.ignore": "true"    } }

This is a sample record I get in the Kafka topic:

{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"patch"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"ns"},{"type":"int32","optional":false,"field":"sec"},{"type":"int32","optional":false,"field":"ord"},{"type":"int64","optional":true,"field":"h"},{"type":"boolean","optional":true,"default":false,"field":"initsync"}],"optional":false,"name":"io.debezium.connector.mongo.Source","version":1,"field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"fullfillment.elasticsearchApp.subscriber.Envelope"},"payload":{"after":"{"_id" : {"$oid" : "5ea824df92f03c3c92966b04"},"name" : "adem"}","patch":null,"source":{"version":"0.7.5","name":"fullfillment","rs":"rs0","ns":"elasticsearchApp.subscriber","sec":1588077791,"ord":1,"h":0,"initsync":false},"op":"c","ts_ms":1588077791540}}
And here is my docker-compose file:

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:6.8.6
    hostname: elasticsearch
    container_name: elasticsearch
    ports:
      - 9200:9200
    networks:
      - es-network
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
    networks:
      - es-network
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:5.0.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
      - 9092:9092
    networks:
      - es-network
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    hostname: schema-registry
    container_name: schema-registry
    networks:
      - es-network
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
    depends_on:
      - zookeeper
      - kafka
  kafka-connect:
    image: confluentinc/cp-kafka-connect
    hostname: kafka-connect
    container_name: kafka-connect
    networks:
      - es-network
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      #INTERNAL_KEY_CONVERTER: 'false'
      #INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE: 'true'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
      CONNECT_CONFLUENT_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - /home/xroot/Desktop/docker_env/confluent_env/confluent_on_docker/jars/debez:/etc/kafka-connect/jars
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
  #control-center:
  #  image: confluentinc/cp-enterprise-control-center:5.4.0
  #  hostname: control-center
  #  container_name: control-center
  #  depends_on:
  #    - zookeeper
  #    - kafka
  #    - schema-registry
  #  ports:
  #    - 9021:9021
  #  environment:
  #    CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka:9092
  #    CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181
  #    CONTROL_CENTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
  #    CONTROL_CENTER_REPLICATION_FACTOR: 1
  #    CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
  #    CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
  #    CONFLUENT_METRICS_TOPIC_REPLICATION: 1
  #    PORT: 9021

  # MongoDB Replica Set
  mongo1:
    hostname: mongo1
    image: mongo
    container_name: mongo1
    networks:
      - es-network
    ports:
      - 27018:27017
    restart: always
    entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
  mongo2:
    hostname: mongo2
    image: mongo
    container_name: mongo2
    networks:
      - es-network
    ports:
      - 27017:27017
    restart: always
    entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "rs0" ]
networks:
  es-network:
    attachable: true

You have set "key.ignore": "true". Per the documentation, this means the connector uses the message's topic+partition+offset as the Elasticsearch document ID (e.g. "fullfillment.elasticsearchApp.subscriber+0+42"). Since every update and delete in Kafka is a new message with a new offset, you get a new Elasticsearch document every time.

Set "key.ignore": "false" in the sink connector, and make sure the Kafka message key uniquely identifies the document you want to update or delete in Elasticsearch.
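
A minimal sketch of the corrected sink configuration, assuming everything else from your config stays the same (with "behavior.on.null.values": "delete" already in place, a tombstone record should then remove the matching document instead of creating a new one):

{
  "name": "sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://172.21.0.3:9200",
    "type.name": "subscriber",
    "topics": "fullfillment.elasticsearchApp.subscriber",
    "schema.ignore": "true",
    "behavior.on.null.values": "delete",
    "key.ignore": "false"
  }
}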


To handle this specifically with the MongoDB source, you need to extract the document id from the STRUCT in the key, by adding this to the source connector:

"transforms":"extractValuefromStruct",
"transforms.extractValuefromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key"
"transforms.extractValuefromStruct.field":"id"
