Name: nt126004 Date: 12/17/2001
java version "1.4.0-beta3"
Java(TM) 2 Runtime Environment, Standard Edition (build 1.4.0-beta3-b84)
Java HotSpot(TM) Client VM (build 1.4.0-beta3-b84, mixed mode)
I'm including 2 different java source code, one for the server and the other
for the test client.
Essentially, after a short period of heavy usage, selector.select() does not
recognize events (in my case OP_READ). It also does not recognize when the
client connections have been disconnected. More description is in the java
source code itself:
The java server:
-----------------------------------------------------------------------
package test;
import java.io.*;
import java.net.*;
import java.util.*;
import java.nio.*;
import java.nio.charset.*;
import java.nio.channels.*;
/**
* This test program is implemented using the Proactor Pattern. It has an
* event pump that will put read events on a queue. A pool of service threads
* can then pick up the work and service them.
*
* Once you are told that there is an event that you expressed an interest in,
you
* clear the interestOps to indicate that you are no longer interested in those
* events. You do that in the postSelect call. The only exception is for the
* accept context, where you want to be constantly notified when a new
connection
* is made to you.
*
* Since you can't register on a selector that is currently in a select call,
I use
* the wakeup function on the selector to make it wakup and register all the
pending
* register calls. I also changed the selector to select for 5 seconds, and
got rid
* of the wakeup, and it still exhibited the same problem.
*
* The problem is that if you have run the client on this server a couple of
times,
* you will notice that the selector.select() call gets hung. It will not
recognize
* the read events. It also ignores close events on the client sockets. I
printed the
* selectionkeys in the selector and they all indicate an interest in the read
event
* (all are valid selectionkeys) and of course one is still interested in the
* accept call. The accept selectionkey still works. It's the read
selectionkeys
* that somehow get messed up.
*
* The other problem I notice is that in the TestSelectorClient, lots of
connect
* exceptions happen. Not sure why that should be the case since you are
always selecting
* on OP_ACCEPT.
*
* This was tested on a Windows XP machine (Pentium 4, 1.6 GHz processor). I
found
* it easier to have this problem appear if I ran the client on another machine
* (in my case a Solaris 2.8 box;
* uname -a: SunOS ss02 5.8 Generic_108528-12 sun4u sparc SUNW,Sun-Blade-100).
* JDK used was (java -version):
* java version "1.4.0-beta3"
* Java(TM) 2 Runtime Environment, Standard Edition (build 1.4.0-beta3-b84)
* Java HotSpot(TM) Client VM (build 1.4.0-beta3-b84, mixed mode)
*
* @see test.TestSelectorClient
* @author Senthil Supramaniam <###@###.###>
*/
public class TestSelector implements Runnable {
Selector selector;
ServerSocketChannel ssc;
ByteBuffer b_write;
List queue = Collections.synchronizedList(new LinkedList());
List regQueue = Collections.synchronizedList(new LinkedList());
final static boolean DEBUG = false;
TestSelector(int p) throws Exception {
Charset ascii = Charset.forName("US-ASCII");
CharBuffer chars = CharBuffer.allocate(1024);
chars.put("Hello There");
chars.flip();
ByteBuffer buffer = ascii.newEncoder().encode(chars);
b_write = ByteBuffer.allocateDirect(buffer.limit());
b_write.put(buffer);
selector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// InetAddress lh = InetAddress.getByName("localhost");
InetAddress lh = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(lh, p);
ssc.socket().bind(isa, 100);
Context ctx = new Context(ssc, SelectionKey.OP_ACCEPT, null, null);
addToRegQueue(ctx);
new Thread(this).start();
new Handler();
}
/**
* A unit of work to be done. Only one accept context exists
*/
class Context {
SelectableChannel sc;
int interestOps;
ByteBuffer b_read;
ByteBuffer b_write;
Context(SelectableChannel sc, int interestOps, ByteBuffer b_read,
ByteBuffer b_write) {
this.sc = sc;
this.interestOps = interestOps;
this.b_read = b_read;
this.b_write = b_write;
}
void setInterest(int op) {
interestOps = op;
}
void register(Selector selector) {
try {
SelectionKey key = sc.keyFor(selector);
if (key == null) {
sc.register(selector, interestOps, this);
} else {
key.interestOps(interestOps);
}
} catch (Exception e) {
e.printStackTrace();
try { sc.close(); } catch (Exception ee) { }
}
}
void postSelect(SelectionKey sk) {
// after we are selected, we don't want to be called again unless
this is
// an accept context where only one exists
// in READ or WRITE, I'll reregister if needed
if ((interestOps & sk.OP_ACCEPT) == 0) {
sk.interestOps(0);
}
}
}
class Handler extends Thread {
Handler() {
Thread t = new Thread(this);
t.setName("Handler");
t.start();
}
public void run() {
try {
while (true) {
if (queue.isEmpty()) {
waitForQueue();
}
while (!queue.isEmpty()) {
Context ctx = (Context) queue.remove(0);
// what operation were we interested in?
if ((ctx.interestOps & SelectionKey.OP_ACCEPT) != 0) {
handleAccept();
} else if ((ctx.interestOps & SelectionKey.OP_READ) !=
0) {
handleRead(ctx);
} else {
handleWrite(ctx);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/*
* Wait till there is some context to service
*/
synchronized void waitForQueue() {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
}
/*
* There is some work to be done
*/
synchronized void notifyQueue() {
notify();
}
/*
* Now that selector has woken up, let's register information for it
*/
void doRegistrations() {
if (DEBUG) {
System.out.println("In doRegistrations " + regQueue.size());
}
while (!regQueue.isEmpty()) {
Context ctx = (Context) regQueue.remove(0);
ctx.register(selector);
}
}
/*
* Let's add all the context's that we want to have registered
*/
void addToRegQueue(Context ctx) {
if (DEBUG) {
System.out.println("Adding to reg queue " + ctx.interestOps);
}
regQueue.add(ctx);
// can't register a channel or change interestOps until select() call
restarts again
selector.wakeup();
}
void handleAccept() {
Context ctx = null;
try {
SocketChannel sc = ssc.accept();
/*
* REMIND: This happens quite regularly - shouldn't!
*/
if (sc == null) {
if (DEBUG) {
System.out.println("accept returned null");
}
return;
}
sc.configureBlocking(false);
ByteBuffer b_read = ByteBuffer.allocateDirect(1024);
ctx = new Context(sc, SelectionKey.OP_READ, b_read, (ByteBuffer)
b_write.duplicate().rewind());
addToRegQueue(ctx);
} catch (Exception e) {
e.printStackTrace();
try {
if (ctx != null) {
ctx.sc.close();
}
} catch (Exception ee) {}
}
}
void handleRead(Context ctx) {
if (DEBUG) {
System.out.println("HandleRead called");
}
try {
((SocketChannel)ctx.sc).read(ctx.b_read);
// Now I want to know when it is writable...
ctx.setInterest(SelectionKey.OP_WRITE);
addToRegQueue(ctx);
} catch (Exception e) {
try {
ctx.sc.close();
} catch (Exception ee) {
}
}
}
void handleWrite(Context ctx) {
if (DEBUG) {
System.out.println("HandleWrite called");
}
try {
try {
((SocketChannel)ctx.sc).write(ctx.b_write);
} finally {
ctx.sc.close(); // should cancel from the selector also
}
} catch (Exception e) {
e.printStackTrace();
}
}
String getOPName(int op) {
StringBuffer sb = new StringBuffer();
if ((op & SelectionKey.OP_ACCEPT) != 0) {
sb.append("OP_ACCEPT |");
}
if ((op & SelectionKey.OP_READ) != 0) {
sb.append("OP_READ |");
}
if ((op & SelectionKey.OP_WRITE) != 0) {
sb.append("OP_WRITE |");
}
if ((op & SelectionKey.OP_CONNECT) != 0) {
sb.append("OP_CONNECT |");
}
return sb.toString();
}
private void printSelector() {
try {
for (Iterator i = selector.keys().iterator(); i.hasNext(); ) {
SelectionKey sk = (SelectionKey) i.next();
String app = "NOTVALID SK";
if (sk.isValid()) {
app = getOPName(sk.interestOps());
}
System.out.println("Going to select on: (" + sk.isValid() + ")
ops=" + app);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void run() {
try {
while (true) {
System.out.println("Going to select " + selector.keys().size
());
if (DEBUG) {
printSelector();
}
int n = selector.select();
System.out.println("Selector returned " + n);
if (n > 0) {
Set keys = selector.selectedKeys();
Iterator i = keys.iterator();
while (i.hasNext()) {
SelectionKey sk = (SelectionKey) i.next();
i.remove();
Context ctx = (Context) sk.attachment();
ctx.postSelect(sk);
queue.add(ctx);
}
// notify the queue
notifyQueue();
}
// do all key registrations now
doRegistrations();
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String args[]) throws Exception {
if (args.length == 0) {
System.err.println("Usage com.secretseal.test.TestSelector port");
System.exit(1);
}
new TestSelector(Integer.parseInt(args[0]));
}
}
----------------------------------------------------------------------------
The Java client:
----------------------------------------------------------------------------
package test;
import java.io.*;
import java.net.*;
import java.util.*;
/**
* The client end for testing TestSelector. It makes connections to the server
and
* writes "hello" and reads what it can.
*
* @see test.TestSelector
* @author Senthil Supramaniam <###@###.###>
*/
public class TestSelectorClient implements Runnable {
InetAddress addr;
int port;
byte buf[] = new byte[1024];
TestSelectorClient(InetAddress addr, int port) {
this.addr = addr;
this.port = port;
new Thread(this).start();
}
public void run() {
try {
long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
try {
Socket s = new Socket(addr, port);
try {
s.setReuseAddress(true);
InputStream in = s.getInputStream();
DataOutputStream out = new DataOutputStream(new
BufferedOutputStream(s.getOutputStream()));
out.writeBytes("hello");
out.flush();
in.read(buf);
} finally {
s.close();
}
} catch (Exception ee) {
ee.printStackTrace();
}
if ((i % 10) == 0) {
System.out.println("Finished " + i + " calls");
}
}
System.out.println("Took " + (System.currentTimeMillis() - start)
+ "ms for 100 calls");
} catch (Exception e) {
e.printStackTrace();
}
}
private static void usage() {
System.err.println("Usage com.secretseal.test.TestSelectorClient
host:port [nThread]");
System.exit(1);
}
public static void main(String args[]) throws Exception {
InetAddress addr = null;
int port = 1639;
int nThreads = 2;
if (args.length < 1) {
usage();
}
if (args.length > 0) {
int i = args[0].indexOf(':');
if (i < 0) {
usage();
}
addr = InetAddress.getByName(args[0].substring(0, i));
port = Integer.parseInt(args[0].substring(i+1));
}
if (args.length > 1) {
nThreads = Integer.parseInt(args[1]);
}
for (int i = 0; i < nThreads; i++) {
new TestSelectorClient(addr, port);
}
}
}
(Review ID: 136744)
======================================================================
- relates to
-
JDK-4644304 (so) Selector.select() randomly stops reporting OP_READ (sol)
-
- Closed
-