我有一个客户端和服务器通过Spring远程处理(使用Java序列化(通过专有的消息传递系统进行通信。我的服务器返回大对象,因此我的 Spring 远程处理实现将序列化的对象字节数组拆分为块,并发送多条消息。客户端等待给定请求的所有响应消息,并最终调用下面的方法将字节数组反序列化为结果对象。
protected Object deserialize(List<byte[]> blocks) {
try {
ByteArrayOutputStream os = new ByteArrayOutputStream(blocks.size() * blockSize);
for (byte[] b : blocks) {
os.write(b, 0, b.length);
}
ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray());
ObjectInputStream objInputStream = new ObjectInputStream(is);
return objInputStream.readObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
这非常有效。但是,它的内存非常沉重。假设内存中的对象与内存中的序列化字节数组的大小大致相同,我最终得到的大约是内存中对象大小的 3 倍:
- 包含块的
List<byte[]>
- 包含串联字节数组的
ByteArrayOutputStream
(可能还有另一个,因为ByteArrayOutputStream.toByteArray()
复制数组(。 - 生成的对象
此方法返回后,所有数组都可以进行 GC'd,但在此方法调用期间,内存使用量激增。
那么,对于我的问题:有没有办法可以创建一个阻塞字节输入流,我可以在接收字节数组时将字节数组附加到它们?ObjectOutputStream 将(在单独的线程中(读取可用字节,然后阻止直到写入更多字节,并继续直到对象完全反序列化。这样,我就不必在内存中拥有完整的串联字节数组。似乎没有一个标准的流实现适合,我看不出我如何使用 NIO 来做到这一点,如果有一个足够的话,我宁愿不编写自己的流实现。
非常感谢,伊恩
实现您自己的输入流以减少数组开销
protected Object deserialize(final List<byte[]> blocks) {
try {
ObjectInputStream objInputStream = new ObjectInputStream(InputStream(){
Iterator<byte[]> it=blocks.iterator();
byte[] curr;
int ind;
public int read(){
if(curr==null||curr.length==ind){
if(!it.hasNext())return -1;//or use a blocking queue and pop
curr=it.next();
ind=0;
}
return curr[ind++];
}
});
return objInputStream.readObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
当然,为了提高效率,您也应该覆盖read(byte[],int,int)
但是如果速度慢一点,这将起作用
或者,您可以使用PipedInputStream
和PipedOutputStream
组合来实现您真正想要的东西。输入流将阻塞,直到它有内容要读取
为了完整起见,下面是我对客户端的新(测试(实现,该客户端在服务器块进入对象时反序列化,使用 Piped 流@rachetfreak建议。谢谢!
public static class Client implements Runnable {
private final PipedInputStream deserializationInputStream = new PipedInputStream(BLOCK_SIZE);
private final PipedOutputStream deserializationOutputStream;
public Client() throws IOException {
deserializationOutputStream = new PipedOutputStream(deserializationInputStream);
}
/** Called by messaging system when a message is received */
public void onReceive(byte[] block) throws Exception {
deserializationOutputStream.write(block);
}
public Object readObject() throws Exception {
ObjectInputStream objectInputStream = new ObjectInputStream(deserializationInputStream);
Object readObject = objectInputStream.readObject();
objectInputStream.close();
return readObject;
}
@Override
public void run() {
try {
Object readObject = readObject();
System.out.println("read: " + readObject);
} catch (Exception e) {
e.printStackTrace();
}
}
}