Name: krC82822 Date: 01/15/2001
7 Nov 2000, eval1127@eng -- see also # 4221634.
(filed against kest-linux-fcs)
java version "1.3.0"
Java(TM) 2 Runtime Environment, Standard Edition (build 1.3.0)
Java HotSpot(TM) Client VM (build 1.3.0, mixed mode)
java.io.PipedInputStream is too slow because it polls for new data instead of
being notified by the writer. It checks only once per second, so when data
arrives just after a check the reader can waste almost a full second before
noticing it. Its buffer is also small, and the size cannot be configured.
I propose to consider the following implementation of both PipedInputStream and
PipedOutputStream, which is simpler and much faster.
---------------------------------------
import java.io.IOException;
import java.io.InputStream;
/**
 * This class is equivalent to <code>java.io.PipedInputStream</code>. In the
 * interface it only adds a constructor which allows for specifying the buffer
 * size. It doesn't rely on polling. Instead it uses monitor-based
 * synchronization (<code>wait</code>/<code>notifyAll</code> on the circular
 * buffer) with its counterpart <code>be.re.io.PipedOutputStream</code>.
 *
 * Multiple readers can read from this stream concurrently. A reader blocks
 * until the complete block it asked for has been delivered, or until the end
 * of the stream if less is available. While a reader copies data it holds the
 * buffer's monitor, so other readers can't come in between; the monitor is
 * released, however, whenever the reader has to wait for more data.
 * @author WD
 */
public class PipedInputStream extends InputStream
{
  // The fields below are package-private because the connected
  // PipedOutputStream manipulates them directly. Apart from the connection
  // checks they are accessed while holding the monitor of <code>buffer</code>.

  /** The circular buffer; its monitor is also the lock for the whole pipe. */
  byte[] buffer;
  /** Set once either end of the pipe has been closed. */
  boolean closed = false;
  /** Number of times the read position wrapped around the buffer. */
  int readLaps = 0;
  /** Index of the next byte to read. */
  int readPosition = 0;
  /** The connected writer, or <code>null</code> while unconnected. */
  PipedOutputStream source;
  /** Number of times the write position wrapped around the buffer. */
  int writeLaps = 0;
  /** Index of the next byte to write. */
  int writePosition = 0;

  /**
   * Creates an unconnected PipedInputStream with a default buffer size.
   */
  public
  PipedInputStream() throws IOException
  {
    this(null);
  }

  /**
   * Creates a PipedInputStream with a default buffer size and connects it to
   * <code>source</code>.
   * @param source the writer to connect to, or <code>null</code> to stay
   * unconnected.
   * @exception IOException It was already connected.
   */
  public
  PipedInputStream(PipedOutputStream source) throws IOException
  {
    this(source, 0x10000);
  }

  /**
   * Creates a PipedInputStream with buffer size <code>bufferSize</code> and
   * connects it to <code>source</code>.
   * @param source the writer to connect to, or <code>null</code> to stay
   * unconnected.
   * @param bufferSize the capacity of the circular buffer in bytes.
   * @exception IOException It was already connected.
   * @exception IllegalArgumentException <code>bufferSize</code> is not
   * positive.
   */
  public
  PipedInputStream(PipedOutputStream source, int bufferSize) throws IOException
  {
    // A zero-length buffer can never hold a byte, so the pipe would hang.
    if (bufferSize <= 0)
    {
      throw new IllegalArgumentException
      (
        "Buffer size must be positive: " + bufferSize
      );
    }

    // Allocate before connecting, so the writer never sees a sink without a
    // buffer.
    buffer = new byte[bufferSize];

    if (source != null)
    {
      connect(source);
    }
  }

  /**
   * Returns the number of bytes that can currently be read without waiting.
   */
  public int
  available() throws IOException
  {
    synchronized (buffer)
    {
      if (writePosition > readPosition) // The writer is in the same lap.
      {
        return writePosition - readPosition;
      }

      if (writePosition < readPosition) // The writer is in the next lap.
      {
        // The tail of the buffer plus whatever was written after wrapping.
        return buffer.length - readPosition + writePosition;
      }

      // The writer is at the same position or a complete lap ahead.
      return writeLaps > readLaps ? buffer.length : 0;
    }
  }

