集成测试多个 Celery worker 和一个数据库支持的 Django API



我正在使用一个面向软件的架构,该架构有多个芹菜工人(我们称它们为worker1worker2worker3(。所有三个工作线程都是单独的实体(即,单独的代码库,单独的存储库,单独的芹菜实例,单独的机器(,并且它们都没有连接到Django应用程序。

与这三个worker中的每一个进行通信是基于Django的,MySQL支持的RESTful API。

在开发中,这些服务都在一个流浪的盒子上,每个服务都充当从单独端口运行的单独机器。我们有一个 RabbitMQ 代理来处理所有 Celery 任务。

通过这些服务的典型路径可能如下所示:worker1从设备获取消息,执行一些处理,在worker2上排队一个任务,该任务执行进一步处理并向API发出POST,后者写入MySQL数据库并在worker3上触发任务,该任务执行其他一些处理并向API发出另一个POST,从而导致MySQL写入。

服务通信良好,但每次我们对任何服务进行更改时测试此流程都非常烦人。我真的很想进行一些完整的集成测试(即,从发送给worker1的消息开始并遍历整个链(,但我不确定从哪里开始。我面临的主要问题是这些:

如果我在worker1上排队,我怎么可能知道整个流程何时结束?当我不知道结果是否已经到来时,我如何对结果做出合理的断言?

如何处理数据库设置/拆卸?我想在每次测试结束时删除测试期间所做的所有条目,但是如果我从 Django 应用程序外部开始测试,我不确定如何有效地清除它。手动删除它并在每次测试后重新创建它似乎开销太大。

Celery 允许同步运行任务,因此第一步是: 将整个流程划分为单独的任务、虚假请求和断言结果:

原始流程:

device --- worker1 --- worker2 --- django --- worker3 --- django

一级集成测试:

1.      |- worker1 -|
2.                  |- worker2 -|
3.                              |- django -|
4.                                         |- worker3 -|
5.                                                     |- django -|

对于每个测试,创建假请求或同步调用并断言结果。将这些测试放在相应的存储库中。例如,在 worker 1 的测试中,您可以模拟 worker2 并测试它是否已使用正确的参数调用。然后,在另一个测试中,您将调用 worker2 并模拟请求来检查它是否调用了正确的 API。等等。

测试整个流程将很困难,因为所有任务都是单独的实体。我现在想出的唯一方法是对 worker1 进行一次虚假调用,设置合理的超时并等待数据库中的最终结果。这种测试只会告诉您它是否有效。它不会告诉你,问题在哪里。

要使用完整设置,您可以设置 Celery 结果后端。有关基础知识,请参阅 Celery "后续步骤"文档。

然后,worker1可以报告它传递给worker2的任务句柄。worker2返回的结果将是它传递给worker3的任务 id。worker3返回的结果意味着整个序列已完成,您可以检查结果。结果也可以立即报告这些结果的有趣部分,以使检查更容易。

这在芹菜中可能看起来有点像这样:

worker1_result = mytask.delay(someargs)  # executed by worker1
worker2_result = worker1_result.get()  # waits for worker1 to finish
worker3_result = worker2_result.get()  # waits for worker2 to finish
outcome = worker3_result.get()  # waits for worker3 to finish

(细节可能需要不同;我自己还没有用过这个。我不确定任务结果是否可序列化,因此它们是否适合作为任务函数返回值。

相关内容

  • 没有找到相关文章

最新更新