我正在开发一个基于事件机器的应用程序,该应用程序会定期轮询MongoDB存储文档的更改。
简化的代码片段可能如下所示:
require 'rubygems'
require 'eventmachine'
require 'em-mongo'
require 'bson'
EM.run {
@db = EM::Mongo::Connection.new('localhost').db('foo_development')
@posts = @db.collection('posts')
@comments = @db.collection('comments')
def handle_changed_posts
EM.next_tick do
cursor = @posts.find(state: 'changed')
resp = cursor.defer_as_a
resp.callback do |documents|
handle_comments documents.map{|h| h["comment_id"]}.map(&:to_s) unless documents.length == 0
end
resp.errback do |err|
raise *err
end
end
end
def handle_comments comment_ids
meta_product_ids.each do |id|
cursor = @comments.find({_id: BSON::ObjectId(id)})
resp = cursor.defer_as_a
resp.callback do |documents|
magic_value = documents.first['weight'].to_i * documents.first['importance'].to_i
end
resp.errback do |err|
raise *err
end
end
end
EM.add_periodic_timer(1) do
puts "alive: #{Time.now.to_i}"
end
EM.add_periodic_timer(5) do
handle_changed_posts
end
}
因此,每 5 秒 EM 会遍历所有帖子,并选择更改的帖子。对于每个更改的帖子,它将comment_id存储在数组中。完成后,该数组将传递给handle_comments
,该加载每个注释并进行一些计算。
现在我有些难以理解:
我知道,这个load_posts>load_comments>计算周期在有 20000 个帖子的 Rails 控制台中需要 3 秒,所以在 EM 中不会快多少。我每 5 秒安排一次
handle_changed_posts
方法,这很好,除非帖子数量增加并且计算时间超过 5 秒,之后再次安排相同的运行。在那种情况下,我很快就会遇到问题。如何避免这种情况?我相信em-mongo,但我不相信我的EM知识。为了监控EM仍在运行,我每秒
puts
一个时间戳。这似乎工作正常,但是当我的计算运行时,每 5 秒就会有点颠簸。这是一个信号,我阻止了循环吗?有什么通用方法可以确定我是否阻塞了循环?
我应该用 -19 来完善我的事件机器进程,以始终为其提供顶级操作系统 prio 吗?
不愿意在这里回答,因为到目前为止我没有 mongo 经验,但考虑到没有人回答,而且这里的一些东西是一般的 EM 东西,我也许能提供帮助:
- 在第一次扫描结束时安排下一次扫描(
handle_changed_posts
中的resp.callback
和resp.errback
似乎是链接下一次扫描的良好候选项),无论是add_timer
还是next_tick
- 可能,尝试更频繁地处理您的 MONGO 行程,以便它们处理较小的数据块,反应堆内的任何 CPU 周期占用都会使您的反应堆循环太忙而无法接受诸如周期计时器时钟周期之类的事件 没有
- 简单的方法,没有。一个想法是测量
Time.now
到next_tick{Time.now}
的差异,进行基准测试,然后在差异超过阈值时跟踪可能的罪魁祸首。模拟慢查询(在mongodb中模拟慢查询??)和许多并行连接是个好主意 - 老实说,我不知道,我从未遇到过这样做的人,我希望这取决于该服务器上运行的其他内容
为了扩展 bbozo 的答案,特别是关于你的第二个问题,当你运行代码时,没有时间不阻塞循环。 根据我的经验,当我们谈论"非阻塞"代码时,我们真正的意思是"不会阻塞很长时间的代码"。 通常,这些是非常短的时间(小于一毫秒),但它们在执行时仍然会阻塞。
此外,next_tick
唯一真正做的就是说"做这个,但不是现在"。 正如bbozo所提到的,你真正想做的是将你的处理拆分为多个即时报价,以便每次迭代阻塞尽可能短的时间。
要使用您自己的基准测试,如果处理 20,000 条记录大约需要 3 秒,则 4,000 条记录大约需要 0.6 秒。 这足够短,通常不会影响您的 1 秒心跳。 你可以把它分开得更远,以减少阻塞量,使反应堆运行更平稳,但这实际上取决于你需要从反应堆获得多少并发性。