将 Elasticsearch 2.x 转储到 MongoDB并返回 ES 6.x



这个问题更多的是理论性的,而不是源代码。

我有一个ES 2.x节点,它有超过1.2TB的数据。我们有 40+ 个索引,每个索引至少有 1 种类型。在这里,ES 2.x被用作数据库而不是搜索引擎。用于将数据转储到 ES 2.x 的源丢失。此外,数据不是规范化的,但单个 ES 文档具有多个嵌入文档。我们的目标是重新创建数据源,同时对其进行规范化。

我们的计划是:

  1. 从 ES 中检索数据,对其进行分析并将其转储到新的 mongodb 中到特定集合中,并维护数据之间的关系。 即以规范化形式保存。
  2. 在新的 ES 6 节点上索引新的 mongo 数据。

我们正在使用JRuby 9.1.15.0,Rails 5,Ruby 2.4和Sidekiq。

目前,我们正在从 ES 检索特定日期时间范围的数据。有时我们收到 0 条记录,有时收到 100000+。问题是当我们收到大量记录时。

下面是一个示例脚本,当日期范围的数据较小时有效,但在数据较大时失败。1.2TB/40 索引是平均索引大小

class DataRetrieverWorker
include Sidekiq::Worker
include Sidekiq::Status::Worker
def perform(indx_name, interval = 24, start_time = nil, end_time = nil)
unless start_time || end_time
client = ElasticSearchClient.instance.client
last_retrieved_at = RetrievedIndex.where(name: indx_name).desc(:created_at).first
start_time, end_time = unless last_retrieved_at
data = client.search index: indx_name, size: 1, sort: [{ insert_time: { order: 'asc' } }]
first_day = DateTime.parse(data['hits']['hits'].first['_source']['insert_time'])
start_time = first_day.beginning_of_day
end_time = first_day.end_of_day
else
# retrieve for the next time slot. usually 24 hrs.
[last_retrieved_at.end_time, last_retrieved_at.end_time + interval.hours]
end
DataRetrieverWorker.perform_async(indx_name, interval, start_time, end_time)
else
# start scroll on the specified range and retrieve data.
query = { range: { insert_time: { gt: DateTime.parse(start_time).utc.iso8601, lt: DateTime.parse(end_time).utc.iso8601 } } }
data = client.search index: indx_name, scroll: '10m', size: SCROLL_SIZE, body: { query: query }
ri = RetrievedIndex.find_by(name: indx_name, start_time: start_time, end_time: end_time)
if ri
DataRetrieverWorker.perform_at(2.seconds.from_now, indx_name, interval)
return
end
ri = RetrievedIndex.create!(name: indx_name, start_time: start_time, end_time: end_time, documents_cnt: data['hits']['total'])
if data['hits']['total'] > 0
if data['hits']['total'] > 2000
BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do
BulkJobsHandlerWorker.perform_async(ri.id.to_s, data['hits']['hits'])
end
else
data['hits']['hits'].each do |r|
schedule(r)
ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
end
while data = client.scroll(body: { scroll_id: data['_scroll_id'] }, scroll: '10m') and not data['hits']['hits'].empty? do
data['hits']['hits'].each do |r|
schedule(r)
ri.retrieved_documents.find_or_create_by!(es_id: r['_id'], es_index: indx_name)
end
end
end
else
DataRetrieverWorker.perform_async(indx_name, interval)
return
end
DataRetrieverWorker.perform_at(indx_name, interval)
end
end
private
def schedule(data)
DataPersisterWorker.perform_async(data)
end
end

问题:

  1. 从 ES 2.x 检索数据的理想方法应该是什么?我们通过日期范围检索数据,然后使用滚动 API 检索结果集。这是对的吗?
  2. 当我们在特定时间范围内获得较大的结果时应该怎么做。有时,我们会在几分钟的时间范围内获得 20000+ 条记录。理想的方法应该是什么?
  3. sidekiq 是处理这种数据量的正确库吗?
  4. 运行 sidekiq 的服务器的理想配置应该是什么?
  5. 使用日期范围是检索数据的正确方法吗? 文档数量变化很大。 0 或 100000+。
  6. 有没有更好的方法可以让我知道记录数量,而不管时间范围如何?
  7. 我尝试独立于时间范围使用滚动 api,但对于具有 100cr 记录的索引,使用大小为 100(对 ES 的 api 调用为 100 个结果)的滚动是否正确? 8.指数中的数据不断添加。不会更新任何文档。

我们已经测试了我们的代码,它在每个日期时间范围(例如 6 小时)处理标称数据(例如 4-5k 文档)。我们还计划对数据进行分片。由于每当我们在某些集合中添加/更新记录时,我们都需要执行一些 ruby 回调,因此我们将使用 Mongoid 进行相同的操作。在没有mongoid的情况下直接插入mongodb中的数据不是一种选择。

任何指针都会有所帮助。谢谢。

在我看来,你应该假设这个过程在任何阶段都可能失败。

恕我直言,您不应该下载所有文档,而应该下载匹配日期范围文档的IDS。这将显著减少 ElasticSearch 返回的数据量。

使用这些IDS,您可以在后台执行另一个使用IDS作为输入的工作线程(我们称之为ImporterWorker),它将从ElasticSearch下载整个文档并将它们导出到MongoDB。

此外,如果您获得 1_000_000 IDS,您可以将它们拆分为 N 个较小的块 (200 X 5_000) 并将 N 个作业排队。

