我目前正在面临一个我很确定有正式名称的问题,但我不知道该搜索网络。我希望,如果我描述了我想到的问题和解决方案,就可以告诉我设计模式的名称(如果有一个与我要描述的匹配的情况(。
基本上,我想要的是一个工作队列:我有多个创建作业(发布者(的客户以及许多处理这些作业(消费者(的工人。现在,我想将出版商创造的作业分发给各种消费者,基本上,使用几乎所有的消息队列都可以在队列中使用负载平衡,例如,例如。使用RabbitMQ甚至MQTT 5。
但是,现在事情变得复杂了...每个工作都指外部实体,假设用户。我想要的是,单个用户的作业会按顺序处理,但是对于多个用户而言。我没有要求用户x的工作总是去工作y,因为无论如何都应依次处理它们。
现在,我可以使用RabbitMQ及其一致的散列交换来解决此问题,但是当新工人进入集群时,我进行了一场数据竞赛,因为RabbitMQ不支持重新安排已经排在队列中的工作。
mqtt 5也不支持这一点:在这里,这个想法被称为"粘性共享订阅",但这不是正式的。它 May 是MQTT 6的一部分,或者可能不是。谁知道。
我还看了NSQ,NAT和其他一些经纪人。他们中的大多数人甚至都不支持这种非常具体的情况,以及那些确实使用一致的哈希(Hoshing(的情况,该方案具有前面提到的数据赛车问题。
现在,如果经纪人不将作业分类为一旦工作到达,则问题将消失,但是如果它会跟踪是否已经处理了特定用户的作业:如果是,则应延迟所有人该用户的其他工作,但是其他用户的所有作业仍应处理。这是Afaics,不可能使用Rabbitmq等人
我很确定我不是唯一一个有用案例的人。我可以考虑用户将视频上传到视频平台,尽管并行处理上传的视频,但单个用户上传的所有视频均已顺序处理。
所以,简短的简短简短:我描述的是以通用名称所知吗?诸如分布式工作队列之类的东西?具有任务亲和力的任务调度程序?还是其他?我尝试了很多术语,但没有成功。这可能意味着没有解决方案,但是正如所说,很难想象我是地球上唯一有这个问题的人。
有什么想法我可以寻找什么?而且:有什么工具可以实施吗?任何协议?
ps:仅使用预定义的路由密钥不是一个选项,因为用户ID(我只是在这里用作化妆示例(基本上是UUID,因此可以有数十亿美元,所以我需要更多的东西动态的。因此,一致的哈希基本上是正确的方法,但是如所说,分布必须按零件而不是预先进行,以避免进行数据竞赛。
时间工作流程能够以最小的努力支持您的用例。
这是满足您要求的稻草人设计:
- 使用用户ID作为工作流ID,将Signal -WithStart请求发送到用户工作流。它要么将信号传递到工作流程,要么首先启动工作流并将信号传递给它。
- 该工作流程的所有请求都由它缓冲。时间段提供了一个艰难的保证,即只有一个具有给定ID的工作流程可以在开放状态下存在。因此,保证所有信号(事件(都可以在属于用户的工作流中进行缓冲。在存在任何过程或下属故障的情况下,时间上将保留工作流程中的所有数据(包括堆栈跟踪和局部变量(。因此无需明确持续
taskQueue
变量。 - 一个内部工作流程循环调度这些请求。
- 当缓冲区为空工作流程时可以完成。
这是在Java中实现它的工作流代码(GO,Typescript和PHP SDK也支持,Python在Alpha中(:
@WorkflowInterface
public interface SerializedExecutionWorkflow {
@WorkflowMethod
void execute();
@SignalMethod
void addTask(Task t);
}
@ActivityInterface
public interface TaskProcessorActivity {
void process(Task poll);
}
public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {
private final Queue<Task> taskQueue = new ArrayDeque<>();
private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);
@Override
public void execute() {
while(!taskQueue.isEmpty()) {
processor.process(taskQueue.poll());
}
}
@Override
public void addTask(Task t) {
taskQueue.add(t);
}
}
,然后通过信号方法进入工作流程的代码:
private void addTask(WorkflowClient cadenceClient, Task task) {
// Set workflowId to userId
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(TASK_QUEUE)
.setWorkflowId(task.getUserId())
.build();
// Use workflow interface stub to start/signal workflow instance
SerializedExecutionWorkflow workflow = temporalClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
BatchRequest request = temporalClient.newSignalWithStartRequest();
request.add(workflow::execute);
request.add(workflow::addTask, task);
temporalClient.signalWithStart(request);
}
暂时性提供了许多其他优点,而不是使用队列进行任务处理。
- 建立了指数级的回程,以无限的到期间隔
- 失败处理。例如,它允许执行一个任务,该任务在配置的间隔期间两个更新都无法成功。
- 支持长期运行的心跳操作
- 能够实施复杂的任务依赖性。例如,在无法恢复的失败(SAGA( 的情况下,实施链接或补偿逻辑的链接
- 对更新的当前状态提供了完全的可见性。例如,当使用队列时,您知道队列中是否有一些消息,并且需要其他DB来跟踪整体进度。记录每个事件的时间。
- 能够取消快速更新的能力。
- 分布式cron支持
请参阅时间编程模型的演示文稿。
我想要的是一个工作队列:我有多个创建作业(发布者(的客户,以及许多处理这些作业(消费者(的工人。现在,我想将出版商创造的作业分发给各种消费者,基本上,使用几乎所有的消息队列都可以在队列中使用负载平衡,例如,例如。使用RabbitMQ甚至MQTT 5。
但是,现在事情变得复杂了...每个工作都指外部实体,假设用户。我想要的是,单个用户的作业会按顺序处理,但是对于多个用户而言。我没有要求用户x的工作总是去工作y,因为无论如何都应依次处理它们。
如果不是这种特殊用例,我对(动态(任务调度[0] [1]进行了调查,几个月前没有浮出水面。
我读到的每种调度算法都有一些其他任务共有的属性,例如优先级,年龄,符合时间,任务名称(以及延长的平均处理时间(。如果您的任务全部链接到用户,则可以构建将user_id
考虑到队列中的任务的调度程序。
但是我想,您不想构建自己的调度程序,无论如何,这都是浪费的,因为从有需求的经验中,现有消息队列允许实现您的要求。
总结您需要的要求:
同时仅执行每个用户一个任务的调度程序。
解决方案是使用分布式锁,例如Redis Distlock,并在任务启动之前定期刷新锁定,并在任务执行期间定期刷新它。如果同一用户的新任务进入并尝试执行,则将无法获得锁定并将重新加入。
这是一个伪代码:
def my_task(user_id, *args, **kwargs):
if app.distlock(user_id, blocking=False):
exec_my_task(user_id, *args, **kwargs)
else:
raise RetryTask()
不要忘记刷新和版本 lock 。
采用类似的方法来强制robots.txt
在爬行者中的每个请求之间延迟。
只要锁碰撞不经常发生,Amirouche所描述的内容就是一个简单的解决方案。如果是这样,您将浪费很多时间在您的工人上抓住他们必须拒绝并重新加入消息的消息。
一个很好地解决此类问题的替代方法是Actor模型/Actor框架。一些示例包括Akka,Orleans,原始骨骼和节奏(上面提到,尽管Candence不仅仅是演员框架(。这些框架可能会变得非常复杂,但其核心可以确保一次处理一个演员的消息,但允许许多演员立即处理(在您的情况下将有每个用户ID的演员(。框架抽象所有消息路由和并发远离您,从而大大简化了实现,并且长期应该更健壮/可扩展。
拥有一个每个实体的处理订单的严重要求是具有挑战性的。
每个已发布任务多长时间?如果它们总是很短,您可以通过哈希分发任务,只需在不损失生产力的情况下改变形状而耗尽工作量的工作库。
如果它们更长时间,也许这太慢了。在这种情况下,您也可能会让工人从快速中央服务(例如REDIS或某些东西(中取出原子咨询锁,以在其执行期间为他们所消费的每个任务的用户_ID带走。该服务也可以通过用户ID范围或您的哪个范围分别可扩展分区。如果在接收任务和执行任务的第一个副作用之间存在足够的差距,那么工人甚至不需要阻止锁定锁的成功,直到锁定为止,因此可能不会显着增加潜伏。争论*可能很少:如果您已经在用户_id上使用某种一致的哈希方案来分发工作,那么它们确实很少,而且只有在工作池拓扑变化时才发生。您至少应该使用哈希分布来确保只有两个工人争夺锁:旧的锁和新的工人。**
如果授予锁的锁定是在先到先得的订单中提供的,并且要求锁比工人池拓扑更改更快(即,一旦从出版商那里获得工作后,工人就会在锁中排队(,这甚至可以为您提供很好的保证,即使拓扑变化也很快。
编辑:
*我最初写了"失败";这不是我的意思。这个想法是,除非拓扑更改,否则此锁定服务几乎永远不会遇到任何锁定争论,因为给定用户的任务始终将正常发送到同一工人。
**另一种可能性:您也只能使用部分工人游泳池排水量提供良好的保证。没有用户级咨询锁,如果您使用一致的散列方案来分发任务,并且可以保持低水标记以完成派遣任务,则可以 defer 启动目标工作者不同的任务比当前最旧的执行任务开始的最古老的工作(即,仅针对分配的工人更改的用户耗尽运行任务(。这是相当多的额外复杂性;如果您可以有效地跟踪低水标记,并且没有长时间的长时间任务,则可能是一个不错的选择,可以让您提供锁定服务。但是,在写作时,我尚不清楚这是否比锁便宜。低水标记通常不便宜地可靠地实施,并且工人在错误的时间死亡可能会延迟整个1/N队列的处理,而不仅仅是改变工人的工人,而不仅仅是其任务是对工人在工作中的用户它死的时间。
apache qpid broker支持一个称为消息组的功能,其中路由密钥与工作者之间的关系是动态的,并且基于当前的流量。
消费订购意味着经纪人将不允许给定组的不止一个消费者的未销量的未经认可的消息。
这意味着只有一个消费者可以在给定时间处理特定组的消息。当消费者确认其所有获得的消息时,经纪人可能会将下一个待处理的消息从该组传递给其他消费者。
这可能可以更好地利用工人:
很好地注意,不同的消息组不会互相阻止交付。例如,假设队列包含来自两个不同消息组的消息 - 例如" a"。和" b&quot" - 它们被升起,以使" a"的消息在" b&quot"的前面。如果" a"是在被客户食用的过程中,然后剩下的" a"消息被阻止,但是" b&quot"的消息被阻止。小组可以被其他消费者使用 - 即使它是"背后"集团" a"在队列中。
与其他经纪人相比,此功能可能以显着的性能价格出现。如今,对QPID并没有太大的兴趣4 5。
编辑:还有其他经纪人也提供此功能:ActiveMQ和ActiveMQ Artemis。Edit2:事实证明"消息组"在ActiveMQ和Artemis中的工作不同 - 将组分配给Worker是静态的(粘性(而不是动态。您需要配置键,而KAFKA将确保所有具有相同键的消息将被顺序处理。
我能够找到有关该讨论的讨论您通过搜索"用类别排序的作业队列" 来描述的一种行为。
不幸的是,看起来他们没有解决您的问题的方法。
有一个先前问题的答案,该问题建议不使用任何形式的消息传播服务来实现订单敏感或对商业逻辑敏感的任务,其原因可能也可能不适用于您'重新做。它还指出了一种似乎可以做您要做的事情的技术,但对于手头的任务可能并不适当。
如果您有粘性的选择,它将以整齐的效率且额外的效率微不足道地解决您的问题。当然,粘性具有自己的失败模式;没有理由认为您会找到实施的实现,这是您要进行的确切权衡。
我假设,因为您在这里提出了一个问题,所以每个用户的顺序性是重要。在您给出的示例中,在处理上传的视频平台上,违反顺序违规并不是什么大不了的。更广泛地说,大多数需要大规模通电负载均衡的工作队列的人不需要 strong 保证处理订单。
。如果您最终需要自己构建东西,那么您将有很多选择。我得到的印象是您期望巨大的吞吐量,高度平行的体系结构和的用户ID碰撞率 。在这种情况下,您可以考虑维护先决条件的列表:
当一项新任务进来时,平衡器会为任何与作业密钥(user_id(的所有程序进行搜索,分配的和尚未分配的作业。
如果有现有匹配项,则将新作业添加到尚未分配的列表中,最古老的作业将其钥匙作为先决条件。
每次工作完成时,工人都需要检查尚未分配的列表,以查看是否刚刚完成了任何人的先决条件。如果这样做,工人可以标记该儿童工作进行分配,或者只是处理孩子的工作本身。
当然,这具有自己的失败模式。您必须进行权衡。
kafka可以帮助您存储一段时间,因此您可以再次对其进行轮询
如果我正确理解您的方案,我相信您所描述的功能与消息会话在Azure Service Bus中的工作方式非常相似。
您基本上将消息的SessionId
属性设置为UserId
,然后将其推入队列。
每个消费者将彼此锁定在会话处理消息上,这些消息将属于同一用户。完成后,消费者可以继续进行下一个可用的会话。
此外,Azure功能最近发布了预览中的服务总线会话支持,但您可以在很少的努力中实现这一切。
不幸的是,我不太熟悉此功能是否存在于其中一个开源替代品中,但我希望这会有所帮助。