我正致力于将阻塞顺序编排框架转换为响应式。现在,这些任务是动态的,并通过JSON输入输入到引擎中。引擎提取类并执行run()
方法,并保存状态和来自每个任务的响应。
如何在反应器中实现相同的链?如果这是一个静态DAG,我将把它与flatMap
或then
操作符链接起来,但由于它是动态的,我如何继续执行响应性任务并从每个任务收集输出?
示例:
非反应性接口:
public interface OrchestrationTask {
OrchestrationContext run(IngestionContext ctx);
}
核心引擎
public Status executeDAG(String id) {
IngestionContext ctx = ContextBuilder.getCtx(id);
List<OrchestrationTask> tasks = app.getEligibleTasks(id);
for(OrchestrationTask task : tasks) {
// Eligible tasks are executed sequentially and results are collected.
OrchestrationContext stepContext = task.run(ctx);
if(!evaluateResult(stepContext)) break;
}
return Status.SUCCESS;
}
按照上面的例子,如果我转换任务返回Mono>那么,我如何等待或链接其他任务来操作先前任务的结果呢?任何帮助都是感激的。谢谢。
更新::
响应任务示例。
public class SampleTask implements OrchestrationTask {
@Override
public Mono<OrchestrationContext> run(OrchestrationContext context) {
// Im simulating a delay here. treat this as a long running task (web call) But the next task needs the response from the below call.
return Mono.just(context).delayElements(Duration.ofSeconds(2));
}
因此,我将有一系列完成各种事情的任务,但每个任务的响应依赖于前一个任务,并存储在业务流程上下文中。任何时候发生错误,业务流程上下文标志将被设置为false,并且通量应该停止。
当然可以。
- 从任务列表中创建通量(如果适合以响应式方式生成任务列表,则可以直接将该数组列表替换为通量,如果不适合则保持原样);
flatMap()
每个任务到您的task.run()
方法(根据问题现在返回Mono
;- 确保只在
evaluateResult()
为true时使用元素; - …最后只返回
SUCCESS
状态。
把这些放在一起,替换掉循环&返回语句:
Flux.fromIterable(tasks)
.flatMap(task -> task.run(ctx))
.takeWhile(stepContext -> evaluateResult(stepContext))
.then(Mono.just(Status.SUCCESS));
(因为我们已经让它反应,你的方法显然需要返回一个Mono<Status>
,而不仅仅是Status
)
按注释更新-如果你只想让它"一次一个"执行的话与多个并发相比,您可以使用concatMap()
而不是flatMap()
。