JDK-4545831

PipedInputStream performance problems


    • Type: Bug
    • Resolution: Duplicate
    • Priority: P4
    • None
    • 1.4.0
    • core-libs
    • generic
    • generic



      Name: nt126004 Date: 12/03/2001


      java version "1.4.0-beta3"
      Java(TM) 2 Runtime Environment, Standard Edition (build 1.4.0-beta3-b84)
      Java HotSpot(TM) Client VM (build 1.4.0-beta3-b84, mixed mode)

      I'm using Linux 2.4.16, JVM 1.3.1-b24 mixed mode.

      PipedInputStream seems to poll for data (at a rate of one poll per second) under
      certain specific conditions; this causes I/O throughput to drop to ridiculously
      low levels. See the code below.

      I've had a look at the source for PipedInputStream and this bug seems
      platform-independent. The class blocks on read when the buffer is
      empty and blocks on write when the buffer is full. It blocks by
      calling wait(1000); however, a reader will only be woken by a writer that
      encounters a full buffer (or when the wait times out), and a writer will only
      be woken by a reader that encounters an empty buffer (or when the wait times
      out).
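
The blocking pattern described above can be modelled in a few lines. This is a minimal sketch under stated assumptions, not the JDK source: PollingPipe, its fields, and measureLatencyMillis are hypothetical names, and the buffer handling is simplified to one byte at a time. It only illustrates why a reader that is already blocked has to wait for its own timeout.

```java
// Simplified model of the blocking pattern described in this report.
// NOT the JDK source: PollingPipe and its members are hypothetical names.
final class PollingPipe {
    private final byte[] buf = new byte[1024];
    private int count, readPos, writePos;

    // Writer: notifies only when it finds the buffer full.
    synchronized void write(int b) throws InterruptedException {
        while (count == buf.length) {
            notifyAll();   // wake readers so they can drain the buffer
            wait(1000);    // then re-check once per second
        }
        buf[writePos] = (byte) b;
        writePos = (writePos + 1) % buf.length;
        count++;
        // No notify here, so a reader already blocked on an empty buffer
        // sleeps until its own timeout expires.
    }

    // Reader: notifies only when it finds the buffer empty.
    synchronized int read() throws InterruptedException {
        while (count == 0) {
            notifyAll();   // wake writers blocked on a full buffer
            wait(1000);
        }
        int b = buf[readPos] & 0xff;
        readPos = (readPos + 1) % buf.length;
        count--;
        return b;
    }

    // Returns the delay in ms between a write completing and the blocked read returning.
    static long measureLatencyMillis() {
        PollingPipe pipe = new PollingPipe();
        long[] readDone = new long[1];
        Thread reader = new Thread(() -> {
            try {
                pipe.read();
                readDone[0] = System.nanoTime();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            reader.start();
            Thread.sleep(100);          // let the reader block on the empty buffer
            pipe.write(42);
            long writeDone = System.nanoTime();
            reader.join();              // join makes readDone[0] visible here
            return (readDone[0] - writeDone) / 1_000_000;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("read returned " + measureLatencyMillis() + " ms after the write");
    }
}
```

In this model the write itself returns almost immediately, but the reader should only notice the data once its wait(1000) expires, so the reported delay is expected to be most of a second rather than a few milliseconds.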

      The code attached to this bug report demonstrates how, under certain
      circumstances, the waiting thread is never notified and is only
      woken after a timeout, hence the poor performance.

      I think the correct fix is that when a writer encounters an empty buffer
      it calls notifyAll() to wake any blocked readers, and likewise a reader that
      encounters a full buffer calls notifyAll() to wake any blocked writers. Note
      that you should perform this check after the while () loop, as otherwise
      you've got a subtle race. You should then be able to remove the wait() timeout.
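
One way to read the proposed fix is sketched below on a simplified single-byte pipe. This is an illustration only, not a patch to PipedInputStream: NotifyingPipe, its fields, and the transfer helper are hypothetical names. The empty/full check is taken after the while loop, as suggested above, and wait() is called without a timeout.

```java
// Sketch of the fix proposed in this report, applied to a simplified pipe.
// NOT a JDK patch: NotifyingPipe and its members are hypothetical names.
final class NotifyingPipe {
    private final byte[] buf;
    private int count, readPos, writePos;

    NotifyingPipe(int capacity) { buf = new byte[capacity]; }

    synchronized void write(int b) throws InterruptedException {
        while (count == buf.length) {
            wait();                    // no timeout: a reader will notify us
        }
        // Checked after the loop, so it reflects the state we actually write into.
        boolean wasEmpty = (count == 0);
        buf[writePos] = (byte) b;
        writePos = (writePos + 1) % buf.length;
        count++;
        if (wasEmpty) {
            notifyAll();               // wake readers blocked on the empty buffer
        }
    }

    synchronized int read() throws InterruptedException {
        while (count == 0) {
            wait();                    // no timeout: a writer will notify us
        }
        boolean wasFull = (count == buf.length);
        int b = buf[readPos] & 0xff;
        readPos = (readPos + 1) % buf.length;
        count--;
        if (wasFull) {
            notifyAll();               // wake writers blocked on the full buffer
        }
        return b;
    }

    // Pushes n bytes through a pipe of the given capacity; returns the sum of the bytes read.
    static long transfer(int capacity, int n) {
        NotifyingPipe pipe = new NotifyingPipe(capacity);
        Thread writer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) pipe.write(i & 0xff);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += pipe.read();
            writer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        long sum = transfer(4, 10_000);
        System.out.println("sum=" + sum + " in " + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```

The sketch relies on notifyAll() rather than notify(): with a single monitor shared by readers and writers, waking every waiter on each empty-to-non-empty or full-to-non-full transition keeps the untimed wait() safe, at the cost of some extra wakeups.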


      import java.io.*;

      public class TestPipe
      {
              class ReaderThread extends Thread
              {
                      InputStream stream;

                      public ReaderThread(InputStream stream)
                      {
                              this.stream = stream;
                      }

                      public void run()
                      {
                              try {
                                      synchronized (this) {
                                              notify();
                                      }

                                      log("reading");
                                      stream.read(new byte[1024]);
                                      log("read ok");

                                      synchronized (this) {
                                              notify();
                                      }

                                      log("reading");
                                      stream.read(new byte[1024]);
                                      log("read ok");

                                      synchronized (this) {
                                              notify();
                                      }

                                      log("reading");
                                      stream.read(new byte[1024]);
                                      log("read ok");

                              } catch (IOException ex) {
                                      ex.printStackTrace(System.err);
                              }
                      }
              }


              private static void log(String s)
              {
                      System.err.println(System.currentTimeMillis() + ": " + s);
              }


              public static void main(String[] args)
                      throws IOException, InterruptedException
              {
                      TestPipe t = new TestPipe();

                      t.start();
              }


              public void start()
                      throws IOException, InterruptedException
              {
                      PipedInputStream input = new PipedInputStream();
                      PipedOutputStream output = new PipedOutputStream(input);
                      ReaderThread reader = new ReaderThread(input);

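                      synchronized (reader) {

                              // Start the reader while holding its monitor, so its
                              // first notify() cannot fire before we are waiting.
                              reader.start();

                              reader.wait();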
                              Thread.sleep(100);

                              log("writing");
                              output.write(new byte[1024]);
                              log("write ok");

                              reader.wait();
                              Thread.sleep(100);

                              log("writing");
                              output.write(new byte[1024]);
                              log("write ok");

                              reader.wait();
                              Thread.sleep(100);

                              log("writing");
                              output.write(new byte[1024]);
                              log("write ok");

                              reader.join();
                      }
              }
      }

      This should take about 300 milliseconds to execute; instead it takes about 3 seconds:

      1006947122242: reading
      1006947122352: writing
      1006947122354: write ok
      1006947123252: read ok
      1006947123252: reading
      1006947123361: writing
      1006947123363: write ok
      1006947124262: read ok
      1006947124262: reading
      1006947124371: writing
      1006947124373: write ok
      1006947125272: read ok
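
The timestamps in the log can be checked against the wait(1000) explanation with a little arithmetic. The sketch below uses only values copied from the log; LogDeltas and its helper names are hypothetical.

```java
// Arithmetic on the timestamps from the log in this report.
// LogDeltas and its method names are hypothetical.
final class LogDeltas {
    // Milliseconds, one row per round: {reading, write ok, read ok}.
    static final long[][] ROUNDS = {
        {1006947122242L, 1006947122354L, 1006947123252L},
        {1006947123252L, 1006947123363L, 1006947124262L},
        {1006947124262L, 1006947124373L, 1006947125272L},
    };

    // Time the reader spent inside read().
    static long blockedMillis(long[] r) { return r[2] - r[0]; }

    // Time between the write completing and the read returning.
    static long latencyAfterWrite(long[] r) { return r[2] - r[1]; }

    public static void main(String[] args) {
        for (long[] r : ROUNDS) {
            System.out.println("read blocked " + blockedMillis(r)
                + " ms, returned " + latencyAfterWrite(r) + " ms after write ok");
        }
    }
}
```

Each read blocks for 1010 ms and returns 898 to 899 ms after the corresponding write completed, which is consistent with the reader waking on a one-second timeout rather than on a notification from the writer.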


      This, as a colleague put it, "sucks".
      (Review ID: 136427)
      ======================================================================

            kkladkosunw Konstantin Kladko (Inactive)
            nthompsosunw Nathanael Thompson (Inactive)