  /**
   * Marks the pipe as closed and wakes up every thread waiting on it.
   * @exception IOException The pipe is not connected.
   */
  public void
  close() throws IOException
  {
    if (source == null)
    {
      throw new IOException("Unconnected pipe");
    }

    synchronized (buffer)
    {
      closed = true;
      // Release any pending writers.
      buffer.notifyAll();
    }
  }

  /**
   * Connects this stream to <code>source</code>.
   * @param source the writer to connect to.
   * @exception IOException This stream or <code>source</code> is already
   * connected.
   * @exception NullPointerException <code>source</code> is <code>null</code>.
   */
  public void
  connect(PipedOutputStream source) throws IOException
  {
    // Fail before touching any state, so a bad argument can't leave the pipe
    // half connected.
    if (source == null)
    {
      throw new NullPointerException("source");
    }

    if (this.source != null || source.sink != null)
    {
      throw new IOException("Pipe already connected");
    }

    this.source = source;
    source.sink = this;
  }

  /**
   * Closes the pipe when the stream is garbage collected. An unconnected
   * stream has nothing to close.
   */
  protected void
  finalize() throws Throwable
  {
    if (source != null)
    {
      close();
    }
  }

  /**
   * Does nothing, because marking is not supported.
   */
  public void
  mark(int readLimit)
  {
    return;
  }

  /**
   * @return always <code>false</code>.
   */
  public boolean
  markSupported()
  {
    return false;
  }

  /**
   * Reads one byte, waiting until it is available.
   * @return the byte as a value from 0 to 255, or -1 at the end of the stream.
   * @exception IOException The pipe is not connected.
   */
  public int
  read() throws IOException
  {
    byte[] single = new byte[1];

    // Mask the byte: a raw 0xFF would otherwise be returned as -1 and be
    // mistaken for the end of the stream.
    return read(single, 0, 1) == -1 ? -1 : single[0] & 0xff;
  }

  /**
   * Reads <code>b.length</code> bytes, see
   * <code>read(byte[], int, int)</code>.
   */
  public int
  read(byte[] b) throws IOException
  {
    return read(b, 0, b.length);
  }

  /**
   * Reads <code>len</code> bytes into <code>b</code> starting at
   * <code>off</code>. The call waits until the complete block was obtained or
   * the pipe was closed.
   * @return the number of bytes read, which is less than <code>len</code> only
   * when the pipe was closed, or -1 if the pipe was closed and nothing was
   * left to read.
   * @exception IOException The pipe is not connected.
   * @exception java.io.InterruptedIOException The thread was interrupted while
   * waiting; <code>bytesTransferred</code> holds the number of bytes that were
   * already consumed.
   * @exception IndexOutOfBoundsException <code>off</code> and <code>len</code>
   * don't denote a range within <code>b</code>.
   */
  public int
  read(byte[] b, int off, int len) throws IOException
  {
    if (source == null)
    {
      throw new IOException("Unconnected pipe");
    }

    if (off < 0 || len < 0 || len > b.length - off)
    {
      throw new IndexOutOfBoundsException();
    }

    if (len == 0)
    {
      return 0;
    }

    synchronized (buffer)
    {
      int total = 0; // Bytes copied into b so far.
      int announced = 0; // Value of total when writers were last notified.

      while (total < len)
      {
        if (writePosition == readPosition && writeLaps == readLaps)
        {
          // The circular buffer is empty.
          if (closed)
          {
            break;
          }

          // Writers blocked on a full buffer must learn about the space that
          // was freed before this reader goes to sleep, otherwise both sides
          // wait forever. Notifying only after progress avoids readers
          // waking each other up in a loop.
          if (total > announced)
          {
            buffer.notifyAll();
            announced = total;
          }

          // Wait for any writer to put something in the circular buffer.
          try
          {
            buffer.wait();
          }

          catch (InterruptedException e)
          {
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();

            java.io.InterruptedIOException failure =
              new java.io.InterruptedIOException
              (
                "Interrupted while waiting for data"
              );

            failure.bytesTransferred = total;
            throw failure;
          }

          continue; // Try again.
        }

        // Don't read more than what is still wanted or what is available in
        // one contiguous stretch of the circular buffer.
        int amount =
          Math.min
          (
            len - total,
            (writePosition > readPosition ? writePosition : buffer.length) -
              readPosition
          );

        System.arraycopy(buffer, readPosition, b, off + total, amount);
        readPosition += amount;

        if (readPosition == buffer.length) // A lap was completed, so go back.
        {
          readPosition = 0;
          ++readLaps;
        }

        total += amount;
      }

      // Tell writers about space they haven't heard of yet.
      if (total > announced)
      {
        buffer.notifyAll();
      }

      // Nothing was read only when the pipe was closed and empty.
      return total == 0 ? -1 : total;
    }
  }
} // PipedInputStream
---------------------------------------
package be.re.io;
import java.io.IOException;
import java.io.OutputStream;
/**
 * This class is equivalent to <code>java.io.PipedOutputStream</code>. In the
 * interface it only adds a constructor which allows for specifying the buffer
 * size. It doesn't rely on polling. Instead it uses monitor-based
 * synchronization (<code>wait</code>/<code>notifyAll</code> on the sink's
 * circular buffer) with its counterpart
 * <code>be.re.io.PipedInputStream</code>.
 *
 * Multiple writers can write in this stream concurrently. A writer blocks
 * until the block it wants to write was put in completely. While a writer
 * copies data it holds the buffer's monitor, so other writers can't come in
 * between; the monitor is released, however, whenever the writer has to wait
 * for free space.
 * @author WD
 */
