在Cadence/Temporal工作流中处理信号的最佳方式/模式是什么?



当使用文档建议的信号时:

public class MyWorkflow{
public Output myWorkflwMethod(Input input){
...
}
public void mySignalMethod(request){
// do actual processing here. 
...
}
}

我可能会遇到以下问题:

  1. 我想保证FIFO处理一次一个(在相同的信号名称或跨所有信号名称)
  2. 我想处理"比赛条件";
  3. 我想安全地重置工作流。复位后的信号可以在历史早期重新应用
  4. 我想确保工作流程不会在信号被处理之前提前完成
  1. 保证FIFO按顺序一次处理一个
  2. 处理"比赛条件";在信号方法被过早调用的情况下。或者在现实中,使用没有signalWithStart的常规信号,信号可能在工作流准备好处理之前就到来了。
  3. 安全重置工作流。复位后的信号可以在历史早期重新应用
  4. 确保在信号被处理之前工作流不会提前完成
  5. 对于所有信号名的FIFO,为了避免竞争条件,可以使用queue的相同队列来存储所有信号,并使用cast
  6. instance of

这四个是在Cadence/Temporal工作流中使用信号时最常见的错误。

有一个设计模式,你可以应用它来解决所有的问题。

这个想法是为了简化信号处理程序,总是把信号放入队列,工作流方法将启动另一个工作流线程来处理队列。

它是基于样本(Cadence&时间)

Java

public class MyWorkflow{
private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); 
public void mySignalMethod(SignalRequest req){
signalRequestQueue.add(req);
}
public Output myWorkflwMethod(Input input){
//1. do everything necessary/needed before actually processing a signal
...
//2. spin up a workflow thread to process 
Async.procedure(
() -> {
while (true) {
Workflow.await(() -> !signalRequestQueue.isEmpty());
final SignalRequest request = signalRequestQueue.poll();
processSignal(request);
}
});

//3. always wait for queue to be empty before completing/failing/continueAsNew the workflow
Workflow.await(() -> signalRequestQueue.isEmpty());
return output
}
private void processSignal(request){
// do your actual processing here. 
// If a process a single signal may take too much time and you don't care about FIFO, you could also start another workflow thread to process signals in parallel.
...
}
}

将现有代码迁移到此模式

你应该使用版本控制来进行迁移。

假设你有这样的代码;

public class MyWorkflow{
public Output myWorkflwMethod(Input input){
...
}
public void mySignalMethod(request){
// do your actual processing here. 
...
}
}

那么你应该像下面这样使用版本控制:

public class MyWorkflow{
private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); 
public void mySignalMethod(SignalRequest req){
int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
if( version == 1){
signalRequestQueue.add(req);
}else{
processSignal(req);
}
}
public Output myWorkflwMethod(Input input){
//1. do everything necessary/needed before actually processing a signal
...
int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);
if( version == 1){
//2. spin up a workflow thread to process 
Async.procedure(
() -> {
while (true) {
Workflow.await(() -> !signalRequestQueue.isEmpty());
final SignalRequest request = signalRequestQueue.poll();
processSignal(request);
}
});
}
//3. always wait for queue to be empty before completing/failing/continueAsNeww the workflow
Workflow.await(() -> signalRequestQueue.isEmpty());
return output
}
private void processSignal(request){
// do your actual processing here. 
// If a process a single signal may take too much time and you don't care about FIFO, you could also start another workflow thread to process signals in parallel.
...
}
}

Golang

Golang SDK没有相同的1/2/3问题。这是因为Golang SDK提供了完全不同的API来处理信号。

没有将信号方法定义为处理程序,Golang SDK需要工作流监听通道来处理信号,这正是这个答案建议在Java中做的。参见信号API的例子。(见节奏/时态)

但是它有问题#4——工作流可能在信号被处理之前就已经完成了。这是Golang SDK的常见错误。

建议总是在完成或继续新建工作流之前耗尽信号通道。请参阅如何在Golang中耗尽信号通道的示例。

这与使用Workflow类似。await在Java中用来等待所有的信号被处理。但是由于通道没有api来获取大小,我们必须使用" default "分支来检查空值。

感谢@Maxim指出了Temporal go sdk中的API——或者,使用"HasPending"API在Temporal go-sdk检查是否所有的信号被消耗。

另外,建议监控unhandledSignal"指标。

最新更新