我正在尝试使用管道输入流写入数据。但从线程转储来看,管道输入流似乎有一个锁定。
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream(pos);
FileInputStream fis = null;
GZIPOutputStream gos = null;
byte[] buffer = new byte[1024];
try {
fis = new FileInputStream(file);
gos = new GZIPOutputStream(pos);
int length;
while ((length = fis.read(buffer, 0, 1024)) != -1)
gos.write(buffer, 0, length);
} catch(Exception e){
print("Could not read the file");
}
finally {
try {
fis.close();
gos.close();
}catch (Exception ie){
printException(ie);
}
}
writeObject(pis);
pos.close();
writeobj方法将简单地从流中读取,但是读取方法被锁定。线程转储表示等待管道输入流。
main" prio=10 tid=0x08066000 nid=0x48d2 in Object.wait() [0xb7fd2000..0xb7fd31e8]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0xa5c28be8> (a java.io.PipedInputStream)
at java.io.PipedInputStream.awaitSpace(PipedInputStream.java:257)
at java.io.PipedInputStream.receive(PipedInputStream.java:215)
- locked <0xa5c28be8> (a java.io.PipedInputStream)
at java.io.PipedOutputStream.write(PipedOutputStream.java:132)
at java.util.zip.GZIPOutputStream.finish(GZIPOutputStream.java:95)
at java.util.zip.DeflaterOutputStream.close(DeflaterOutputStream.java:146)
Locked ownable synchronizers:
- None
我真的不确定是谁把它锁起来的。阅读文档,找出锁定调用。但无法弄清楚出了什么问题以及如何克服它。
使用PipedInputStream和PipedOutputStream必须在单独的线程中。
仔细阅读Javadoc:http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html
通常,一个线程从PipedInputStream对象读取数据,另一个线程将数据写入相应的PipedOutputStream。不建议尝试从单个线程使用这两个对象,因为这可能会使线程死锁。
PipedInputStream有一个小的非扩展缓冲区。一旦缓冲区已满,就会写入PipedOutputStream块,直到缓冲的输入被另一个线程读取为止。您不能使用来自同一线程的这两个,因为写入将等待无法发生的读取。
在您的情况下,在写入所有数据之前,您不会读取任何数据,因此解决方案是使用ByteArrayOutputStream和ByteArrayInputStream:
- 将所有数据写入ByteArrayOutputStream
- 完成后,对流调用toByteArray()以检索字节数据
- (可选)创建一个ByteArrayInputStream,其中包含要作为InputStream从中读取的字节数据
我需要一个过滤器来拦截慢速连接,因为我需要尽快关闭DB连接,所以我最初使用Java管道,但当仔细观察它们的实现时,它都是同步的,所以我最终使用一个小缓冲区和Blocking队列创建了自己的QueueInputStream,以在缓冲区满后将其放入队列中,除了LinkedBlockingQueue中使用的锁定条件外,它是无锁的,因为在小缓冲区的帮助下,它应该很便宜,这个类只用于每个实例的单个生产者和消费者:
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;
public class QueueOutputStream extends OutputStream
{
private static final int DEFAULT_BUFFER_SIZE=1024;
private static final byte[] END_SIGNAL=new byte[]{};
private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
private final byte[] buffer;
private boolean closed=false;
private int count=0;
public QueueOutputStream()
{
this(DEFAULT_BUFFER_SIZE);
}
public QueueOutputStream(final int bufferSize)
{
if(bufferSize<=0){
throw new IllegalArgumentException("Buffer size <= 0");
}
this.buffer=new byte[bufferSize];
}
private synchronized void flushBuffer()
{
if(count>0){
final byte[] copy=new byte[count];
System.arraycopy(buffer,0,copy,0,count);
queue.offer(copy);
count=0;
}
}
@Override
public synchronized void write(final int b) throws IOException
{
if(closed){
throw new IllegalStateException("Stream is closed");
}
if(count>=buffer.length){
flushBuffer();
}
buffer[count++]=(byte)b;
}
@Override
public synchronized void write(final byte[] b, final int off, final int len) throws IOException
{
super.write(b,off,len);
}
@Override
public synchronized void close() throws IOException
{
flushBuffer();
queue.offer(END_SIGNAL);
closed=true;
}
public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
{
return executor.submit(
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
try{
byte[] buffer=queue.take();
while(buffer!=END_SIGNAL){
outputStream.write(buffer);
buffer=queue.take();
}
outputStream.flush();
} catch(Exception e){
close();
throw e;
} finally{
outputStream.close();
}
return null;
}
}
);
}