public class PipedOutputStream extends OutputStream
{
  /**
   * The connected reader, or <code>null</code> while unconnected. It is
   * package-private because the reader sets it when it initiates the
   * connection.
   */
  PipedInputStream sink;

  /**
   * Creates an unconnected PipedOutputStream.
   */
  public
  PipedOutputStream() throws IOException
  {
    this(null);
  }

  /**
   * Creates a PipedOutputStream and connects it to <code>sink</code>. The
   * buffer <code>sink</code> was created with is kept, so a buffer size
   * chosen on the reader's side is respected.
   * @param sink the reader to connect to, or <code>null</code> to stay
   * unconnected.
   * @exception IOException It was already connected.
   */
  public
  PipedOutputStream(PipedInputStream sink) throws IOException
  {
    if (sink != null)
    {
      connect(sink);
    }
  }

  /**
   * Creates a PipedOutputStream, gives <code>sink</code> a buffer of
   * <code>bufferSize</code> bytes and connects to it.
   * @param sink the reader to connect to, or <code>null</code> to stay
   * unconnected, in which case <code>bufferSize</code> is ignored.
   * @param bufferSize the capacity of the circular buffer in bytes.
   * @exception IOException It was already connected.
   * @exception IllegalArgumentException <code>sink</code> is given and
   * <code>bufferSize</code> is not positive.
   */
  public
  PipedOutputStream(PipedInputStream sink, int bufferSize) throws IOException
  {
    if (sink != null)
    {
      // A zero-length buffer can never hold a byte, so the pipe would hang.
      if (bufferSize <= 0)
      {
        throw new IllegalArgumentException
        (
          "Buffer size must be positive: " + bufferSize
        );
      }

      // Only a sink that never had a writer may get a new buffer, because
      // nothing can have been written into the old one yet.
      if (sink.source != null)
      {
        throw new IOException("Pipe already connected");
      }

      // Replace the buffer before the connection becomes visible, so nobody
      // ends up waiting on the discarded array.
      sink.buffer = new byte[bufferSize];
      connect(sink);
    }
  }

  /**
   * Marks the pipe as closed and wakes up every thread waiting on it. Readers
   * can still consume what is left in the buffer.
   * @exception IOException The pipe is not connected.
   */
  public void
  close() throws IOException
  {
    if (sink == null)
    {
      throw new IOException("Unconnected pipe");
    }

    synchronized (sink.buffer)
    {
      sink.closed = true;
      flush();
    }
  }

  /**
   * Connects this stream to <code>sink</code>.
   * @param sink the reader to connect to.
   * @exception IOException This stream or <code>sink</code> is already
   * connected.
   * @exception NullPointerException <code>sink</code> is <code>null</code>.
   */
  public void
  connect(PipedInputStream sink) throws IOException
  {
    // Fail before touching any state, so a bad argument can't leave the pipe
    // half connected.
    if (sink == null)
    {
      throw new NullPointerException("sink");
    }

    if (this.sink != null || sink.source != null)
    {
      throw new IOException("Pipe already connected");
    }

    this.sink = sink;
    sink.source = this;
  }

