Ruby - Elastic Search & RabbitMQ - 数据导入丢失,脚本静默崩溃



Stackers

我在 RabbitMQ 队列中有很多消息(在我的开发环境中的本地主机上运行)。 消息的有效负载是一个 JSON 字符串,我想将其直接加载到弹性搜索中(现在也在本地主机上运行)。 我写了一个快速的 ruby 脚本来从队列中提取消息并加载到 ES 中,如下所示:

#! /usr/bin/ruby
require 'bunny'
require 'json'
require 'elasticsearch'
# Connect to RabbitMQ to collect data
mq_conn = Bunny.new
mq_conn.start
mq_ch = mq_conn.create_channel
mq_q  = mq_ch.queue("test.data")
# Connect to ElasticSearch to post the data
es = Elasticsearch::Client.new log: true
# Main loop - collect the message and stuff it into the db.
mq_q.subscribe do |delivery_info, metadata, payload|
    begin
            es.index index: "indexname",
                     type:  "relationship",
                     body:  payload
    rescue
            puts "Received #{payload} - #{delivery_info} - #{metadata}"
            puts "Exception raised"
            exit
    end
end
mq_conn.close

队列中大约有 4,000,000 条消息。

当我运行脚本时,我看到一堆消息(例如 30 条)被加载到 Elastic Search 中。 但是,我看到大约 500 条消息离开队列。

root@beep:~# rabbitmqctl list_queues
Listing queues ...
test.data    4333080
...done.
root@beep:~# rabbitmqctl list_queues
Listing queues ...
test.data    4332580
...done.

然后剧本默默地退出,没有告诉我例外。 开始/救援块永远不会触发异常,所以我不知道为什么脚本提前完成或丢失这么多消息。 任何线索我接下来应该如何调试它。

一个

我在这里添加了一个简单的工作示例:

https://github.com/elasticsearch/elasticsearch-ruby/blob/master/examples/rabbitmq/consumer-publisher.rb

如果不提供测试数据的示例,则很难调试示例。

Elasticsearch "river" 功能已被弃用,最终将被删除。你绝对应该花时间编写自己的自定义馈送器,如果RabbitMQ和Elasticsearch是你基础设施的核心部分。

在回答我自己的问题时,我了解到这是一种将索引指令的消息队列加载到 Elastic 中的疯狂而愚蠢的方法。 我创建了一条河流,可以比使用绳索脚本更快地排出指令。;-)

相关内容

  • 没有找到相关文章

最新更新