特别是如果信号处理需要调用一个/一些活动,我怎么能实现这一点?
我试图返回数据或异常,但它不工作。
信号方法无法返回数据。抛出异常将阻止工作流执行。
常见错误在信号方法中返回数据或抛出异常是错误的——因为信号方法是异步的。处理必须像Kafka处理消息一样,你不能通过返回方法返回结果。
所以下面的代码将NOT工作:
public class SampleWorkflow{
public Result mySignalMethod(SignalRequest req){
Result result = activityStub.execute(req)
if(...){
throw new RuntimeException(...)
}
return result
}
}
你该怎么做
你必须做什么:
- 确保信号不返回任何东西
- 使用查询方法返回结果
- 在信号方法处理中,将结果存储为工作流状态,以便查询返回状态
如果您还使用设计模式将信号请求存储到队列中,并让工作流方法处理信号,则会有额外的好处。这会给你带来一些好处
- 保证信号处理的FIFO顺序
- 确保重置工作流不会遇到问题——重置后,信号将被保留并移动到工作流历史的较早位置。有时工作流没有初始化以重放信号。
- 也使异常处理更容易
查看示例代码中的设计模式:Cadence Java sample/Temporal Java sample
如果我们应用了上面的所有内容,示例代码应该如下所示:public class SampleWorkflow{
private Queue<SignalRequest> queue = new Queue<>();
private Response<Result> lastSignalResponse;
public void myWorkflowMethod(){
Async.procedure(
() -> {
while (true) {
Workflow.await(() -> !queue.isEmpty());
final SignalRequest req =
queue.poll();
// alternatively, you can use async to start an activity:
try{
Result result = activityStub.execute(req);
}catch (ActivityException e){
lastSignalResponse = new Response( e );
}
if(...){
lastSignalResponse = new Response( new RuntimeException(...) );
}else{
lastSignalResponse = new Response( result);
}
}
});
...
}
public Response myQueryMethod(){
return lastSignalResponse;
}
public Result mySignalMethod(SignalRequest req){
queue.add(req)
}
}
在应用程序代码中,您应该发出信号,然后查询工作流以获得结果:
workflowStub.mySignalMethod(req)
Response response = workflowStub.myQueryMethod()
如果您想使用aysnc活动,请遵循此sample-Cadence/sample-Temporal