  /**
   * Closes the pipe when the stream is garbage collected. An unconnected
   * stream has nothing to close.
   */
  protected void
  finalize() throws Throwable
  {
    if (sink != null)
    {
      close();
    }
  }

  /**
   * Wakes up all threads waiting on the pipe. It does nothing when the stream
   * is not connected.
   */
  public void
  flush() throws IOException
  {
    if (sink == null)
    {
      return;
    }

    synchronized (sink.buffer)
    {
      // Release all readers.
      sink.buffer.notifyAll();
    }
  }

  /**
   * Writes the low-order byte of <code>b</code>.
   * @exception IOException The pipe is not connected or it was closed.
   */
  public void
  write(int b) throws IOException
  {
    write(new byte[] {(byte) b}, 0, 1);
  }

  /**
   * Writes all of <code>b</code>, see <code>write(byte[], int, int)</code>.
   */
  public void
  write(byte[] b) throws IOException
  {
    write(b, 0, b.length);
  }

  /**
   * Writes <code>len</code> bytes of <code>b</code> starting at
   * <code>off</code>. The call waits until the complete block was put in the
   * circular buffer.
   * @exception IOException The pipe is not connected or it was closed.
   * @exception java.io.InterruptedIOException The thread was interrupted while
   * waiting; <code>bytesTransferred</code> holds the number of bytes that were
   * already written.
   * @exception IndexOutOfBoundsException <code>off</code> and <code>len</code>
   * don't denote a range within <code>b</code>.
   */
  public void
  write(byte[] b, int off, int len) throws IOException
  {
    if (sink == null)
    {
      throw new IOException("Unconnected pipe");
    }

    if (off < 0 || len < 0 || len > b.length - off)
    {
      throw new IndexOutOfBoundsException();
    }

    synchronized (sink.buffer)
    {
      // The flag is only modified under this monitor, so it is checked here.
      if (sink.closed)
      {
        throw new IOException("Broken pipe");
      }

      int written = 0; // Bytes copied into the circular buffer so far.
      int announced = 0; // Value of written when readers were last notified.

      while (written < len)
      {
        if
        (
          sink.writePosition == sink.readPosition &&
            sink.writeLaps > sink.readLaps
        )
        {
          // The circular buffer is full. Readers blocked on an empty buffer
          // must learn about the data that was added before this writer goes
          // to sleep, otherwise both sides wait forever. Notifying only
          // after progress avoids writers waking each other up in a loop.
          if (written > announced)
          {
            sink.buffer.notifyAll();
            announced = written;
          }

          // Wait for some reader to consume something.
          try
          {
            sink.buffer.wait();
          }

          catch (InterruptedException e)
          {
            // Keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();

            java.io.InterruptedIOException failure =
              new java.io.InterruptedIOException
              (
                "Interrupted while waiting for free space"
              );

            failure.bytesTransferred = written;
            throw failure;
          }

          // The pipe may have been closed while this writer was waiting.
          if (sink.closed)
          {
            throw new IOException("Broken pipe");
          }

          continue; // Try again.
        }

        // Don't write more than what is left of the block or the contiguous
        // space available in the circular buffer.
        int amount =
          Math.min
          (
            len - written,
            (
              sink.writePosition < sink.readPosition ?
                sink.readPosition : sink.buffer.length
            ) - sink.writePosition
          );

        System.arraycopy
        (
          b,
          off + written,
          sink.buffer,
          sink.writePosition,
          amount
        );

        sink.writePosition += amount;

        if (sink.writePosition == sink.buffer.length) // A lap was completed.
        {
          sink.writePosition = 0;
          ++sink.writeLaps;
        }

        written += amount;
      }

      // The complete block is in, so release the readers.
      sink.buffer.notifyAll();
    }
  }
} // PipedOutputStream
(Review ID: 111902)
======================================================================
- duplicates
-
JDK-4545831 PipedInputStream performance problems
-
- Closed
-
- relates to
-
JDK-6246639 PipedInputStream.read() too slow - use System.arraycopy for bulk operations
-
- Resolved
-