我正在使用一个面向软件的架构,该架构有多个芹菜工人(我们称它们为worker1
,worker2
和worker3
(。所有三个工作线程都是单独的实体(即,单独的代码库,单独的存储库,单独的芹菜实例,单独的机器(,并且它们都没有连接到Django应用程序。
与这三个worker中的每一个进行通信是基于Django的,MySQL支持的RESTful API。
在开发中,这些服务都在一个流浪的盒子上,每个服务都充当从单独端口运行的单独机器。我们有一个 RabbitMQ 代理来处理所有 Celery 任务。
通过这些服务的典型路径可能如下所示:worker1
从设备获取消息,执行一些处理,在worker2
上排队一个任务,该任务执行进一步处理并向API
发出POST,后者写入MySQL数据库并在worker3
上触发任务,该任务执行其他一些处理并向API
发出另一个POST,从而导致MySQL写入。
服务通信良好,但每次我们对任何服务进行更改时测试此流程都非常烦人。我真的很想进行一些完整的集成测试(即,从发送给worker1
的消息开始并遍历整个链(,但我不确定从哪里开始。我面临的主要问题是这些:
如果我在worker1
上排队,我怎么可能知道整个流程何时结束?当我不知道结果是否已经到来时,我如何对结果做出合理的断言?
如何处理数据库设置/拆卸?我想在每次测试结束时删除测试期间所做的所有条目,但是如果我从 Django 应用程序外部开始测试,我不确定如何有效地清除它。手动删除它并在每次测试后重新创建它似乎开销太大。
Celery 允许同步运行任务,因此第一步是: 将整个流程划分为单独的任务、虚假请求和断言结果:
原始流程:
device --- worker1 --- worker2 --- django --- worker3 --- django
一级集成测试:
1. |- worker1 -|
2. |- worker2 -|
3. |- django -|
4. |- worker3 -|
5. |- django -|
对于每个测试,创建假请求或同步调用并断言结果。将这些测试放在相应的存储库中。例如,在 worker 1 的测试中,您可以模拟 worker2 并测试它是否已使用正确的参数调用。然后,在另一个测试中,您将调用 worker2 并模拟请求来检查它是否调用了正确的 API。等等。
测试整个流程将很困难,因为所有任务都是单独的实体。我现在想出的唯一方法是对 worker1 进行一次虚假调用,设置合理的超时并等待数据库中的最终结果。这种测试只会告诉您它是否有效。它不会告诉你,问题在哪里。
要使用完整设置,您可以设置 Celery 结果后端。有关基础知识,请参阅 Celery "后续步骤"文档。
然后,worker1
可以报告它传递给worker2
的任务句柄。worker2
返回的结果将是它传递给worker3
的任务 id。worker3
返回的结果意味着整个序列已完成,您可以检查结果。结果也可以立即报告这些结果的有趣部分,以使检查更容易。
这在芹菜中可能看起来有点像这样:
worker1_result = mytask.delay(someargs) # executed by worker1
worker2_result = worker1_result.get() # waits for worker1 to finish
worker3_result = worker2_result.get() # waits for worker2 to finish
outcome = worker3_result.get() # waits for worker3 to finish
(细节可能需要不同;我自己还没有用过这个。我不确定任务结果是否可序列化,因此它们是否适合作为任务函数返回值。