使用 Akka 调度程序调度作业是这样的(至少从文档中(:
system.scheduler().schedule(
Duration.Zero(),
Duration.create(5, TimeUnit.SECONDS),
workerActor,
new MessageToTheActor(),
system.dispatcher(), ActorRef.noSender());
但是,我不明白如何确保下一次运行仅在当前运行完成时发生。我一直在四处寻找,但没有成功:(
调度程序对于您的用例来说是错误的工具。
另一种选择是 Akka Stream 的Sink.actorRefWithAck
(下面的代码改编自链接文档中的示例,并借用了那里定义的实用程序类(。您需要调整工作线程参与者,以处理与流状态相关的一些消息,并使用确认消息进行回复。确认消息用作背压信号,指示参与者已准备好处理下一条MessageToTheActor
消息。辅助角色演员将如下所示:
enum Ack {
INSTANCE;
}
static class StreamInitialized {}
static class StreamCompleted {}
static class StreamFailure {
private final Throwable cause;
public StreamFailure(Throwable cause) { this.cause = cause; }
public Throwable getCause() { return cause; }
}
public class MyWorker extends AbstractLoggingActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(StreamInitialized.class, init -> {
log().info("Stream initialized");
sender().tell(Ack.INSTANCE, self());
})
.match(MessageToTheActor.class, msg -> {
log().info("Received message: {}", msg);
// do something with the message...
sender().tell(Ack.INSTANCE, self());
})
.match(StreamCompleted.class, completed -> {
log().info("Stream completed");
})
.match(StreamFailure.class, failed -> {
log().error(failed.getCause(),"Stream failed!");
})
.build();
}
}
要将Sink.actorRefWithAck
与上述执行组件一起使用,请执行以下操作:
final ActorSystem system = ActorSystem.create("MySystem");
final Materializer materializer = ActorMaterializer.create(system);
ActorRef workerActor = system.actorOf(Props.create(MyWorker.class, "worker"));
Source<MessageToTheActor, NotUsed> messages = Source.repeat(new MessageToTheActor());
Sink<String, NotUsed> sink = Sink.<String>actorRefWithAck(
workerActor,
new StreamInitialized(),
Ack.INSTANCE,
new StreamCompleted(),
ex -> new StreamFailure(ex)
);
messages.runWith(sink, materializer);
请注意Source.repeat
的使用,在这种情况下,它会不断发出MessageToTheActor
消息。使用Sink.actorRefWithAck
可确保执行组件在处理完当前消息之前不会收到另一条消息。
需要以下导入(显然,Akka Streams 依赖项也是如此(:
import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;
import akka.stream.javadsl.*;
调度程序意味着你想要定期的东西,现在如果你的second
运行依赖于你的first
运行,那么你为什么还要创建一个调度程序。
只需创建两个演员,one manager actor
,另一个child actor
。
当任务success
时,child actor
会向parent actor
发送成功消息,然后父参与者要求child actor
第二次运行该任务。这保证了任务按周期顺序运行,以及当前一个任务成功时运行。
所以基本上,你必须在actors
的接收方法中实现相应的匹配案例类。
希望这有帮助!
system.scheduler().schedule(
Duration.Zero(),
Duration.create(5, TimeUnit.SECONDS),
workerActor,
new MessageToTheActor(),
system.dispatcher(), ActorRef.noSender());
上面的代码意味着每 5 秒,调度程序就会向执行组件发送message
workerActor
。
如您所知,actor 默认只有一个线程(除非您使用 nr-of-instance>1 进行配置(,这意味着发送到workActor
的所有消息都将缓冲在mailbox
中,因为只有一个线程可以调用receive
actor
的函数。
换句话说,您始终可以确保下一次运行仅在当前运行完成时才发生,因为默认情况下只有一个线程同时为参与者工作。