如何使用sql表行项(或其他替代项)作为多个服务器的队列



我有10台作为ec2实例在aws上运行的ubuntu服务器,以及我需要在每台服务器中运行的参数列表。输出保存到PSQL数据库中。

参数是一个包含1000个项目的列表,如下所示。

args 
arg1
arg2
arg3
..
..
arg1000

我将列表平均分为10个部分,因此每个服务器运行100个参数以减少时间。

因此,server1打开一个具有以下列表的文件,server2将具有参数101-200等。

args 
arg1
arg2
arg3
..
..
arg100

server1打开这样一个函数,

import pandas as pd
from my_functions import my_function, save_return_value_sql_db
df = pd.read_csv(arguments_file.csv)
for idx, row in df.iterrows():
return_value = my_function(row[0])
save_return_value_sql_db(return_value)

它将输出值保存在名为return_values的PSQL表中。

争论需要不同的时间。有时,其中3台服务器比其他7台服务器要多花一个小时。因此,即使每个服务器需要100个自变量,自变量之间也存在不平衡。

所以我想改变一下。

我想在名为states的单独PSQL表中创建一个中心argument_list,当ubuntu服务器遍历该列表(共享同一列表(时,它们将更改另一列中的标志,并移动到下一行或参数。

我担心多台服务器会碰到同一行或参数,并在多台服务器上同时运行参数命令。有没有办法绕过这一点,使10台服务器中的任何一台都不会运行一次以上的参数。

到目前为止,我创建了一个包含2列的sql表,它看起来像这样,

args     state
arg1     0
arg2     0
arg3     0
..
..
arg1000  0

服务器运行这个python脚本,

import sqlalchemy
from my_engines import STATE_ENGINE
from my_functions import my_function, save_return_value_sql_db
arg_list = STATE_ENGINE.execute(f"SELECT * FROM states WHERE state = 0 order by args asc").fetchall()
current_arg = arg_list[0][0]
STATE_ENGINE.execute(f"UPDATE states SET state = 1 WHERE args = '{current_arg}'")
return_value = my_function(current_arg)
save_return_value_sql_db(return_value)

我该如何修改此代码,使10台服务器不会多次运行任何args,或者因为状态已设置为1而崩溃,因为另一台服务器正在尝试执行相同的操作而无法将其设置为1?

有没有其他数据结构或库可以用来实现我想要做的事情,让事情变得更容易?我不知道从哪里开始。我在谷歌上搜索了一下,看到了一些名为rabbitmq和zeromq的东西,但我不知道它们是如何工作的,也不知道它们是否适合。

您可以启动一个事务并使用:

SELECT * FROM states WHERE state = 0
order by args asc
LIMIT 100 -- otherwise the first worker would consume the whole list
FOR NO KEY UPDATE SKIP LOCKED -- if other workers already holds 100 rows, take next 100

以获得廉价的并发机制。

最新更新