我在使用以下函数时遇到问题。此函数的目的是返回一组记录,如果在 60 秒内调用这些记录(几乎像队列一样(,这些记录将不会再次返回。
当我一次运行这个时,它似乎工作正常,但是当我在我的线程应用程序中使用它时,我看到出现重复项。我是否正确锁定了行?插入临时表时使用 FOR UPDATE 的正确方法是什么?
CREATE OR REPLACE FUNCTION needs_quantities(computer TEXT)
RETURNS TABLE(id BIGINT, listing_id CHARACTER VARYING, asin CHARACTER VARYING, retry_count INT)
LANGUAGE plpgsql
AS $$
BEGIN
CREATE TEMP TABLE temp_needs_quantity ON COMMIT DROP
AS
SELECT
listing.id,
listing.listing_id,
listing.asin,
listing.retry_count
FROM listing
WHERE listing.id IN (
SELECT min(listing.id) AS id
FROM listing
WHERE (listing.quantity_assigned_to IS NULL
--quantity is null
-- and quantity assigned date is at least 60 seconds ago
-- and quantity date is within 2 hours
OR (
quantity IS NULL AND listing.quantity_assigned_date < now_utc() - INTERVAL '60 second'
AND (listing.quantity_date IS NULL OR listing.quantity_date > now_utc() - INTERVAL '2 hour')
)
)
AND listing.retry_count < 10
GROUP BY listing.asin
ORDER BY min(listing.retry_count), min(listing_date)
LIMIT 10
)
FOR UPDATE;
UPDATE listing
SET quantity_assigned_date = now_utc(), quantity_assigned_to = computer
WHERE listing.id IN (SELECT temp_needs_quantity.id
FROM temp_needs_quantity);
RETURN QUERY
SELECT *
FROM temp_needs_quantity
ORDER BY id;
END
$$
你的函数应该像你在出现的第一个线程中一样锁定行listing
。
第二个线程中的问题是这个子选择:
...
WHERE listing.id IN (
SELECT min(listing.id) AS id
FROM listing
...
LIMIT 10
)
不会被行上的锁阻止,即使封闭SELECT ... FOR UPDATE
是。
因此,子选择将很高兴地看到第一个线程UPDATE
之前的旧行版本,然后在封闭SELECT ... FOR UPDATE
中阻塞,直到第一个线程完成。然后它继续再次更新相同的行。
我不确定这是否被视为一个错误——你可能想在 pgsql-general 邮件列表中询问。CTE 最近也出现了类似的问题,请参阅此修复此错误的提交消息。有人可能会争辩说,这是一个类似的案例。
不幸的是,对于像您这样的复杂查询,我想不出比
LOCK TABLE listing IN EXCLUSIVE MODE;
在你开始处理之前,这不是很令人满意。