在 Elasticsearch 中进行唯一性检查,无需不断刷新索引



我在Elasticsearch(通过NEST(中索引了来自多个进程的大量数据,每个进程运行多个线程。索引文档的一部分是找出我们以前是否见过类似的文档。此功能是通过在文档上生成一组字段的哈希值并检查我们在 Elasticsearch 中是否有具有相同哈希的文档来实现的。在为文档编制索引之前,我进行以下查询:

var result = elasticClient
.Index(indexName)
.Count<MyDocument>(c => c
.Query(q => q
.ConstantScore(qs => qs
.Filter(f => f
.Term(field => field.Hash, hash))))
...

这将返回具有指定hash的现有文档的计数。目前为止,一切都好。事情正在发挥作用。如果进程在同一秒内为具有相同哈希的两个文档编制索引,则计数检查不起作用,因为第一个文档尚不可用于搜索。我以默认刷新间隔(1 秒(运行。现在,我在索引每个文档后添加了一个刷新调用:

var refreshResponse = client.Refresh(indexName);

这也可以工作,但在索引大量文档时它不会扩展(索引变得很慢,正如这里已经指出的那样:https://www.elastic.co/guide/en/elasticsearch/reference/current/tune-for-indexing-speed.html(。

关于如何避免打电话给Refresh但仍能够执行唯一性检查的任何想法?我正在考虑在所有线程之间共享某种本地缓存,其中包含自上次刷新以来索引的文档哈希。我知道这不会跨流程工作,但目前这是可以接受的。

我最终按照 Val 的建议实现了直写缓存。这样就可以删除对Refresh的调用,但仍对每次迭代进行计数。这是使用在所有线程之间共享的单例MemoryCache实现的:

var cache = new MemoryCache("hashes");

在检查唯一性时,我会检查缓存,以防在 Elasticsearch 中找不到类似的文档:

var result = elasticClient
.Count<MyDocument>(c => c
.Index(indexName)
.Query(q => q
.ConstantScore(qs => qs
.Filter(f => f
.Term(field => field.Hash, hash)))));
bool isUnique = false;
if (result.Count == 0)
{
isUnique = !cache.Contains(hash);
}

如果哈希计数返回0我会检查该哈希的缓存。

当文档成功编制索引后,我将哈希存储在缓存中,过期:

var policy = new CacheItemPolicy();
policy.AbsoluteExpiration = DateTimeOffset.UtcNow.AddSeconds(5);
cache.AddOrGetExisting(hash, string.Empty, policy);

TTL 也可能是 1 秒,因为这是我当前在索引上配置的刷新间隔。

最新更新