指定对象的 Sidekiq 队列



我用这个工人进行进程

class CreateOrUpdateContactWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1
sidekiq_retries_exhausted do |msg|
Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
end
def perform(user_id, changed_fields, update_address = false)
ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
end
end

在用户模型中,我有after_commit回调

def update_mautic_contact
CreateOrUpdateContactWorker.perform_async(id, previous_changes.keys, ship_address_changed || false)
end

问题是当用户同时更新两次时,因为create_or_update_contact需要一些时间。如何仅限制指定用户的线程数?每个任务将逐个执行以指定user_id。

我不知道您是否将redis作为基础架构的一部分,但您所描述的是竞争条件。要解决此问题,您需要将互斥锁/锁定到关键路径create_or_update_contact

这里的竞争条件发生在两个异步工作线程/进程之间,因此您不能只使用简单的 ruby 互斥锁/锁定。您需要使用中央锁存储/守护器的分布式互斥锁。这:https://github.com/kenn/redis-mutex 应该为您完成,但您将需要redis数据库。

基本上,您的代码将如下所示:

class CreateOrUpdateContactWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1
sidekiq_retries_exhausted do |msg|
Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
end
def perform(user_id, changed_fields, update_address = false)
RedisMutex.with_lock("#{user_id}_create_or_update_contact") do
ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
end
end
end

因此,如果您同时有 2 个 user_id=1 的用户更新,则第一个获取称为1_create_or_update_contact的锁/互斥锁将首先执行并阻止另一个调用,直到它完成,然后第二个调用将开始。

这将解决您的问题:)我认为redis是需要的、有用的和少数的。我几乎想不出我的任何轨道项目而不使用redis.

我用 Redis 意识到了这一点,但没有任何宝石。我在执行工作线程之前使用了条件:

def update_mautic_contact
if Rails.current.get("CreateOrUpdateContactWorkerIsRunning_#{id}")
Redis.current.set("CreateOrUpdateContactWorkerIsRunning_#{id}", true)
CreateOrUpdateContactWorker.perform_in(1.minutes, id, changed_fields)
else
Redis.current.set("CreateOrUpdateContactWorkerIsRunning_#{id}", true)
CreateOrUpdateContactWorker.perform_async(id, changed_fields)
end
end

和内部工人:

class CreateOrUpdateContactWorker
include Sidekiq::Worker
sidekiq_options retry: 2, queue: 'contact_updater', concurrency: 1
sidekiq_retries_exhausted do |msg|
Airbrake.notify(error_message: "Contact update failed", session: { msg: msg })
end
def perform(user_id, changed_fields, update_address = false)
ContactUpdater.create_or_update_contact(user_id, changed_fields, update_address: update_address)
Redis.current.del("CreateOrUpdateContactWorkerIsRunning_#{user_id}")
end
end

相关内容

  • 没有找到相关文章

最新更新