好处:

  • 拆分为块 - 您没有从 ElasticSearch 获得大量响应的风险,因为块大小决定了 ElasticSearch 响应的最大大小

  • 当出现问题(临时网络问题或其他任何事情)时,您将使用最初触发它的IDS重新运行ImporterWorker,并且一切都可以在不中断整个过程的情况下正常工作。 即使它失败了 - 你也会知道没有导入的确切IDS

  1. 从 ES 2.x 检索数据的理想方法是什么?我们通过日期范围检索数据,然后使用滚动 API 检索结果集。这是对的吗?

ES 中的数据是否在不断增加?

  1. 当我们在特定时间范围内获得较大的结果时应该怎么做。有时,我们会在几分钟的时间范围内获得 20000+ 条记录。理想的方法应该是什么?

您正在使用滚动 api,这是一种很好的方法。您可以尝试 ES 的切片滚动 API。

  1. sidekiq 是处理这种数据量的正确库吗?

是的,sidekiq很好,可以处理这么多的数据。

  1. 运行 sidekiq 的服务器的理想配置应该是什么?

您当前对运行 sidekiq 的服务器配置是什么?

  1. 使用日期范围是检索数据的正确方法吗? 文档数量差异很大。 0 或 100000+。

您一次不会持有 100000+ 个结果。您正在使用滚动 API 在块中处理它们。如果数据没有继续添加到 ES 中,则使用带有滚动 API 的查询match_all: {}。如果不断添加数据,则日期范围是很好的方法。

  1. 有没有更好的方法可以给我统一的记录数,而不管时间范围如何?

是的,如果您在不使用日期范围的情况下使用。使用滚动 API 扫描从 0 到最后的所有文档。

  1. 我尝试独立于时间范围使用滚动 api,但对于具有 100cr 记录的索引,使用大小为 100(对 ES 的 api 调用为 100 个结果)的滚动是否正确?

您可以增加滚动大小,因为 mongodb 支持批量插入文档。 MongoDB 批量插入

以下几点可能会解决您的问题:

在处理上一批后清除scroll_id可能会提高性能。

  1. 滚动请求具有优化功能,可在_doc排序顺序时使其更快。如果要遍历所有文档而不考虑顺序,这是最有效的选项。

  2. scroll参数告诉Elasticsearch应该保持搜索上下文存活多长时间。它的值(例如 1m)不需要足够长来处理所有数据,只需要足够长来处理前一批结果。每个滚动请求都会设置一个新的到期时间。

  3. 超过滚动超时时,将自动删除搜索上下文。但是,保持滚动打开是有代价的(稍后在性能部分讨论),因此一旦不再使用滚动,应使用清除滚动 API 显式清除滚动

  4. 滚动 API:后台合并过程通过将较小的段合并在一起以创建新的较大段来优化索引,此时将删除较小的段。此过程在滚动过程中继续,但打开的搜索上下文可防止旧区段在仍在使用时被删除。这就是 Elasticsearch 能够返回初始搜索请求结果的方式,而不管文档的后续更改如何。 保持较旧的段处于活动状态意味着需要更多的文件句柄。确保节点已配置为具有充足的可用文件句柄,并在数据获取后立即清除滚动 API 上下文。 我们可以检查有多少搜索上下文打开了节点统计 API:

因此,非常有必要清除滚动 API 上下文,如前面的清除滚动 API 部分所述。

基于 Elasticsearch 查询备份/恢复或重新索引数据的一个非常方便的工具是 Elasticdump。

要备份完整的索引,Elasticsearch snapshot API 是正确的工具。快照 API 提供创建和还原存储在文件或 Amazon S3 存储桶中的整个索引快照的操作。

让我们看一下 Elasticdump 和快照备份和恢复的几个示例。

  1. 使用节点包管理器安装 elasticdump

    npm i elasticdump -g
    
  2. 通过查询备份到 zip 文件:

    elasticdump --input='http://username:password@localhost:9200/myindex' --searchBody '{"query" : {"range" :{"timestamp" : {"lte": 1483228800000}}}}' --output=$ --limit=1000 | gzip > /backups/myindex.gz
    
  3. 从 zip 文件还原:

    zcat /backups/myindex.gz | elasticdump --input=$ --output=http://username:password@localhost:9200/index_name
    

使用快照备份和还原数据到 Amazon S3 或文件的示例

首先配置快照目标

  1. S3 示例

    curl 'localhost:9200/_snapshot/my_repository?pretty' 
    -XPUT -H 'Content-Type: application/json' 
    -d '{
    "type" : "s3",
    "settings" : {
    "bucket" : "test-bucket",
    "base_path" : "backup-2017-01",
    "max_restore_bytes_per_sec" : "1gb",
    "max_snapshot_bytes_per_sec" : "1gb",
    "compress" : "true",
    "access_key" : "<ACCESS_KEY_HERE>",
    "secret_key" : "<SECRET_KEY_HERE>"
    }
    }'
    
  2. 本地磁盘或挂载的 NFS 示例

    curl 'localhost:9200/_snapshot/my_repository?pretty' -XPUT -H 'Content-Type: application/json' -d '{
    "type" : "fs",
    "settings" : {
    "location": "<PATH … for example /mnt/storage/backup>"
    }
    }'
    
  3. 触发器快照

    curl -XPUT 'localhost:9200/_snapshot/my_repository/<snapshot_name>'
    
  4. 显示所有备份

    curl 'localhost:9200/_snapshot/my_repository/_all'
    
  5. 还原 – 备份最重要的部分是验证备份还原是否确实有效!

    curl -XPOST 'localhost:9200/_snapshot/my_repository/<snapshot_name>/_restore'
    

此文本位于:
https://sematext.com/blog/elasticsearch-security-authentication-encryption-backup/

相关内容

最新更新