kafkaconnectelasticsearch:如何根据kafka主题标题中的某个值删除文档



我正在使用Kafka Connect加快速度。尝试使用Kafka Connect Elasticsearch服务接收器连接器将我们的数据从Kafka移动到Elasticsearch。我有一个处理流,看起来像这样:

s3中的文件记录->从发布到->卡夫卡主题->Kafka连接->弹性搜索

这适用于创建/更新的场景。但是,我们希望处理文件的删除场景。我们的应用程序发布了一个删除操作的事件,并将其设置为Kafka消息中头值的一部分。我们希望删除文档本身,而不是使用此删除操作信息更新弹性文档。

我们如何使用Kafka Connect读取这个头值并从Elastic中删除给定密钥的文档?

感谢您提前提供的帮助。

谨致问候,Vikas

编辑时间:我试图转换的消息示例:

[{
"key": "fileid=05ffefea-a71d-4bb7-091e-08d8f9229806",
"rownum": 0,
"metadata": {
"offset": 1468950,
"partition": 3,
"timestamp": 1617773161088,
"__keysize": 43,
"__valsize": 596
},
"headers": {
"sub-tenant-id": "",
"actiondate": "2021-04-07T05:26:01.0790010Z",
"action": "uploaded",
"contentversion": "V1"
},
"value": {
"id": "fil.05ffefeaa71d4bb7091e08d8f9229806",
"name": "4.txt",
"volumeId": "vol.e25196dc9e2f460bb27308d8f8405691",
"volumeName": "projdmck0405",
"type": "text/plain",
"subTenantId": "",
"path": "/4.txt",
"timeCreated": "2021-04-07T05:25:46.129Z",
"timeModified": "2021-04-07T05:25:46.129Z",
"urn": "urn:mycompany:product:test:app:file:fil.05ffefeaa71d4bb7091e08d8f9229806#/4.txt",
"sizeInBytes": 76,
"isUploaded": true,
"archiveStatus": "None",
"storageTier": "Standard",
"eTag": "11fb9ec5531d90d571b331cc39e43175"
}
}]

我正在尝试将action报头fieldvalue添加到消息的正文中。

以下是我使用示例使用的变换:https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-common/transformations/examples/HeaderToField.headertofield.html

"transforms"                            : "dropNullRecords,headerToField",
"transforms.headerToField.type"             : "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
"transforms.headerToField.header.mappings"  : "action:STRING:actioninbody"

我确实用映射值";动作:字符串;就在下面的例子中,我注意到提到的格式是:

The format is <header name>:<header type>[:field name]. 

我缺少什么?

我能够做到这一点。最终编写了一个自定义SMT。使用SMT,我可以访问连接记录,包括标题和值。所以我只是逐个读取头值,当遇到我感兴趣的值时,我将连接记录的值设置为null。除此之外,Kafka Connect还公开了以下参数:

behavior.on.null.values
How to handle records with a non-null key and a null value (for example, Kafka tombstone records). Valid options are IGNORE, DELETE, and FAIL.
Type: string
Default: FAIL
Valid Values: (case insensitive) [DELETE, IGNORE, FAIL]
Importance: low

我将值设置为DELETE,它开始从ES索引中删除记录。

我遵循了Confluent的自定义SMT示例:https://github.com/confluentinc/kafka-connect-insert-uuid

对理解概念和连接记录类结构本身有很大帮助。

希望这能帮助其他人。

相关内容

  • 没有找到相关文章

最新更新