Akka 调度程序:仅在当前运行完成时运行下一个



使用 Akka 调度程序调度作业是这样的(至少从文档中(:

system.scheduler().schedule(
Duration.Zero(),
Duration.create(5, TimeUnit.SECONDS),
workerActor,
new MessageToTheActor(),
system.dispatcher(), ActorRef.noSender());

但是,我不明白如何确保下一次运行仅在当前运行完成时发生。我一直在四处寻找,但没有成功:(

调度程序对于您的用例来说是错误的工具。

另一种选择是 Akka Stream 的Sink.actorRefWithAck(下面的代码改编自链接文档中的示例,并借用了那里定义的实用程序类(。您需要调整工作线程参与者,以处理与流状态相关的一些消息,并使用确认消息进行回复。确认消息用作背压信号,指示参与者已准备好处理下一条MessageToTheActor消息。辅助角色演员将如下所示:

enum Ack {
INSTANCE;
}
static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
private final Throwable cause;
public StreamFailure(Throwable cause) { this.cause = cause; }
public Throwable getCause() { return cause; }
}
public class MyWorker extends AbstractLoggingActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(StreamInitialized.class, init -> {
log().info("Stream initialized");
sender().tell(Ack.INSTANCE, self());
})
.match(MessageToTheActor.class, msg -> {
log().info("Received message: {}", msg);
// do something with the message...
sender().tell(Ack.INSTANCE, self());
})
.match(StreamCompleted.class, completed -> {
log().info("Stream completed");
})
.match(StreamFailure.class, failed -> {
log().error(failed.getCause(),"Stream failed!");
})
.build();
}
}

要将Sink.actorRefWithAck与上述执行组件一起使用,请执行以下操作:

final ActorSystem system = ActorSystem.create("MySystem");
final Materializer materializer = ActorMaterializer.create(system);
ActorRef workerActor = system.actorOf(Props.create(MyWorker.class, "worker"));
Source<MessageToTheActor, NotUsed> messages = Source.repeat(new MessageToTheActor());
Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(
workerActor,
new StreamInitialized(),
Ack.INSTANCE,
new StreamCompleted(),
ex -> new StreamFailure(ex)
);
messages.runWith(sink, materializer);

请注意Source.repeat的使用,在这种情况下,它会不断发出MessageToTheActor消息。使用Sink.actorRefWithAck可确保执行组件在处理完当前消息之前不会收到另一条消息。

需要以下导入(显然,Akka Streams 依赖项也是如此(:

import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;

调度程序意味着你想要定期的东西,现在如果你的second运行依赖于你的first运行,那么你为什么还要创建一个调度程序。

只需创建两个演员,one manager actor,另一个child actor

当任务success时,child actor会向parent actor发送成功消息,然后父参与者要求child actor第二次运行该任务。这保证了任务按周期顺序运行,以及当前一个任务成功时运行。

所以基本上,你必须在actors的接收方法中实现相应的匹配案例类。

希望这有帮助!

system.scheduler().schedule(
Duration.Zero(),
Duration.create(5, TimeUnit.SECONDS),
workerActor,
new MessageToTheActor(),
system.dispatcher(), ActorRef.noSender());

上面的代码意味着每 5 秒,调度程序就会向执行组件发送messageworkerActor

如您所知,actor 默认只有一个线程(除非您使用 nr-of-instance>1 进行配置(,这意味着发送到workActor的所有消息都将缓冲在mailbox中,因为只有一个线程可以调用receiveactor的函数。

换句话说,您始终可以确保下一次运行仅在当前运行完成时才发生,因为默认情况下只有一个线程同时为参与者工作。

最新更新