我遇到了一个问题,其中我有两种方法,第一个呼叫第二个循环中的第二个方法,第二个将Future
创建为:
public class WorkersCoordinator {
private static Logger LOGGER =
LoggerFactory.getLogger(WorkersCoordinator.class);
private final Timeout timeout;
private final ActorSystem system;
private final List<Class<? extends BaseWorker>> workers;
private final Map<Class, ActorRef> refMap;
private final WorkResultPackageQueue pkgQueue;
private final ActorFactory actorFactory;
@Autowired
public WorkersCoordinator(final ApplicationConfiguration config,
final ActorSystem system,
final ActorFactory actorFactory,
final WorkerFactory workerFactory,
final WorkResultPackageQueue pkgQueue) {
timeout = new Timeout(config.getTimeoutInMilliseconds(),
TimeUnit.MILLISECONDS);
this.system = system;
this.actorFactory = actorFactory;
this.pkgQueue = pkgQueue;
refMap = Map.newHashMap();
workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
}
public void delegateWorkers() {
for (Class<? extends BaseWorker> worker : workers) {
if (refMap.containsKey(worker) continue;
sendWork(worker);
}
}
private void sendWork(Class<? extends BaseWorker> worker) {
// GetDataActor extends AbstractActor
ActorRef actorRef = actorFactory.create(GetDataActor.class);
Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);
responseRef.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable failure) throws Throwable {
LOGGER.error("Worker {} encountered a problem - cancelling.",
worker.getSimpleName());
if (refMap.containsKey(worker)) {
refMap.remove(worker);
}
}
}, system.dispatcher());
responseRef.onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object msg) throws Throwable {
if (msg instanceof WorkResultPackage) {
final WorkResultPackage reportPackage = (WorkResultPackage) msg;
LOGGER.info(
"Received AssetDataPackage from {}, containing {} items",
reportPackage.getWorkReport().getFromWorker().getSimpleName(),
reportPackage.getPkg().getData().size());
pkgQueue.enqueue(reportPackage);
} else {
LOGGER.eror(
"Expected to receive WorkResultPackage Object but received: {}",
msg.getClass());
throw new UnknownObjectReceived(msg);
}
}
}, system.dispatcher());
refMap.put(worker, actorRef);
}
}
问题是,我相信responseRef.onFailure
的封闭不会像我期望的那样行动。如果我将其称为3名工人,其中一名我将其配置为失败,则处理失败,但记录的记录是不确定的,即使该工人将被报告为失败,即使我始终是我标记为失败的人。我是这个技术堆栈的新手(Java,Scala Futures和Akka),以及建立的代码基础,在该代码基础上,我发现了这种已建立的模式,因此我不知道我是否忽略了某些东西或误解Java/Scala Futures的封闭。这里的关键点是,我需要报告哪个工人失败并将其从refMap
中删除,以便不再考虑它。甚至陌生人的事实是,即使有错误的工人被报告为失败,所有工人似乎都可以完成并从refMap
中删除。
更新:在没有运气的答案之后,我做了一些调查,我进行了一些调查,并找到了另一篇文章,回答了Java 8是否支持关闭:
Java 8支持关闭?
简短的答案,我相信确实如此。但是,它谈到了final
或有效的final
变量。因此,我如下更新了代码。希望这能使人们了解关闭的人,以帮助我理解为什么他们不习惯(在C#和JavaScript中)。我只是向sendWork(...)
发布更新,以突出我尝试无济于事的努力。
private void sendWork(Class<? extends BaseWorker> worker) {
// GetDataActor extends AbstractActor
ActorRef actorRef = actorFactory.create(GetDataActor.class);
Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);
Consumer<Throwable> onFailureClosure = (ex) -> {
LOGGER.error("Worker {} encountered a problem - cancelling.",
worker.getSimpleName());
if (refMap.containsKey(worker)) {
refMap.remove(worker);
}
}
responseRef.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable failure) throws Throwable {
onFailureClosure.accept(failure);
}
}, system.dispatcher());
responseRef.onSuccess(new OnSuccess<Object>() {
@Override
public void onSuccess(Object msg) throws Throwable {
if (msg instanceof WorkResultPackage) {
final WorkResultPackage reportPackage = (WorkResultPackage) msg;
LOGGER.info(
"Received AssetDataPackage from {}, containing {} items",
reportPackage.getWorkReport().getFromWorker().getSimpleName(),
reportPackage.getPkg().getData().size());
pkgQueue.enqueue(reportPackage);
} else {
LOGGER.eror(
"Expected to receive WorkResultPackage Object but received: {}",
msg.getClass());
throw new UnknownObjectReceived(msg);
}
}
}, system.dispatcher());
refMap.put(worker, actorRef);
}
代码存在一个基本问题,可能会导致您看到的行为:代码正在并发环境中突变数据,而没有任何保障数据。将来可以随时执行未来的回调,并有可能并行运行。将来有多个回调检查和突变相同的数据可能会导致怪异的行为。
Java中典型的方法处理可变数据的并发访问是使用同步和锁。幸运的是,由于您使用了Akka,因此有一种更好的方法。基本上,重构WorkersCoordinator
是演员,并使用顺序消息处理的演员行为安全处理可变状态。
为了进一步简化问题,您可以在这种情况下放弃使用ask
,而是使用tell
在参与者之间进行通信。我猜想在这里使用未来以捕获错误,但是更好地处理错误的方法是Akka的主管策略。也就是说,如果WorkersCoordinator
是演员,并且每个GetDataActor
实例是WorkersCoordinator
的孩子,则后者可以决定如何处理前者中出现的错误。例如,如果在GetDataActor
中抛出异常,则协调员可以决定记录错误,然后停止孩子。
以下是包含上述想法的替代WorkersCoordinator
:
public class WorkersCoordinator extends AbstractActor {
private static Logger LOGGER = ...
private final List<Class<? extends BaseWorker>> workers;
private final Map<ActorRef, Class> refMap;
private final WorkResultPackageQueue pkgQueue;
public WorkersCoordinator(final WorkerFactory workerFactory,
final WorkResultPackageQueue pkgQueue) {
this.pkgQueue = pkgQueue;
this.refMap = Map.newHashMap();
this.workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
}
static Props props(WorkerFactory factory, WorkResultPackageQueue queue) {
return Props.create(WorkersCoordinator.class, factory, queue);
}
static public class Delegate {}
private static SupervisorStrategy strategy =
new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), DeciderBuilder.
matchAny(t -> {
ActorRef failedChild = getSender();
LOGGER.error("This child failed: {}", failedChild);
refMap.remove(failedChild);
stop();
})
.build());
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
@Override
public void preStart() {
for (worker : workers) {
ActorRef child = getContext().actorOf(Props.create(GetDataActor.class));
refMap.put(child, worker);
}
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Delegate.class, d -> {
refMap.forEach((actor, msg) -> actor.tell(msg, getSelf()));
})
.match(WorkResultPackage.class, p -> {
LOGGER.info("Received AssetDataPackage from {}, containing {} items",
reportPackage.getWorkReport().getFromWorker().getSimpleName(),
reportPackage.getPkg().getData().size());
pkgQueue.enqueue(p);
ActorRef dataActor = getSender();
refMap.remove(dataActor);
})
.matchAny(
msg -> LOGGER.eror("Expected to receive WorkResultPackage Object but received: {}", msg)
)
.build();
}
}
有关上述代码的一些注释:
- 而不是使用
ActorFactory
,它似乎是某些自定义类,而是使用Props
。 -
refMap
被倒置,因此ActorRef
现在是关键,并且工作类型是值。这使我们可以根据ActorRef
从refMap
中删除条目,这是在儿童演员的成功回复的情况下,并且如果孩子抛出例外。 - 我删除了
@Autowired
注释,以简单起见。此处找到有关参与者依赖注入的更多信息。 - 要创建并启动
WorkersCoordinator
,请调用其props
方法。为了启动这项工作,演员期望一条自定义的Delegate
消息:一旦演员收到此消息,它将通过refMap
迭代,向地图中的每个孩子发送与该孩子相关的工作单位。
WorkerFactory Factory = ...WorkResultPackagequeue队列= ...ActorRef协调器= actorsystem.actorof(workersCoordinator.props(工厂,queue));委托Dowork = new差异();coordinator.tell(dowork,actorref.nosender());