JDK-4404700

PipedInputStream too slow due to polling (alt implementation proposed)


    • Enhancement
    • Resolution: Unresolved
    • P4
    • None
    • 1.3.0, 1.4.0
    • core-libs
    • Fix Understood
    • generic
    • generic



      Name: krC82822 Date: 01/15/2001


      7 Nov 2000, eval1127@eng -- see also # 4221634.
      (filed against kest-linux-fcs)
      java version "1.3.0"
      Java(TM) 2 Runtime Environment, Standard Edition (build 1.3.0)
      Java HotSpot(TM) Client VM (build 1.3.0, mixed mode)

      java.io.PipedInputStream is too slow because it polls for new data: a
      blocked reader re-checks the buffer only once per second, so data that
      arrives just after a check can sit unread for almost a second. Its buffer
      is also small, and its size cannot be set. I propose that the following
      implementation of both PipedInputStream and PipedOutputStream be
      considered; it is simpler and much faster.
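      The delay described above can be observed with a small harness such as the
      one below. This is only a sketch and not part of the submitted code: the
      class name PipeLatencyDemo, its Result holder and the 100 ms head start are
      made up for illustration. It uses the stock java.io classes, lets the
      reader block on an empty pipe, and then has the writer send a single byte
      without calling flush().

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

/**
 * Hypothetical harness, not part of the original report: times how long a
 * reader stays blocked when the writer sends one byte without flushing.
 */
public class PipeLatencyDemo
{
  /** The byte the reader received and how long read() blocked. */
  static final class Result
  {
    final int value;
    final long blockedMillis;

    Result(int value, long blockedMillis)
    {
      this.value = value;
      this.blockedMillis = blockedMillis;
    }
  }

  static Result measure()
  {
    try
    {
      final PipedOutputStream out = new PipedOutputStream();
      PipedInputStream in = new PipedInputStream(out);

      Thread writer = new Thread(new Runnable()
      {
        public void run()
        {
          try
          {
            Thread.sleep(100); // Give the reader time to block first.
            out.write(42);     // Deliberately not followed by flush().
          }
          catch (Exception e)
          {
            throw new IllegalStateException(e);
          }
        }
      });

      writer.start();

      long start = System.nanoTime();
      int value = in.read(); // Returns once the reader notices the byte.
      long blockedMillis = (System.nanoTime() - start) / 1000000L;

      writer.join();
      out.close();
      in.close();

      return new Result(value, blockedMillis);
    }
    catch (IOException e)
    {
      throw new IllegalStateException(e);
    }
    catch (InterruptedException e)
    {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args)
  {
    Result r = measure();

    System.out.println("read " + r.value + " after " + r.blockedMillis + " ms");
  }
}
```

      On a runtime that polls as described, the reported time would be expected
      to come close to one second even though the byte was written after about
      100 ms; the exact figure depends on the JDK version and on thread
      scheduling, so no particular number is claimed here.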

      ---------------------------------------

      package be.re.io;

      import java.io.IOException;
      import java.io.InputStream;



      /**
       * This class is equivalent to <code>java.io.PipedInputStream</code>. In the
       * interface it only adds a constructor which allows for specifying the buffer
       * size. Its implementation, however, is much simpler and a lot more efficient
       * than its equivalent. It doesn't rely on polling. Instead it uses proper
       * synchronization with its counterpart <code>be.re.io.PipedOutputStream</code>.
       *
       * Multiple readers can read from this stream concurrently. The block asked for
       * by a reader is delivered completely, or until the end of the stream if less
       * is available. Other readers can't come in between.
       * @author WD
       */

      public class PipedInputStream extends InputStream

      {

        byte[] buffer;
        boolean closed = false;
        int readLaps = 0;
        int readPosition = 0;
        PipedOutputStream source;
        int writeLaps = 0;
        int writePosition = 0;



        /**
         * Creates an unconnected PipedInputStream with a default buffer size.
         */

        public
        PipedInputStream() throws IOException
        {
          this(null);
        }



        /**
         * Creates a PipedInputStream with a default buffer size and connects it to
         * <code>source</code>.
         * @exception IOException It was already connected.
         */

        public
        PipedInputStream(PipedOutputStream source) throws IOException
        {
          this(source, 0x10000);
        }



        /**
         * Creates a PipedInputStream with buffer size <code>bufferSize</code> and
         * connects it to <code>source</code>.
         * @exception IOException It was already connected.
         */

        public
        PipedInputStream(PipedOutputStream source, int bufferSize) throws IOException
        {
          if (source != null)
          {
            connect(source);
          }

          buffer = new byte[bufferSize];
        }



        public int
        available() throws IOException
        {
          /* The circular buffer is inspected to see where the reader and the writer
           * are located.
           */

          return
            writePosition > readPosition /* The writer is in the same lap. */ ?
              writePosition - readPosition :
              (
                writePosition < readPosition /* The writer is in the next lap. */ ?
                  buffer.length - readPosition + writePosition :
                  /* The writer is at the same position or a complete lap ahead. */
                  (writeLaps > readLaps ? buffer.length : 0)
              );
        }



        /**
         * @exception IOException The pipe is not connected.
         */

        public void
        close() throws IOException
        {
          if (source == null)
          {
            throw new IOException("Unconnected pipe");
          }

          synchronized (buffer)
          {
            closed = true;
            // Release any pending writers.
            buffer.notifyAll();
          }
        }



        /**
         * @exception IOException The pipe is already connected.
         */

        public void
        connect(PipedOutputStream source) throws IOException
        {
          if (this.source != null)
          {
            throw new IOException("Pipe already connected");
          }

          this.source = source;
          source.sink = this;
        }



        protected void
        finalize() throws Throwable
        {
          close();
        }



        public void
        mark(int readLimit)
        {
          return;
        }



        public boolean
        markSupported()
        {
          return false;
        }



        public int
        read() throws IOException
        {
          byte[] b = new byte[1];
          int result = read(b);

          // Mask to 0..255 so that bytes >= 0x80 are not confused with -1.
          return result == -1 ? -1 : (b[0] & 0xff);
        }



        public int
        read(byte[] b) throws IOException
        {
          return read(b, 0, b.length);
        }



        /**
         * @exception IOException The pipe is not connected.
         */

        public int
        read(byte[] b, int off, int len) throws IOException
        {
          if (source == null)
          {
            throw new IOException("Unconnected pipe");
          }

          synchronized (buffer)
          {
            if (writePosition == readPosition && writeLaps == readLaps)
            {
              if (closed)
              {
                return -1;
              }

              // Wait for any writer to put something in the circular buffer.

              try
              {
                buffer.wait();
              }

              catch (InterruptedException e)
              {
                throw new IOException(e.getMessage());
              }

              // Try again.

              return read(b, off, len);
            }

            // Don't read more than the capacity indicated by len or what's available
            // in the circular buffer.

            int amount =
              Math.min
              (
                len,
                (writePosition > readPosition ? writePosition : buffer.length) -
                  readPosition
              );

            System.arraycopy(buffer, readPosition, b, off, amount);
            readPosition += amount;

            if (readPosition == buffer.length) // A lap was completed, so go back.
            {
              readPosition = 0;
              ++readLaps;
            }

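            // Space was freed, so wake any writer blocked on a full buffer. Without
            // this, a reader that goes on to wait for the rest of its block and a
            // writer waiting for space would never wake each other.

            buffer.notifyAll();

            // Keep going until the complete desired block was obtained or the
            // stream has ended.

            if (amount < len)
            {
              int second = read(b, off + amount, len - amount);

              return second == -1 ? amount : amount + second;
            }

            return amount;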
          }
        }

      } // PipedInputStream
      ---------------------------------------
      package be.re.io;

      import java.io.IOException;
      import java.io.OutputStream;



      /**
       * This class is equivalent to <code>java.io.PipedOutputStream</code>. In the
       * interface it only adds a constructor which allows for specifying the buffer
       * size. Its implementation, however, is much simpler and a lot more efficient
       * than its equivalent. It doesn't rely on polling. Instead it uses proper
       * synchronization with its counterpart <code>be.re.io.PipedInputStream</code>.
       *
       * Multiple writers can write in this stream concurrently. The block written
       * by a writer is put in completely. Other writers can't come in between.
       * @author WD
       */

      public class PipedOutputStream extends OutputStream

      {

        PipedInputStream sink;



        /**
         * Creates an unconnected PipedOutputStream.
         */

        public
        PipedOutputStream() throws IOException
        {
          this(null);
        }



        /**
         * Creates a PipedOutputStream with a default buffer size and connects it to
         * <code>sink</code>.
         * @exception IOException It was already connected.
         */

        public
        PipedOutputStream(PipedInputStream sink) throws IOException
        {
          this(sink, 0x10000);
        }



        /**
         * Creates a PipedOutputStream with buffer size <code>bufferSize</code> and
         * connects it to <code>sink</code>.
         * @exception IOException It was already connected.
         */

        public
        PipedOutputStream(PipedInputStream sink, int bufferSize) throws IOException
        {
          if (sink != null)
          {
            connect(sink);
            sink.buffer = new byte[bufferSize];
          }
        }



        /**
         * @exception IOException The pipe is not connected.
         */

        public void
        close() throws IOException
        {
          if (sink == null)
          {
            throw new IOException("Unconnected pipe");
          }

          synchronized (sink.buffer)
          {
            sink.closed = true;
            flush();
          }
        }



        /**
         * @exception IOException The pipe is already connected.
         */

        public void
        connect(PipedInputStream sink) throws IOException
        {
          if (this.sink != null)
          {
            throw new IOException("Pipe already connected");
          }

          this.sink = sink;
          sink.source = this;
        }



        protected void
        finalize() throws Throwable
        {
          close();
        }



        public void
        flush() throws IOException
        {
          synchronized (sink.buffer)
          {
            // Release all readers.
            sink.buffer.notifyAll();
          }
        }



        public void
        write(int b) throws IOException
        {
          write(new byte[] {(byte) b});
        }



        public void
        write(byte[] b) throws IOException
        {
          write(b, 0, b.length);
        }



        /**
         * @exception IOException The pipe is not connected or a reader has closed it.
         */

        public void
        write(byte[] b, int off, int len) throws IOException
        {
          if (sink == null)
          {
            throw new IOException("Unconnected pipe");
          }

          if (sink.closed)
          {
            throw new IOException("Broken pipe");
          }

          synchronized (sink.buffer)
          {
            if
            (
              sink.writePosition == sink.readPosition &&
              sink.writeLaps > sink.readLaps
            )
            {
              // The circular buffer is full, so wait for some reader to consume
              // something.

              try
              {
                sink.buffer.wait();
              }

              catch (InterruptedException e)
              {
                throw new IOException(e.getMessage());
              }

              // Try again.

              write(b, off, len);

              return;
            }

            // Don't write more than the capacity indicated by len or the space
            // available in the circular buffer.

            int amount =
              Math.min
              (
                len,
                (
                  sink.writePosition < sink.readPosition ?
                    sink.readPosition : sink.buffer.length
                ) - sink.writePosition
              );

            System.arraycopy(b, off, sink.buffer, sink.writePosition, amount);
            sink.writePosition += amount;

            if (sink.writePosition == sink.buffer.length)
            {
              sink.writePosition = 0;
              ++sink.writeLaps;
            }

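            // Data was added, so wake any reader blocked on an empty buffer. Without
            // this, a writer that goes on to wait for space and a reader waiting
            // for data would never wake each other.

            sink.buffer.notifyAll();

            // Keep going until the complete desired block was written.

            if (amount < len)
            {
              write(b, off + amount, len - amount);
            }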
          }
        }

      } // PipedOutputStream
      (Review ID: 111902)
      ======================================================================
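      The proposed classes tell a full circular buffer from an empty one by
      counting laps: when the read and write positions coincide, the buffer is
      full if the writer is a lap ahead and empty otherwise. The sketch below
      models only that bookkeeping. It is single-threaded, moves no data and
      does no locking, and the class name LapCounterDemo and the methods wrote
      and consumed are hypothetical. Unlike the submitted code, it lets one call
      wrap past the end of the array.

```java
/**
 * Hypothetical, single-threaded model of the position/lap bookkeeping.
 * It tracks counters only; there is no byte array, I/O or synchronization.
 */
public class LapCounterDemo
{
  final int capacity;
  int readPosition = 0;
  int readLaps = 0;
  int writePosition = 0;
  int writeLaps = 0;

  LapCounterDemo(int capacity)
  {
    this.capacity = capacity;
  }

  /** Number of bytes that could be read right now. */
  int available()
  {
    if (writePosition > readPosition) // Writer is ahead in the same lap.
    {
      return writePosition - readPosition;
    }

    if (writePosition < readPosition) // Writer has wrapped into the next lap.
    {
      // Bytes up to the end of the array plus bytes from the start.
      return capacity - readPosition + writePosition;
    }

    // Same position: full if the writer is a lap ahead, otherwise empty.
    return writeLaps > readLaps ? capacity : 0;
  }

  /** Advances the writer; the caller must not exceed the free space. */
  void wrote(int n)
  {
    writePosition += n;

    if (writePosition >= capacity)
    {
      writePosition -= capacity;
      ++writeLaps;
    }
  }

  /** Advances the reader; the caller must not exceed available(). */
  void consumed(int n)
  {
    readPosition += n;

    if (readPosition >= capacity)
    {
      readPosition -= capacity;
      ++readLaps;
    }
  }

  public static void main(String[] args)
  {
    LapCounterDemo ring = new LapCounterDemo(8);

    ring.wrote(6);
    System.out.println(ring.available()); // 6: writer ahead, same lap
    ring.consumed(6);
    System.out.println(ring.available()); // 0: same position, same lap
    ring.wrote(4);
    System.out.println(ring.available()); // 4: writer wrapped (8 - 6 + 2)
    ring.wrote(4);
    System.out.println(ring.available()); // 8: same position, writer a lap ahead
  }
}
```

      The lap counters exist only to resolve the case where both positions are
      equal; a design that kept a single byte count instead would avoid them,
      at the cost of one more field to update on every read and write.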

            Assignee: Unassigned
            Reporter: Kevin Ryan (Inactive)