JDK-4615008

(so) After some time, selector.select() fails to recognize read events


    • Type: Bug
    • Resolution: Cannot Reproduce
    • Priority: P4
    • None
    • 1.4.0
    • core-libs
    • generic, x86
    • solaris_7, windows_xp



      Name: nt126004 Date: 12/17/2001


      java version "1.4.0-beta3"
      Java(TM) 2 Runtime Environment, Standard Edition (build 1.4.0-beta3-b84)
      Java HotSpot(TM) Client VM (build 1.4.0-beta3-b84, mixed mode)

      I'm including two Java source files, one for the server and the other
      for the test client.

      Essentially, after a short period of heavy usage, selector.select() stops
      reporting events (in my case OP_READ). It also does not notice when
      client connections have been closed. A fuller description is in the
      comments of the server source:

      The Java server:
      -----------------------------------------------------------------------
      package test;

      import java.io.*;
      import java.net.*;
      import java.util.*;

      import java.nio.*;
      import java.nio.charset.*;
      import java.nio.channels.*;

      /**
       * This test program is implemented using the Proactor Pattern. It has an
       * event pump that will put read events on a queue. A pool of service threads
       * can then pick up the work and service them.
       *
       * Once you are told that there is an event that you expressed an interest in,
       * you clear the interestOps to indicate that you are no longer interested in
       * those events. You do that in the postSelect call. The only exception is the
       * accept context, where you want to be constantly notified when a new
       * connection is made to you.
       *
       * Since you can't register on a selector that is currently in a select call,
       * I use the wakeup function on the selector to make it wake up and register
       * all the pending register calls. I also changed the selector to select for
       * 5 seconds and got rid of the wakeup, and it still exhibited the same problem.
       *
       * The problem is that if you have run the client against this server a couple
       * of times, you will notice that the selector.select() call hangs. It will
       * not recognize the read events. It also ignores close events on the client
       * sockets. I printed the selection keys in the selector and they all indicate
       * an interest in the read event (all are valid selection keys), and of course
       * one is still interested in the accept call. The accept selection key still
       * works. It's the read selection keys that somehow get messed up.
       *
       * The other problem I notice is that in TestSelectorClient, lots of connect
       * exceptions happen. Not sure why that should be the case, since you are
       * always selecting on OP_ACCEPT.
       *
       * This was tested on a Windows XP machine (Pentium 4, 1.6 GHz processor). I
       * found it easier to make this problem appear if I ran the client on another
       * machine (in my case a Solaris 2.8 box;
       * uname -a: SunOS ss02 5.8 Generic_108528-12 sun4u sparc SUNW,Sun-Blade-100).
       * JDK used was (java -version):
       * java version "1.4.0-beta3"
       * Java(TM) 2 Runtime Environment, Standard Edition (build 1.4.0-beta3-b84)
       * Java HotSpot(TM) Client VM (build 1.4.0-beta3-b84, mixed mode)
       *
       * @see test.TestSelectorClient
       * @author Senthil Supramaniam <###@###.###>
       */

      public class TestSelector implements Runnable {

          Selector selector;
          ServerSocketChannel ssc;
          ByteBuffer b_write;
          List queue = Collections.synchronizedList(new LinkedList());
          List regQueue = Collections.synchronizedList(new LinkedList());

          final static boolean DEBUG = false;

          TestSelector(int p) throws Exception {
      Charset ascii = Charset.forName("US-ASCII");

      CharBuffer chars = CharBuffer.allocate(1024);
      chars.put("Hello There");
      chars.flip();

      ByteBuffer buffer = ascii.newEncoder().encode(chars);
      b_write = ByteBuffer.allocateDirect(buffer.limit());
      b_write.put(buffer);

      selector = Selector.open();

      ssc = ServerSocketChannel.open();
      ssc.configureBlocking(false);

      // InetAddress lh = InetAddress.getByName("localhost");
      InetAddress lh = InetAddress.getLocalHost();
      InetSocketAddress isa = new InetSocketAddress(lh, p);

      ssc.socket().bind(isa, 100);

      Context ctx = new Context(ssc, SelectionKey.OP_ACCEPT, null, null);
      addToRegQueue(ctx);

      new Thread(this).start();

      new Handler();
          }

          /**
           * A unit of work to be done. Only one accept context exists
           */
          class Context {
      SelectableChannel sc;
      int interestOps;
      ByteBuffer b_read;
      ByteBuffer b_write;

      Context(SelectableChannel sc, int interestOps, ByteBuffer b_read,
      ByteBuffer b_write) {
      this.sc = sc;
      this.interestOps = interestOps;
      this.b_read = b_read;
      this.b_write = b_write;
      }

      void setInterest(int op) {
      interestOps = op;
      }

      void register(Selector selector) {
      try {
      SelectionKey key = sc.keyFor(selector);
      if (key == null) {
      sc.register(selector, interestOps, this);
      } else {
      key.interestOps(interestOps);
      }
      } catch (Exception e) {
      e.printStackTrace();
      try { sc.close(); } catch (Exception ee) { }
      }
      }

      void postSelect(SelectionKey sk) {
      // After we are selected, we don't want to be called again unless this is
      // an accept context, of which only one exists.
      // For READ or WRITE, I'll reregister if needed.
      if ((interestOps & SelectionKey.OP_ACCEPT) == 0) {
      sk.interestOps(0);
      }
      }
          }

          class Handler extends Thread {

      Handler() {
      Thread t = new Thread(this);
      t.setName("Handler");
      t.start();
      }

      public void run() {
      try {
      while (true) {
      if (queue.isEmpty()) {
      waitForQueue();
      }
      while (!queue.isEmpty()) {
      Context ctx = (Context) queue.remove(0);
      // what operation were we interested in?
      if ((ctx.interestOps & SelectionKey.OP_ACCEPT) != 0) {
      handleAccept();
      } else if ((ctx.interestOps & SelectionKey.OP_READ) != 0) {
      handleRead(ctx);
      } else {
      handleWrite(ctx);
      }
      }
      }
      } catch (Exception e) {
      e.printStackTrace();
      }
      }
          }

          /*
           * Wait till there is some context to service
           */
          synchronized void waitForQueue() {
      try {
      wait();
      } catch (InterruptedException e) {
      e.printStackTrace();
      return;
      }
          }

          /*
           * There is some work to be done
           */
          synchronized void notifyQueue() {
      notify();
          }

          /*
           * Now that selector has woken up, let's register information for it
           */
          void doRegistrations() {
      if (DEBUG) {
      System.out.println("In doRegistrations " + regQueue.size());
      }
      while (!regQueue.isEmpty()) {
      Context ctx = (Context) regQueue.remove(0);
      ctx.register(selector);
      }
          }

          /*
           * Let's add all the context's that we want to have registered
           */
          void addToRegQueue(Context ctx) {
      if (DEBUG) {
      System.out.println("Adding to reg queue " + ctx.interestOps);
      }
      regQueue.add(ctx);

      // can't register a channel or change interestOps until the select() call
      // restarts again
      selector.wakeup();
          }

          void handleAccept() {
      Context ctx = null;
      try {
      SocketChannel sc = ssc.accept();

      /*
      * REMIND: This happens quite regularly - shouldn't!
      */
      if (sc == null) {
      if (DEBUG) {
      System.out.println("accept returned null");
      }
      return;
      }

      sc.configureBlocking(false);

      ByteBuffer b_read = ByteBuffer.allocateDirect(1024);

      ctx = new Context(sc, SelectionKey.OP_READ, b_read, (ByteBuffer)
      b_write.duplicate().rewind());
      addToRegQueue(ctx);
      } catch (Exception e) {
      e.printStackTrace();
      try {
      if (ctx != null) {
      ctx.sc.close();
      }
      } catch (Exception ee) {}
      }
          }

          void handleRead(Context ctx) {
      if (DEBUG) {
      System.out.println("HandleRead called");
      }
      try {
      ((SocketChannel)ctx.sc).read(ctx.b_read);

      // Now I want to know when it is writable...
      ctx.setInterest(SelectionKey.OP_WRITE);

      addToRegQueue(ctx);
      } catch (Exception e) {
      try {
      ctx.sc.close();
      } catch (Exception ee) {
      }
      }
          }

          void handleWrite(Context ctx) {
      if (DEBUG) {
      System.out.println("HandleWrite called");
      }
      try {
      try {
      ((SocketChannel)ctx.sc).write(ctx.b_write);
      } finally {
      ctx.sc.close(); // should cancel from the selector also
      }
      } catch (Exception e) {
      e.printStackTrace();
      }
          }

          String getOPName(int op) {
      StringBuffer sb = new StringBuffer();
      if ((op & SelectionKey.OP_ACCEPT) != 0) {
      sb.append("OP_ACCEPT |");
      }

      if ((op & SelectionKey.OP_READ) != 0) {
      sb.append("OP_READ |");
      }

      if ((op & SelectionKey.OP_WRITE) != 0) {
      sb.append("OP_WRITE |");
      }

      if ((op & SelectionKey.OP_CONNECT) != 0) {
      sb.append("OP_CONNECT |");
      }

      return sb.toString();
          }

          private void printSelector() {
      try {
      for (Iterator i = selector.keys().iterator(); i.hasNext(); ) {
      SelectionKey sk = (SelectionKey) i.next();
      String app = "NOTVALID SK";
      if (sk.isValid()) {
      app = getOPName(sk.interestOps());
      }
      System.out.println("Going to select on: (" + sk.isValid() + ") ops=" + app);
      }
      } catch (Exception e) {
      e.printStackTrace();
      }
          }

          public void run() {
      try {
      while (true) {

      System.out.println("Going to select " + selector.keys().size());
      if (DEBUG) {
      printSelector();
      }
      int n = selector.select();
      System.out.println("Selector returned " + n);

      if (n > 0) {
      Set keys = selector.selectedKeys();
      Iterator i = keys.iterator();
      while (i.hasNext()) {
      SelectionKey sk = (SelectionKey) i.next();
      i.remove();

      Context ctx = (Context) sk.attachment();
      ctx.postSelect(sk);
      queue.add(ctx);
      }

      // notify the queue
      notifyQueue();
      }
      // do all key registrations now
      doRegistrations();
      }
      } catch (Exception e) {
      e.printStackTrace();
      }
          }


          public static void main(String args[]) throws Exception {

      if (args.length == 0) {
      System.err.println("Usage com.secretseal.test.TestSelector port");
      System.exit(1);
      }

      new TestSelector(Integer.parseInt(args[0]));

          }
          
      }
      ----------------------------------------------------------------------------
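The server's class comment describes two techniques: clearing a key's interest set once it has been selected, and queuing registrations so that only the selector thread touches the selector, with wakeup() used to interrupt a blocked select(). A minimal, self-contained sketch of that pattern follows. It is not the reporter's code and makes no claim about the cause of the hang. The names DeferredRegistrationSketch, Pending, and roundTrip are hypothetical, a java.nio.channels.Pipe stands in for the client socket so no network is needed, and the sketch uses generics, lambdas, and java.util.concurrent, all of which postdate the 1.4.0 beta in the report.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.*;

public class DeferredRegistrationSketch {

    /** One queued request: "set these interest ops on this channel". */
    static final class Pending {
        final SelectableChannel channel;
        final int ops;
        Pending(SelectableChannel channel, int ops) { this.channel = channel; this.ops = ops; }
    }

    private final Selector selector;
    private final Queue<Pending> pending = new ConcurrentLinkedQueue<>();
    private final BlockingQueue<String> received = new LinkedBlockingQueue<>();

    DeferredRegistrationSketch() throws IOException {
        selector = Selector.open();
    }

    /** Safe from any thread: queue the request, then wake the selector thread. */
    void requestInterest(SelectableChannel channel, int ops) {
        pending.add(new Pending(channel, ops));
        selector.wakeup();
    }

    /** Selector thread only: apply every queued request before selecting again. */
    private void applyPending() throws IOException {
        Pending p;
        while ((p = pending.poll()) != null) {
            SelectionKey key = p.channel.keyFor(selector);
            if (key == null) {
                p.channel.register(selector, p.ops);
            } else if (key.isValid()) {
                key.interestOps(p.ops);
            }
        }
    }

    /** Selector loop: apply requests, block in select(), hand off readable data. */
    void loop() {
        try {
            while (true) {
                applyPending();
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isReadable()) {
                        key.interestOps(0); // one-shot: silent until re-requested
                        ByteBuffer buf = ByteBuffer.allocate(64);
                        ((ReadableByteChannel) key.channel()).read(buf);
                        buf.flip();
                        received.add(StandardCharsets.US_ASCII.decode(buf).toString());
                    }
                }
            }
        } catch (ClosedSelectorException e) {
            // selector was closed by roundTrip(): exit quietly
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Sends message through a Pipe and returns what the selector thread read. */
    static String roundTrip(String message) throws Exception {
        DeferredRegistrationSketch s = new DeferredRegistrationSketch();
        Pipe pipe = Pipe.open();
        pipe.source().configureBlocking(false);
        Thread t = new Thread(s::loop, "selector");
        t.setDaemon(true);
        t.start();
        try {
            // Requested here while the selector thread may be blocked in select().
            s.requestInterest(pipe.source(), SelectionKey.OP_READ);
            pipe.sink().write(ByteBuffer.wrap(message.getBytes(StandardCharsets.US_ASCII)));
            return s.received.poll(5, TimeUnit.SECONDS);
        } finally {
            s.selector.close();
            pipe.sink().close();
            pipe.source().close();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("hello"));
    }
}
```

Because wakeup() also causes the next select() to return immediately when no select is in progress, a request queued just before the selector thread blocks is still picked up on the following loop iteration.
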
      The Java client:
      ----------------------------------------------------------------------------
      package test;

      import java.io.*;
      import java.net.*;
      import java.util.*;

      /**
       * The client end for testing TestSelector. It makes connections to the server,
       * writes "hello", and reads what it can.
       *
       * @see test.TestSelector
       * @author Senthil Supramaniam <###@###.###>
       */

      public class TestSelectorClient implements Runnable {

          InetAddress addr;
          int port;
          byte buf[] = new byte[1024];
          
          TestSelectorClient(InetAddress addr, int port) {
      this.addr = addr;
      this.port = port;

      new Thread(this).start();
          }

          public void run() {
      try {
      long start = System.currentTimeMillis();
      for (int i = 0; i < 100; i++) {
      try {
      Socket s = new Socket(addr, port);
      try {
      s.setReuseAddress(true);
      InputStream in = s.getInputStream();
      DataOutputStream out = new DataOutputStream(new
      BufferedOutputStream(s.getOutputStream()));

      out.writeBytes("hello");
      out.flush();

      in.read(buf);
      } finally {
      s.close();
      }
      } catch (Exception ee) {
      ee.printStackTrace();
      }

      if ((i % 10) == 0) {
      System.out.println("Finished " + i + " calls");
      }
      }
      System.out.println("Took " + (System.currentTimeMillis() - start)
      + "ms for 100 calls");
      } catch (Exception e) {
      e.printStackTrace();
      }
          }

          private static void usage() {
      System.err.println("Usage com.secretseal.test.TestSelectorClient host:port [nThread]");
      System.exit(1);
          }

          public static void main(String args[]) throws Exception {
      InetAddress addr = null;
      int port = 1639;
      int nThreads = 2;

      if (args.length < 1) {
      usage();
      }

      if (args.length > 0) {
      int i = args[0].indexOf(':');
      if (i < 0) {
      usage();
      }
      addr = InetAddress.getByName(args[0].substring(0, i));
      port = Integer.parseInt(args[0].substring(i+1));
      }

      if (args.length > 1) {
      nThreads = Integer.parseInt(args[1]);
      }

      for (int i = 0; i < nThreads; i++) {
      new TestSelectorClient(addr, port);
      }
          }
      }
      (Review ID: 136744)
      ======================================================================
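The server hands selected contexts from the event pump to its service thread through a synchronized list plus wait()/notify(). For comparison, the same hand-off can be sketched with java.util.concurrent.BlockingQueue, where checking for work and waiting for it are a single operation. This is an illustrative alternative only, not the reporter's code and not a confirmed fix; the report does not establish whether the hand-off is related to the hang. The names HandOffSketch and pumpAndCount are hypothetical, plain integers stand in for Context objects, and java.util.concurrent was not available in the 1.4.0 beta used in the report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HandOffSketch {

    /** Starts nWorkers service threads, queues nItems work items, returns how many were serviced. */
    static int pumpAndCount(int nWorkers, int nItems) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicInteger serviced = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(nItems);
        List<Thread> workers = new ArrayList<>();

        for (int w = 0; w < nWorkers; w++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        queue.take();               // blocks until an item is available
                        serviced.incrementAndGet(); // stand-in for handleRead/handleWrite
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    // interrupted at shutdown: let the thread end
                }
            }, "service-" + w);
            t.setDaemon(true);
            t.start();
            workers.add(t);
        }

        // Event pump side: put() needs no separate notify step.
        for (int i = 0; i < nItems; i++) {
            queue.put(i);
        }
        done.await(5, TimeUnit.SECONDS);
        for (Thread t : workers) {
            t.interrupt();
        }
        return serviced.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(pumpAndCount(3, 100));
    }
}
```
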

            mmcclosksunw Michael Mccloskey (Inactive)
            nthompsosunw Nathanael Thompson (Inactive)