批量创建记录时的 Sidekiq 作业幂等性



我们如何以幂等方式批量创建记录?

在下面的示例中,如果一切按预期运行,则应创建 100,500 个票证。但是,假设至少有一个作业由于某种未知原因运行两次。

  1. 我们如何保证作业只创建请求的确切数量的票证,而不是更多?
  2. 我们可以在没有任何竞争条件风险的情况下做到这一点吗?

上下文

我正在尝试快速批量创建 100k+ 记录,Sidekiq 最佳实践建议作业应该是幂等的,即它们应该能够运行多次并且最终结果应该相同。

就我而言,我正在执行以下操作:

  • 我正在使用insert_all(Rails 6+)能够非常快速地进行批量创建(它跳过了Rails验证)。
  • 如果任何批处理创建作业无法为其批处理创建所有记录,则该尝试将以原子方式回滚,并且作业将失败(以及以后的重试)。

我们有一个raffles表:

id number_of_tickets_requested

创建新的raffle记录后,我们希望在tickets表中批量创建抽奖券:

id code raffle_id

假设我们刚刚创建了一个新的抽奖活动,number_of_tickets_requested: 100500.

(免责声明:我在示例中对内容进行了硬编码,以使其更易于理解。

到目前为止我的尝试

在抽奖模式中:

MAX_TICKETS_PER_JOB = 1000
after_create :queue_jobs_to_batch_create_tickets
def queue_jobs_to_batch_create_tickets
100.times { BatchCreateTicketsJob.perform_later(raffle, 1000) }
BatchCreateTicketsJob.perform_later(raffle, 500)
end

在 BatchCreateTicketsJob:

def perform(raffle, number_of_tickets_to_create)
BatchCreateTicketsService.call(raffle, number_of_tickets_to_create)
end

在批处理中创建票证服务:

def call
Raffle.transaction do
# Uses insert_all to create all tickets in 1 db query
# It skips Rails validations so is very fast
# It only creates records that pass the db validations
result = Ticket.insert_all(tickets)
unless result.count == number_of_tickets_to_create
raise ActiveRecord::Rollback
end
end
end
private
def tickets
result = []
number_of_tickets_to_create.times { result << new_ticket }
result
end
def new_ticket
{
code: "#{SecureRandom.hex(6)}".upcase,
raffle_id: raffle.id
}
end

作为参考,我最终选择了:

  • with_lock防止竞争条件;
  • 确保原子性的事务;
  • 莱佛士桌上新增tickets_count计数器列,以确保幂等性。
class BatchCreateTicketsService < ApplicationService
attr_reader :raffle, :num_tickets
def initialize(raffle, num_tickets)
@raffle = raffle
@num_tickets = num_tickets
end
def call
raffle.with_lock do
Raffle.transaction do
create_tickets
end
end
end
private
def create_tickets
result = Ticket.insert_all(tickets)
raise StandardError unless result.count == num_tickets
raffle.tickets_count += result.count
raffle.save
end
def tickets
result = []
num_tickets.times { result << new_ticket }
result
end
def new_ticket
{
code: "#{SecureRandom.hex(6)}".upcase,
raffle_id: raffle.id
}
end
end

相关内容

  • 没有找到相关文章

最新更新