如何在不放入一个大数组的情况下反序列化字节数组块



我有一个客户端和服务器通过Spring远程处理(使用Java序列化(通过专有的消息传递系统进行通信。我的服务器返回大对象,因此我的 Spring 远程处理实现将序列化的对象字节数组拆分为块,并发送多条消息。客户端等待给定请求的所有响应消息,并最终调用下面的方法将字节数组反序列化为结果对象。

protected Object deserialize(List<byte[]> blocks) {
    try {
        ByteArrayOutputStream os = new ByteArrayOutputStream(blocks.size() * blockSize);
        for (byte[] b : blocks) {
            os.write(b, 0, b.length);
        }
        ByteArrayInputStream is = new ByteArrayInputStream(os.toByteArray());
        ObjectInputStream objInputStream = new ObjectInputStream(is);
        return objInputStream.readObject();
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

这非常有效。但是,它的内存非常沉重。假设内存中的对象与内存中的序列化字节数组的大小大致相同,我最终得到的大约是内存中对象大小的 3 倍:

  1. 包含块的List<byte[]>
  2. 包含串联字节数组的ByteArrayOutputStream(可能还有另一个,因为ByteArrayOutputStream.toByteArray()复制数组(。
  3. 生成的对象

此方法返回后,所有数组都可以进行 GC'd,但在此方法调用期间,内存使用量激增。

那么,对于我的问题:有没有办法可以创建一个阻塞字节输入流,我可以在接收字节数组时将字节数组附加到它们?ObjectOutputStream 将(在单独的线程中(读取可用字节,然后阻止直到写入更多字节,并继续直到对象完全反序列化。这样,我就不必在内存中拥有完整的串联字节数组。似乎没有一个标准的流实现适合,我看不出我如何使用 NIO 来做到这一点,如果有一个足够的话,我宁愿不编写自己的流实现。

非常感谢,伊恩

实现您自己的输入流以减少数组开销

protected Object deserialize(final List<byte[]> blocks) {
    try {
       ObjectInputStream objInputStream = new ObjectInputStream(InputStream(){
            Iterator<byte[]> it=blocks.iterator();
            byte[] curr;
            int ind;
            public int read(){
                if(curr==null||curr.length==ind){
                    if(!it.hasNext())return -1;//or use a blocking queue and pop
                    curr=it.next();
                    ind=0;
                }
                return curr[ind++];
            }
        });
        return objInputStream.readObject();
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

当然,为了提高效率,您也应该覆盖read(byte[],int,int)但是如果速度慢一点,这将起作用

或者,您可以使用PipedInputStreamPipedOutputStream组合来实现您真正想要的东西。输入流将阻塞,直到它有内容要读取

为了完整起见,下面是我对客户端的新(测试(实现,该客户端在服务器块进入对象时反序列化,使用 Piped 流@rachetfreak建议。谢谢!

public static class Client implements Runnable {
    private final PipedInputStream deserializationInputStream = new PipedInputStream(BLOCK_SIZE);
    private final PipedOutputStream deserializationOutputStream;
    public Client() throws IOException {
        deserializationOutputStream = new PipedOutputStream(deserializationInputStream);
    }
    /** Called by messaging system when a message is received */
    public void onReceive(byte[] block) throws Exception {
        deserializationOutputStream.write(block);
    }
    public Object readObject() throws Exception {
        ObjectInputStream objectInputStream = new ObjectInputStream(deserializationInputStream);
        Object readObject = objectInputStream.readObject();
        objectInputStream.close();
        return readObject;
    }
    @Override
    public void run() {
        try {
            Object readObject = readObject();
            System.out.println("read: " + readObject);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

相关内容

  • 没有找到相关文章

最新更新