如何在 java 中使用所有 50ms 的多线程查询数据



我将每次查询 50 毫秒从机器查询数据。我有一张带有 id 的地图用于查询。对于一个 id,我创建一个线程。最后我有 500 个线程。

for (Map.Entry<String, NodeId[]> entry1 : nodeIds.entrySet()) {
try {
NodeId[] entrNodeIds = entry1.getValue();
new Thread() {
public void run() {
new ReadRegisteredValues().getValue(entry1.getKey(), entrNodeIds[0], client);
};
}.start();
} catch (Exception e) {
fileLogger.error("failed read node: " + entry1.getKey());
}
}

在此之后,我每 50 毫秒读取一次while(true)值的每个线程,如下所示:

public void getValue(String nodeName, NodeId registeredNodeId, UaClient client) {
while (true) {
try {
DataValue value = null;
try {
value = client.readAttribute(registeredNodeId, Attributes.Value);
} catch (ServiceException e1) {
e1.printStackTrace();
} catch (StatusException e1) {
e1.printStackTrace();
}
TeocModel curTeocModel = new TeocModel(value.getSourceTimestamp().getTimeInMillis(), nodeName, value);
KafkaStreamer.getInstance().startStreaming(curTeocModel.toJson());
Thread.sleep(50);
} catch (InterruptedException e) {
fileLogger.error(e.getMessage());
}
}
}

但在这种情况下,我读取的值不平行。这是并行查询数据的最佳方法吗?

知道吗?

您可以使用 ScheduledExecutorService 为您执行调度。

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(parallelism);
for (Map.Entry<String, NodeId[]> entry1 : nodeIds.entrySet()) {
Runnable checkNode = () -> getValue(entry1.getKey(), entrNodeIds[0], client);
scheduler.scheduleAtFixedRate(checkNode, 50, 50, ChronoUnit.MILLIS);
}

为此,请删除 while 循环和getValue中的Thread.sleep()

public void getValue(String nodeName, NodeId registeredNodeId, UaClient client) {
try {
DataValue value = null;
try {
value = client.readAttribute(registeredNodeId, Attributes.Value);
} catch (ServiceException | StatusException e1) {
e1.printStackTrace();
}
TeocModel curTeocModel = new TeocModel(value.getSourceTimestamp().getTimeInMillis(), nodeName, value);
KafkaStreamer.getInstance().startStreaming(curTeocModel.toJson());
} catch (Exception e) {
fileLogger.error(e.getMessage());
}
}

现在,更新将从其中一个线程每 50 毫秒执行一次。请注意,如果更新时间过长并且更新过多,则执行队列将永远增长,您最终会遇到某种资源问题。

最新更新