拥有akka Source.队列并等待作业



我需要一个简单的机制,将"作业"放入例如BlockingQueue中,并以每秒5次的速度处理这些作业。

我希望使用akka Source.queue,但似乎不知道如何设置它,所以当队列为空时,系统会继续工作(因此需要持续监视(,因为它刚刚停止(这就是akka文档中的示例(。以前遇到它时,我最终创建了自己的自定义线程,但在这种情况下,我需要并行处理。

要创建的模式是什么?例如,监视队列的"连续"参与者?我正在使用Java和Play!框架

[编辑]:添加了我的代码,现在似乎可以工作,但似乎很笨拙。为了解释,Actor会收到一个联系具有更新数据的设备的请求。Actor查看"requestQeueu"中是否已经存在(当系统发生更改时,会同时出现多个更新请求(。如果没有,请添加它,然后安排连接(在短时间间隔后,允许在一次通信中包含多个更改(。然后将连接放入Source.queue.

private static final HashMap<Long, String> requestQueue = new HashMap<>();
private static final HashMap<Long, Date> lastDeviceConnection = new HashMap<>();
private final SourceQueueWithComplete<Device> sourceQueue;
private static final int timeBetweenConnectionsToDevice = 750;
private static final int timeToWaitBeforeConnecting = 250;
@Inject
ContactServer(ActorSystem system) {
this.system = system;
int bufferSize = 10;
int elementsToProcess = 5;
sourceQueue =
Source.<Device>queue(bufferSize, OverflowStrategy.backpressure())
.throttle(elementsToProcess, Duration.ofSeconds(1))
.map(device -> {
String fromServer = null;
try {
String thisJson = requestQueue.get(device.getId());
requestQueue.remove(device.getId());
device.refresh();

Socket serviceSocket = new Socket(device.getIpAddress(), 7780);

<!-- do communication -->
out.close();
in.close();
serviceSocket.close();
String data = encoding.decrypt(fromServer);
<!-- do processing -->
} catch (Exception e) {
logger.info("Problem contacting the device (server initiative): " + device.getName(), e);
if (fromServer != null)
logger.warn("The return data of the last error was: " + fromServer);
}
return device;
})
.to(Sink.foreach(x -> logger.debug("Processed: " + x)))
.run(system);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Map.Entry.class, message -> {
final Device device = ((Map.Entry<Device, String>) message).getKey();
final String actionJson = ((Map.Entry<Device, String>) message).getValue();
try {
if (requestQueue.containsKey(device.getId())) {
if (actionJson != null && requestQueue.get(device.getId()) != null) {
requestQueue.put(device.getId(), requestQueue.get(device.getId()) + "," + actionJson);
} else if (actionJson != null) {
requestQueue.put(device.getId(), actionJson);
} else {
return;
}
} else {
requestQueue.put(device.getId(), actionJson);
}
long waitTime = timeToWaitBeforeConnecting;
if ((lastDeviceConnection.containsKey(device.getId()) && lastDeviceConnection.get(device.getId()).after(new Date(new Date().getTime() - timeBetweenConnectionsToDevice)))) {
long timeBeforeNextDeviceConnection = new Date().getTime() - lastDeviceConnection.get(device.getId()).getTime() - timeBetweenConnectionsToDevice;
if(timeBeforeNextDeviceConnection > waitTime) waitTime = timeBeforeNextDeviceConnection;
}
system.scheduler().scheduleOnce(scala.concurrent.duration.Duration.create(waitTime, TimeUnit.MILLISECONDS), () -> {
sourceQueue.offer(device);
},context().dispatcher());
} catch (Exception e) {
logger.info("Exception scheduling the device connection (server initiative): " + device.getName(), e);
} finally {
context().stop(self());
}
}).build();
}

所以,我不确定我是否完全理解您的要求。尤其是在计时方面,因为你使用了两种机制:

  1. 使用调度
  2. 对队列使用节流

我还看到了如何在HashMap中聚合特定设备ID的操作。如果可能的设备Id的数目是"0";"合理";那么我认为你可以利用Akka Streams Api为你做这些事情。

我假设如下:

  1. 您不希望每秒处理超过5条消息
  2. 您不希望连接以每秒为特定设备执行一次以上的操作
  3. 您希望聚合与处理程序开头剪切的代码中相同设备的操作
  4. 所有可能的设备ID都是以千为单位,而不是以百万为单位(请参阅groupBy文档(
sourceQueue =
Source.<Map.Entry<Device, String>>queue(bufferSize,
OverflowStrategy.backpressure())
.throttle(elementsToProcess, Duration.ofSeconds(1))
.groupBy(1000, Map.Entry::getKey) //max 1000 devices
.conflate(
(aggregate, d) ->
new AbstractMap.SimpleEntry<>(d.getKey(),
aggregate.getValue() +
"," + d.getValue()))
.throttle(1, Duration.ofSeconds(1))
.map(deviceEntry -> {
//here goes the connection part, but DON'T use blocking IO, 
//use separate threadpool with mapAsync instead
...
})
.async()
.mergeSubstreams()
.to(Sink.foreach(x -> logger.debug("Processed: " + x)))
.run(system);

然后,在您的收件过程中,您只会拨打队列中的offer。你还应该检查offer()调用的结果,以检查你是否成功,或者你是否受到反压力,在这种情况下,你需要后退或放下物品或缓冲它

最新更新