反应器Java中反应式任务的顺序执行



我正致力于将阻塞顺序编排框架转换为响应式。现在,这些任务是动态的,并通过JSON输入输入到引擎中。引擎提取类并执行run()方法,并保存状态和来自每个任务的响应。
如何在反应器中实现相同的链?如果这是一个静态DAG,我将把它与flatMapthen操作符链接起来,但由于它是动态的,我如何继续执行响应性任务并从每个任务收集输出?

示例:
非反应性接口:

public interface OrchestrationTask {
OrchestrationContext run(IngestionContext ctx);
}

核心引擎

public Status executeDAG(String id) {
IngestionContext ctx = ContextBuilder.getCtx(id);
List<OrchestrationTask> tasks = app.getEligibleTasks(id);
for(OrchestrationTask task : tasks) {
// Eligible tasks are executed sequentially and results are collected.
OrchestrationContext stepContext = task.run(ctx);
if(!evaluateResult(stepContext)) break;
}
return Status.SUCCESS;
}

按照上面的例子,如果我转换任务返回Mono那么,我如何等待或链接其他任务来操作先前任务的结果呢?任何帮助都是感激的。谢谢。

更新::

响应任务示例。

public class SampleTask implements OrchestrationTask {  
@Override
public Mono<OrchestrationContext> run(OrchestrationContext context) {  
// Im simulating a delay here. treat this as a long running task (web call) But the next task needs the response from the below call.
return Mono.just(context).delayElements(Duration.ofSeconds(2));
}

因此,我将有一系列完成各种事情的任务,但每个任务的响应依赖于前一个任务,并存储在业务流程上下文中。任何时候发生错误,业务流程上下文标志将被设置为false,并且通量应该停止。

当然可以。

  • 从任务列表中创建通量(如果适合以响应式方式生成任务列表,则可以直接将该数组列表替换为通量,如果不适合则保持原样);
  • flatMap()每个任务到您的task.run()方法(根据问题现在返回Mono;
  • 确保只在evaluateResult()为true时使用元素;
  • …最后只返回SUCCESS状态。

把这些放在一起,替换掉循环&返回语句:

Flux.fromIterable(tasks)
.flatMap(task -> task.run(ctx))
.takeWhile(stepContext -> evaluateResult(stepContext))
.then(Mono.just(Status.SUCCESS));

(因为我们已经让它反应,你的方法显然需要返回一个Mono<Status>,而不仅仅是Status)

按注释更新-如果你只想让它"一次一个"执行的话与多个并发相比,您可以使用concatMap()而不是flatMap()

相关内容

  • 没有找到相关文章

最新更新