使用 Kafka Connect 更新现有文档上的 Elasticsearch 字段,而不是创建新文档



我使用 Elasticsearch 连接器运行了 Kafka 设置,并且我成功地根据特定主题的传入消息将新文档索引到 ES 索引中。

但是,根据另一个主题的传入消息,我需要将数据附加到同一索引中特定文档的字段。

以下伪模式:

{
"_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"title": "A title",
"body": "A body",
"created_at": 164584548,
"views": []
}

^ 本文档正在根据上述主题中的数据在 ES 中创建。

但是,我如何使用来自其他主题的消息将项目添加到views字段中。这样:

article-view主题架构:

{
"article_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"user_id": 123456,
"timestamp: 136389734
}

而不是简单地在article-view索引上创建一个新文档(我什至不想拥有(。它会将其附加到文章文档的views字段中,相应的_id等于消息中的article_id

因此,一条消息后的最终结果将是:

{
"_id": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"uuid": "6993e0a6-271b-45ef-8cf5-1c0d0f683acc",
"title": "A title",
"body": "A body",
"created_at": 164584548,
"views": [
{
"user_id": 123456,
"timestamp: 136389734
}
]
}

使用 ES API,可以使用脚本。这样:

{
"script": {
"lang": "painless",
"params": {
"newItems": [{
"timestamp": 136389734,
"user_id": 123456
}]
},
"source": "ctx._source.views.addAll(params.newItems)"
}
}

我可以动态地批量生成上述脚本,然后使用 ES Python 库中的helpers.bulk函数以这种方式批量更新文档。

这可以通过Kafka Connect/Elasticsearch实现吗?我还没有在Confluent的网站上找到任何文档来解释如何做到这一点。

这似乎是一个相当标准的要求,也是人们需要使用 Kafka/A 像 ES 这样的接收器连接器做的一件显而易见的事情。

谢谢!

编辑:使用write.method=upsert(src(可以进行部分更新


Elasticsearch 连接器不支持此功能。您可以就地更新文档,但需要发送完整的文档,而不是用于附加的增量,我认为这是您所追求的。

最新更新