我们有一个使用 UDP 组播发送日志事件的日志记录系统。事件速率约为 10,000 个事件/秒,平均事件大小约为 2Kb。
NIO 版本的应用程序在每次测试期间都会丢失一小部分事件(大约 12M 中的 ~2000 个事件(。有人在这方面有任何见解吗?
示例代码:没有蔚来汽车:
byte[] buf = new byte[65535];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
try {
while (!Thread.currentThread().isInterrupted()) {
socket.receive(packet);
final byte[] tmpBuffer = new byte[packet.getLength()];
System.arraycopy(packet.getData(), 0, tmpBuffer, 0,
tmpBuffer.length);
insertToNonBlockingQueue(tmpBuffer, packet.getSocketAddress());
}
} catch (Throwable t) {
throw new RuntimeException("Encountered exception in Acceptor", t);
} finally {
Util.closeQuietly(socket);
}
与蔚来汽车:
ByteBuffer inBuffer = ByteBuffer.allocate(65535);
try {
while (!Thread.currentThread().isInterrupted()) {
SocketAddress addr = channel.receive(inBuffer);
inBuffer.flip();
final byte[] tmpBuffer = new byte[inBuffer.limit()];
inBuffer.get(tmpBuffer);
inBuffer.clear();
insertToNonBlockingQueue(tmpBuffer, addr);
}
} catch (ClosedByInterruptException ex) {
log.info("Channel closed by interrupt"); // normal shutdown
} catch (Throwable t) {
throw new RuntimeException("Encountered exception in Acceptor", t);
} finally {
Util.closeQuietly(channel);
}
两个侦听器同时运行,每次非NIO版本捕获所有日志事件,而NIO版本错过一些日志事件。这不是网络问题,因为即使我们将代码切换到计算机上的其他版本,它的行为也是相同的。
您忘记在get()
之后compact()
或clear()
缓冲区。一旦缓冲区填满,此代码将开始丢弃数据包。
DatagramPacket
情况应在每次接收之前重置数据包长度。
实际DatagramPacket
插入队列并在每次接收时使用一个新,或者在 NIO 的情况下合成一个新会更简单。这样你就不需要新的数据结构了。
除了 EJP 所说的之外,您还应该使用直接字节缓冲区作为读取缓冲区,否则套接字将在内部分配一个 DBB,然后从该 DBB 复制到您的 BB 中,然后您将从该 DB 复制到数组中。 即有一个多余的复制操作。
此外,您可能希望将套接字的接收缓冲区配置为可以容纳多个数据包的大小。