如何返回一个信号的处理结果?



特别是如果信号处理需要调用一个/一些活动,我怎么能实现这一点?

我试图返回数据或异常,但它不工作。

信号方法无法返回数据。抛出异常将阻止工作流执行。

常见错误在信号方法中返回数据或抛出异常是错误的——因为信号方法是异步的。处理必须像Kafka处理消息一样,你不能通过返回方法返回结果。

所以下面的代码将NOT工作:

public class SampleWorkflow{
public Result mySignalMethod(SignalRequest req){
Result result = activityStub.execute(req)
if(...){
throw new RuntimeException(...)
}
return result
}
}

你该怎么做

你必须做什么:

  • 确保信号不返回任何东西
  • 使用查询方法返回结果
  • 在信号方法处理中,将结果存储为工作流状态,以便查询返回状态

如果您还使用设计模式将信号请求存储到队列中,并让工作流方法处理信号,则会有额外的好处。这会给你带来一些好处

  • 保证信号处理的FIFO顺序
  • 确保重置工作流不会遇到问题——重置后,信号将被保留并移动到工作流历史的较早位置。有时工作流没有初始化以重放信号。
  • 也使异常处理更容易

查看示例代码中的设计模式:Cadence Java sample/Temporal Java sample

如果我们应用了上面的所有内容,示例代码应该如下所示:
public class SampleWorkflow{
private Queue<SignalRequest> queue = new Queue<>();
private Response<Result> lastSignalResponse;
public void myWorkflowMethod(){
Async.procedure(
() -> {
while (true) {
Workflow.await(() -> !queue.isEmpty());
final SignalRequest req =
queue.poll();
// alternatively, you can use async to start an activity:  
try{
Result result = activityStub.execute(req); 
}catch (ActivityException e){
lastSignalResponse = new Response( e );
}
if(...){
lastSignalResponse = new Response( new RuntimeException(...) );
}else{
lastSignalResponse = new Response( result);  
}
}
});
...
}
public Response myQueryMethod(){
return lastSignalResponse;
}
public Result mySignalMethod(SignalRequest req){
queue.add(req)
}
}

在应用程序代码中,您应该发出信号,然后查询工作流以获得结果:


workflowStub.mySignalMethod(req)
Response response = workflowStub.myQueryMethod()

如果您想使用aysnc活动,请遵循此sample-Cadence/sample-Temporal

  • 信号通过工作流决策任务(Workflow task in Temporal)执行。决策任务不能返回结果。在当前的设计中,没有机制可以让决策任务返回结果给应用程序代码。
  • 在工作流代码中抛出异常将阻塞决策任务或使工作流失败)。
  • 查询方法用于返回结果。——然而,查询不能安排活动或修改工作流状态。
  • 这是一个缺失的部分,让应用程序代码进行同步API调用来更新和返回数据。它需要一个复杂的设计:https://github.com/temporalio/proposals/pull/53

相关内容

  • 没有找到相关文章

最新更新