AWS步骤功能如何处理工人/活动竞赛条件



我正在尝试将Java与带有AWS步骤函数的Spring Boot框架一起使用,作为下面的示例之一。在这里,我们在启动服务的同时用Java设置了一个Runnable线程,并将该线程注册为";工人;线程,它是AWS步骤函数中的一个活动。

这是我的问题:

当注册了同一活动(同一ARN(的多个工作人员时,AWS步骤功能如何处理竞争条件,步骤功能如何避免超过一个工作人员拿起任务并执行重复的工作?在AWS Step Functions配置中,是否有任何设置可以更改,以防止在同一AZ、同一区域的1名以上工人中出现这种竞争情况?


import com.amazonaws.services.stepfunctions.AWSStepFunctions;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskRequest;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskResult;
@Component
class MyActivity implements DisposableBean, Runnable {
private final int WAITS_FOR_ACTIVITY_MILLISECONDS = 500;

public void run() {
while (shouldRun) {
GetActivityTaskResult getActivityTaskResult =
client.getActivityTask(
new GetActivityTaskRequest()
.withActivityArn(config.getActivityArns().getMyActivity()));
String taskToken = getActivityTaskResult.getTaskToken();
if (getActivityTaskResult.getTaskToken() != null) {
try {
// Get input
JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
ActivityInput input =
gson.fromJson(json.toString(), ActivityInput.class);
// do some work

client.sendTaskSuccess(
new SendTaskSuccessRequest()
.withOutput(gson.toJson(output))
.withTaskToken(taskToken));
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
client.sendTaskFailure(
new SendTaskFailureRequest().withTaskToken(taskToken).withError(e.getMessage()));
}
} else {
try {
Thread.sleep(WAITS_FOR_ACTIVITY_MILLISECONDS);
} catch (InterruptedException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
}
}


@Override
public void destroy() {
shouldRun = false;
}
}

多个活动工作者可以轮询同一类型的活动。步骤函数管理多个并发执行的状态机和多个轮询工作的活动工作者之间的多对多关系。

每次活动工作者在感兴趣的活动任务状态下从执行状态机成功轮询工作时,Step Functions都会向活动工作者发送一个唯一的令牌,以及任务状态的JSON输入。

下一个活动工作人员轮询将不会分配此相同的任务。当活动工作者使用ActivityTaskSuccess或ActivityTaskFailure调用Step Functions API时,它将返回结果和令牌。步骤函数使用令牌将结果与相应的状态机相匹配。对于大的工作负载,您可以创建活动工作者的"自动缩放"组,并根据需求缩放您的工作者。

最新更新