A DESCRIPTION OF THE REQUEST :
1.
sun.nio.ch.DatagramChannelImpl#receive(java.nio.ByteBuffer) do extra copes of ByteBuffer when receiving data to direct byte buffer.
It call
bb = Util.getTemporaryDirectBuffer(dst.remaining()); as a result
n = receive(fd, bb);
bb.flip();
dst.put(bb);
2. DatagramChannelImpl use synchronized it perform very slow. I suggest to use ReentrantLock instead of synchronized .
JUSTIFICATION :
If udp socket channel works in non blocking/blocking mode system has bad performance.
Without improvements it can process 160 udp messages per second with patch it grows to 500
I can send patches for DatagramChannelImpl and SocketChannelImpl
---------- BEGIN SOURCE ----------
patch for DatagramChannelImpl
--- D:\work\jdk-trunk\jdk\src\share\classes\sun\nio\ch\DatagramChannelImpl.java
+++ D:\work\jdk8\jdk\src\share\classes\sun\nio\ch\DatagramChannelImpl.java
@@ -25,14 +25,21 @@
package sun.nio.ch;
+import sun.net.ResourceManager;
+
import java.io.FileDescriptor;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
-import java.nio.channels.spi.*;
-import java.util.*;
-import sun.net.ResourceManager;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* An implementation of DatagramChannels.
@@ -66,14 +73,14 @@
private int cachedSenderPort;
// Lock held by current reading or connecting thread
- private final Object readLock = new Object();
+ private final Lock readLock = new ReentrantLock();
// Lock held by current writing or connecting thread
- private final Object writeLock = new Object();
+ private final Lock writeLock = new ReentrantLock();
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
- private final Object stateLock = new Object();
+ private final Lock stateLock = new ReentrantLock();
// -- The following fields are protected by stateLock
@@ -156,29 +163,40 @@
}
public DatagramSocket socket() {
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (socket == null)
socket = DatagramSocketAdaptor.create(this);
return socket;
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getLocalAddress() throws IOException {
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (!isOpen())
throw new ClosedChannelException();
// Perform security check before returning address
return Net.getRevealedLocalAddress(localAddress);
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getRemoteAddress() throws IOException {
- synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
- return remoteAddress;
+ try {
+ stateLock.lock();
+ {
+ if (!isOpen())
+ throw new ClosedChannelException();
+ return remoteAddress;
+
+ } } finally{
+ stateLock.unlock();
}
}
@@ -191,7 +209,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
ensureOpen();
if (name == StandardSocketOptions.IP_TOS) {
@@ -239,6 +258,8 @@
// remaining options don't need any special handling
Net.setSocketOption(fd, Net.UNSPEC, name, value);
return this;
+ } finally {
+ stateLock.unlock();
}
}
@@ -252,7 +273,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
ensureOpen();
if (name == StandardSocketOptions.IP_TOS) {
@@ -301,6 +323,8 @@
// no special handling
return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
+ } finally {
+ stateLock.unlock();
}
}
@@ -338,7 +362,8 @@
throw new IllegalArgumentException("Read-only buffer");
if (dst == null)
throw new NullPointerException();
- synchronized (readLock) {
+ try {
+ readLock.lock();
ensureOpen();
// Socket was not bound before attempting receive
if (localAddress() == null)
@@ -358,37 +383,33 @@
if (n == IOStatus.UNAVAILABLE)
return null;
} else {
- bb = Util.getTemporaryDirectBuffer(dst.remaining());
- for (;;) {
+ for (; ; ) {
do {
- n = receive(fd, bb);
+ n = receive(fd, dst);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNAVAILABLE)
return null;
- InetSocketAddress isa = (InetSocketAddress)sender;
+ InetSocketAddress isa = (InetSocketAddress) sender;
try {
security.checkAccept(
- isa.getAddress().getHostAddress(),
- isa.getPort());
+ isa.getAddress().getHostAddress(),
+ isa.getPort());
} catch (SecurityException se) {
// Ignore packet
- bb.clear();
n = 0;
continue;
}
- bb.flip();
- dst.put(bb);
break;
}
}
return sender;
} finally {
- if (bb != null)
- Util.releaseTemporaryDirectBuffer(bb);
readerThread = 0;
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
+ }finally {
+ readLock.unlock();
}
}
@@ -435,13 +456,15 @@
if (src == null)
throw new NullPointerException();
- synchronized (writeLock) {
+ try{
+ writeLock.lock();
ensureOpen();
InetSocketAddress isa = Net.checkAddress(target);
InetAddress ia = isa.getAddress();
if (ia == null)
throw new IOException("Target address not resolved");
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (!isConnected()) {
if (target == null)
throw new NullPointerException();
@@ -461,6 +484,8 @@
}
return write(src);
}
+ } finally {
+ stateLock.unlock();
}
int n = 0;
@@ -473,10 +498,13 @@
n = send(fd, src, isa);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (isOpen() && (localAddress == null)) {
localAddress = Net.localAddress(fd);
}
+ } finally {
+ stateLock.unlock();
}
return IOStatus.normalize(n);
} finally {
@@ -484,7 +512,10 @@
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
- }
+ } finally {
+ writeLock.unlock();
+ }
+
}
private int send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target)
@@ -544,11 +575,15 @@
public int read(ByteBuffer buf) throws IOException {
if (buf == null)
throw new NullPointerException();
- synchronized (readLock) {
- synchronized (stateLock) {
+ try {
+ readLock.lock();
+ try {
+ stateLock.lock();
ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
+ } finally {
+ stateLock.unlock();
}
int n = 0;
try {
@@ -565,19 +600,24 @@
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
+ } finally {
+ readLock.unlock();
}
}
public long read(ByteBuffer[] dsts, int offset, int length)
- throws IOException
- {
+ throws IOException {
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
- synchronized (readLock) {
- synchronized (stateLock) {
+ try {
+ readLock.lock();
+ try {
+ stateLock.lock();
ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
+ } finally {
+ stateLock.unlock();
}
long n = 0;
try {
@@ -594,17 +634,26 @@
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
+ } finally
+
+ {
+ readLock.unlock();
}
}
public int write(ByteBuffer buf) throws IOException {
if (buf == null)
throw new NullPointerException();
- synchronized (writeLock) {
- synchronized (stateLock) {
+ try {
+ writeLock.lock();
+
+ try{
+ stateLock.lock();
ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
+ } finally {
+ stateLock.unlock();
}
int n = 0;
try {
@@ -621,6 +670,8 @@
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
+ } finally {
+ writeLock.unlock();
}
}
@@ -629,11 +680,15 @@
{
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
- synchronized (writeLock) {
- synchronized (stateLock) {
+ try{
+ writeLock.lock();
+ try{
+ stateLock.lock();
ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
+ } finally {
+ stateLock.unlock();
}
long n = 0;
try {
@@ -650,6 +705,8 @@
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
+ } finally {
+ writeLock.unlock();
}
}
@@ -658,67 +715,87 @@
}
public SocketAddress localAddress() {
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
return localAddress;
+ }finally {
+ stateLock.unlock();
}
}
public SocketAddress remoteAddress() {
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
return remoteAddress;
- }
+ }finally {
+ stateLock.unlock();
+ }
+
}
@Override
public DatagramChannel bind(SocketAddress local) throws IOException {
- synchronized (readLock) {
- synchronized (writeLock) {
- synchronized (stateLock) {
- ensureOpen();
- if (localAddress != null)
- throw new AlreadyBoundException();
- InetSocketAddress isa;
- if (local == null) {
- // only Inet4Address allowed with IPv4 socket
- if (family == StandardProtocolFamily.INET) {
- isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
- } else {
- isa = new InetSocketAddress(0);
- }
+ try {
+ readLock.lock();
+ writeLock.lock();
+ try {
+ stateLock.lock();
+ ensureOpen();
+ if (localAddress != null)
+ throw new AlreadyBoundException();
+ InetSocketAddress isa;
+ if (local == null) {
+ // only Inet4Address allowed with IPv4 socket
+ if (family == StandardProtocolFamily.INET) {
+ isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
} else {
- isa = Net.checkAddress(local);
-
- // only Inet4Address allowed with IPv4 socket
- if (family == StandardProtocolFamily.INET) {
- InetAddress addr = isa.getAddress();
- if (!(addr instanceof Inet4Address))
- throw new UnsupportedAddressTypeException();
- }
+ isa = new InetSocketAddress(0);
}
- SecurityManager sm = System.getSecurityManager();
- if (sm != null) {
- sm.checkListen(isa.getPort());
+ } else {
+ isa = Net.checkAddress(local);
+
+ // only Inet4Address allowed with IPv4 socket
+ if (family == StandardProtocolFamily.INET) {
+ InetAddress addr = isa.getAddress();
+ if (!(addr instanceof Inet4Address))
+ throw new UnsupportedAddressTypeException();
}
- Net.bind(family, fd, isa.getAddress(), isa.getPort());
- localAddress = Net.localAddress(fd);
}
- }
- }
- return this;
- }
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkListen(isa.getPort());
+ }
+ Net.bind(family, fd, isa.getAddress(), isa.getPort());
+ localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
+ }
+ return this;
+ } finally {
+ readLock.unlock();
+ writeLock.unlock();
+ }
+
+}
public boolean isConnected() {
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
return (state == ST_CONNECTED);
+ } finally {
+ stateLock.unlock();
}
}
void ensureOpenAndUnconnected() throws IOException { // package-private
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (!isOpen())
throw new ClosedChannelException();
if (state != ST_UNCONNECTED)
throw new IllegalStateException("Connect already invoked");
+ } finally {
+ stateLock.unlock();
}
}
@@ -726,59 +803,74 @@
public DatagramChannel connect(SocketAddress sa) throws IOException {
int localPort = 0;
- synchronized(readLock) {
- synchronized(writeLock) {
- synchronized (stateLock) {
- ensureOpenAndUnconnected();
- InetSocketAddress isa = Net.checkAddress(sa);
- SecurityManager sm = System.getSecurityManager();
- if (sm != null)
- sm.checkConnect(isa.getAddress().getHostAddress(),
- isa.getPort());
- int n = Net.connect(family,
- fd,
- isa.getAddress(),
- isa.getPort());
- if (n <= 0)
- throw new Error(); // Can't happen
-
- // Connection succeeded; disallow further invocation
- state = ST_CONNECTED;
- remoteAddress = isa;
- sender = isa;
- cachedSenderInetAddress = isa.getAddress();
- cachedSenderPort = isa.getPort();
-
- // set or refresh local address
- localAddress = Net.localAddress(fd);
- }
- }
- }
- return this;
+ try {
+ readLock.lock();
+ writeLock.lock();
+ try {
+ stateLock.lock();
+ ensureOpenAndUnconnected();
+ InetSocketAddress isa = Net.checkAddress(sa);
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null)
+ sm.checkConnect(isa.getAddress().getHostAddress(),
+ isa.getPort());
+ int n = Net.connect(family,
+ fd,
+ isa.getAddress(),
+ isa.getPort());
+ if (n <= 0)
+ throw new Error(); // Can't happen
+
+ // Connection succeeded; disallow further invocation
+ state = ST_CONNECTED;
+ remoteAddress = isa;
+ sender = isa;
+ cachedSenderInetAddress = isa.getAddress();
+ cachedSenderPort = isa.getPort();
+
+ // set or refresh local address
+ localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
+ }
+
+ return this;
+ } finally {
+ readLock.unlock();
+ writeLock.unlock();
+ }
+
}
public DatagramChannel disconnect() throws IOException {
- synchronized(readLock) {
- synchronized(writeLock) {
- synchronized (stateLock) {
- if (!isConnected() || !isOpen())
- return this;
- InetSocketAddress isa = remoteAddress;
- SecurityManager sm = System.getSecurityManager();
- if (sm != null)
- sm.checkConnect(isa.getAddress().getHostAddress(),
- isa.getPort());
- boolean isIPv6 = (family == StandardProtocolFamily.INET6);
- disconnect0(fd, isIPv6);
- remoteAddress = null;
- state = ST_UNCONNECTED;
-
- // refresh local address
- localAddress = Net.localAddress(fd);
- }
- }
- }
- return this;
+ try {
+ readLock.lock();
+ writeLock.lock();
+ stateLock.lock();
+
+
+ if (!isConnected() || !isOpen())
+ return this;
+ InetSocketAddress isa = remoteAddress;
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null)
+ sm.checkConnect(isa.getAddress().getHostAddress(),
+ isa.getPort());
+ boolean isIPv6 = (family == StandardProtocolFamily.INET6);
+ disconnect0(fd, isIPv6);
+ remoteAddress = null;
+ state = ST_UNCONNECTED;
+
+ // refresh local address
+ localAddress = Net.localAddress(fd);
+
+ return this;
+ } finally {
+ readLock.unlock();
+ writeLock.unlock();
+ stateLock.unlock();
+ }
+
}
/**
@@ -818,7 +910,8 @@
if (sm != null)
sm.checkMulticast(group);
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (!isOpen())
throw new ClosedChannelException();
@@ -874,6 +967,8 @@
registry.add(key);
return key;
+ } finally {
+ stateLock.unlock();
}
}
@@ -900,7 +995,8 @@
void drop(MembershipKeyImpl key) {
assert key.channel() == this;
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
if (!key.isValid())
return;
@@ -921,6 +1017,8 @@
key.invalidate();
registry.remove(key);
+ } finally {
+ stateLock.unlock();
}
}
@@ -934,7 +1032,8 @@
assert key.channel() == this;
assert key.sourceAddress() == null;
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
if (!key.isValid())
throw new IllegalStateException("key is no longer valid");
if (source.isAnyLocalAddress())
@@ -960,6 +1059,8 @@
// ancient kernel
throw new UnsupportedOperationException();
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -970,7 +1071,8 @@
assert key.channel() == this;
assert key.sourceAddress() == null;
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
if (!key.isValid())
throw new IllegalStateException("key is no longer valid");
@@ -990,11 +1092,14 @@
// should not happen
throw new AssertionError(ioe);
}
+ } finally {
+ stateLock.unlock();
}
}
protected void implCloseSelectableChannel() throws IOException {
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (state != ST_KILLED)
nd.preClose(fd);
ResourceManager.afterUdpClose();
@@ -1010,11 +1115,14 @@
NativeThread.signal(th);
if (!isRegistered())
kill();
+ } finally {
+ stateLock.unlock();
}
}
public void kill() throws IOException {
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
if (state == ST_KILLED)
return;
if (state == ST_UNINITIALIZED) {
@@ -1024,6 +1132,8 @@
assert !isOpen() && !isRegistered();
nd.close(fd);
state = ST_KILLED;
+ } finally {
+ stateLock.unlock();
}
}
@@ -1080,14 +1190,18 @@
int poll(int events, long timeout) throws IOException {
assert Thread.holdsLock(blockingLock()) && !isBlocking();
- synchronized (readLock) {
+ try{
+ readLock.lock();
int n = 0;
try {
begin();
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (!isOpen())
return 0;
readerThread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
n = Net.poll(fd, events, timeout);
} finally {
@@ -1095,7 +1209,10 @@
end(n > 0);
}
return n;
- }
+ } finally {
+ readLock.unlock();
+ }
+
}
/**
---------- END SOURCE ----------
CUSTOMER SUBMITTED WORKAROUND :
To use native libs but for cross platform development it is very unexpected
1.
sun.nio.ch.DatagramChannelImpl#receive(java.nio.ByteBuffer) do extra copes of ByteBuffer when receiving data to direct byte buffer.
It call
bb = Util.getTemporaryDirectBuffer(dst.remaining()); as a result
n = receive(fd, bb);
bb.flip();
dst.put(bb);
2. DatagramChannelImpl use synchronized it perform very slow. I suggest to use ReentrantLock instead of synchronized .
JUSTIFICATION :
If udp socket channel works in non blocking/blocking mode system has bad performance.
Without improvements it can process 160 udp messages per second with patch it grows to 500
I can send patches for DatagramChannelImpl and SocketChannelImpl
---------- BEGIN SOURCE ----------
patch for DatagramChannelImpl
--- D:\work\jdk-trunk\jdk\src\share\classes\sun\nio\ch\DatagramChannelImpl.java
+++ D:\work\jdk8\jdk\src\share\classes\sun\nio\ch\DatagramChannelImpl.java
@@ -25,14 +25,21 @@
package sun.nio.ch;
+import sun.net.ResourceManager;
+
import java.io.FileDescriptor;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.*;
-import java.nio.channels.spi.*;
-import java.util.*;
-import sun.net.ResourceManager;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* An implementation of DatagramChannels.
@@ -66,14 +73,14 @@
private int cachedSenderPort;
// Lock held by current reading or connecting thread
- private final Object readLock = new Object();
+ private final Lock readLock = new ReentrantLock();
// Lock held by current writing or connecting thread
- private final Object writeLock = new Object();
+ private final Lock writeLock = new ReentrantLock();
// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
- private final Object stateLock = new Object();
+ private final Lock stateLock = new ReentrantLock();
// -- The following fields are protected by stateLock
@@ -156,29 +163,40 @@
}
public DatagramSocket socket() {
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (socket == null)
socket = DatagramSocketAdaptor.create(this);
return socket;
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getLocalAddress() throws IOException {
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (!isOpen())
throw new ClosedChannelException();
// Perform security check before returning address
return Net.getRevealedLocalAddress(localAddress);
+ } finally {
+ stateLock.unlock();
}
}
@Override
public SocketAddress getRemoteAddress() throws IOException {
- synchronized (stateLock) {
- if (!isOpen())
- throw new ClosedChannelException();
- return remoteAddress;
+ try {
+ stateLock.lock();
+ {
+ if (!isOpen())
+ throw new ClosedChannelException();
+ return remoteAddress;
+
+ } } finally{
+ stateLock.unlock();
}
}
@@ -191,7 +209,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
ensureOpen();
if (name == StandardSocketOptions.IP_TOS) {
@@ -239,6 +258,8 @@
// remaining options don't need any special handling
Net.setSocketOption(fd, Net.UNSPEC, name, value);
return this;
+ } finally {
+ stateLock.unlock();
}
}
@@ -252,7 +273,8 @@
if (!supportedOptions().contains(name))
throw new UnsupportedOperationException("'" + name + "' not supported");
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
ensureOpen();
if (name == StandardSocketOptions.IP_TOS) {
@@ -301,6 +323,8 @@
// no special handling
return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
+ } finally {
+ stateLock.unlock();
}
}
@@ -338,7 +362,8 @@
throw new IllegalArgumentException("Read-only buffer");
if (dst == null)
throw new NullPointerException();
- synchronized (readLock) {
+ try {
+ readLock.lock();
ensureOpen();
// Socket was not bound before attempting receive
if (localAddress() == null)
@@ -358,37 +383,33 @@
if (n == IOStatus.UNAVAILABLE)
return null;
} else {
- bb = Util.getTemporaryDirectBuffer(dst.remaining());
- for (;;) {
+ for (; ; ) {
do {
- n = receive(fd, bb);
+ n = receive(fd, dst);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
if (n == IOStatus.UNAVAILABLE)
return null;
- InetSocketAddress isa = (InetSocketAddress)sender;
+ InetSocketAddress isa = (InetSocketAddress) sender;
try {
security.checkAccept(
- isa.getAddress().getHostAddress(),
- isa.getPort());
+ isa.getAddress().getHostAddress(),
+ isa.getPort());
} catch (SecurityException se) {
// Ignore packet
- bb.clear();
n = 0;
continue;
}
- bb.flip();
- dst.put(bb);
break;
}
}
return sender;
} finally {
- if (bb != null)
- Util.releaseTemporaryDirectBuffer(bb);
readerThread = 0;
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
+ }finally {
+ readLock.unlock();
}
}
@@ -435,13 +456,15 @@
if (src == null)
throw new NullPointerException();
- synchronized (writeLock) {
+ try{
+ writeLock.lock();
ensureOpen();
InetSocketAddress isa = Net.checkAddress(target);
InetAddress ia = isa.getAddress();
if (ia == null)
throw new IOException("Target address not resolved");
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (!isConnected()) {
if (target == null)
throw new NullPointerException();
@@ -461,6 +484,8 @@
}
return write(src);
}
+ } finally {
+ stateLock.unlock();
}
int n = 0;
@@ -473,10 +498,13 @@
n = send(fd, src, isa);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (isOpen() && (localAddress == null)) {
localAddress = Net.localAddress(fd);
}
+ } finally {
+ stateLock.unlock();
}
return IOStatus.normalize(n);
} finally {
@@ -484,7 +512,10 @@
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
- }
+ } finally {
+ writeLock.unlock();
+ }
+
}
private int send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target)
@@ -544,11 +575,15 @@
public int read(ByteBuffer buf) throws IOException {
if (buf == null)
throw new NullPointerException();
- synchronized (readLock) {
- synchronized (stateLock) {
+ try {
+ readLock.lock();
+ try {
+ stateLock.lock();
ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
+ } finally {
+ stateLock.unlock();
}
int n = 0;
try {
@@ -565,19 +600,24 @@
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
+ } finally {
+ readLock.unlock();
}
}
public long read(ByteBuffer[] dsts, int offset, int length)
- throws IOException
- {
+ throws IOException {
if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
throw new IndexOutOfBoundsException();
- synchronized (readLock) {
- synchronized (stateLock) {
+ try {
+ readLock.lock();
+ try {
+ stateLock.lock();
ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
+ } finally {
+ stateLock.unlock();
}
long n = 0;
try {
@@ -594,17 +634,26 @@
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
+ } finally
+
+ {
+ readLock.unlock();
}
}
public int write(ByteBuffer buf) throws IOException {
if (buf == null)
throw new NullPointerException();
- synchronized (writeLock) {
- synchronized (stateLock) {
+ try {
+ writeLock.lock();
+
+ try{
+ stateLock.lock();
ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
+ } finally {
+ stateLock.unlock();
}
int n = 0;
try {
@@ -621,6 +670,8 @@
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
+ } finally {
+ writeLock.unlock();
}
}
@@ -629,11 +680,15 @@
{
if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
throw new IndexOutOfBoundsException();
- synchronized (writeLock) {
- synchronized (stateLock) {
+ try{
+ writeLock.lock();
+ try{
+ stateLock.lock();
ensureOpen();
if (!isConnected())
throw new NotYetConnectedException();
+ } finally {
+ stateLock.unlock();
}
long n = 0;
try {
@@ -650,6 +705,8 @@
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
+ } finally {
+ writeLock.unlock();
}
}
@@ -658,67 +715,87 @@
}
public SocketAddress localAddress() {
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
return localAddress;
+ }finally {
+ stateLock.unlock();
}
}
public SocketAddress remoteAddress() {
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
return remoteAddress;
- }
+ }finally {
+ stateLock.unlock();
+ }
+
}
@Override
public DatagramChannel bind(SocketAddress local) throws IOException {
- synchronized (readLock) {
- synchronized (writeLock) {
- synchronized (stateLock) {
- ensureOpen();
- if (localAddress != null)
- throw new AlreadyBoundException();
- InetSocketAddress isa;
- if (local == null) {
- // only Inet4Address allowed with IPv4 socket
- if (family == StandardProtocolFamily.INET) {
- isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
- } else {
- isa = new InetSocketAddress(0);
- }
+ try {
+ readLock.lock();
+ writeLock.lock();
+ try {
+ stateLock.lock();
+ ensureOpen();
+ if (localAddress != null)
+ throw new AlreadyBoundException();
+ InetSocketAddress isa;
+ if (local == null) {
+ // only Inet4Address allowed with IPv4 socket
+ if (family == StandardProtocolFamily.INET) {
+ isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
} else {
- isa = Net.checkAddress(local);
-
- // only Inet4Address allowed with IPv4 socket
- if (family == StandardProtocolFamily.INET) {
- InetAddress addr = isa.getAddress();
- if (!(addr instanceof Inet4Address))
- throw new UnsupportedAddressTypeException();
- }
+ isa = new InetSocketAddress(0);
}
- SecurityManager sm = System.getSecurityManager();
- if (sm != null) {
- sm.checkListen(isa.getPort());
+ } else {
+ isa = Net.checkAddress(local);
+
+ // only Inet4Address allowed with IPv4 socket
+ if (family == StandardProtocolFamily.INET) {
+ InetAddress addr = isa.getAddress();
+ if (!(addr instanceof Inet4Address))
+ throw new UnsupportedAddressTypeException();
}
- Net.bind(family, fd, isa.getAddress(), isa.getPort());
- localAddress = Net.localAddress(fd);
}
- }
- }
- return this;
- }
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkListen(isa.getPort());
+ }
+ Net.bind(family, fd, isa.getAddress(), isa.getPort());
+ localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
+ }
+ return this;
+ } finally {
+ readLock.unlock();
+ writeLock.unlock();
+ }
+
+}
public boolean isConnected() {
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
return (state == ST_CONNECTED);
+ } finally {
+ stateLock.unlock();
}
}
void ensureOpenAndUnconnected() throws IOException { // package-private
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (!isOpen())
throw new ClosedChannelException();
if (state != ST_UNCONNECTED)
throw new IllegalStateException("Connect already invoked");
+ } finally {
+ stateLock.unlock();
}
}
@@ -726,59 +803,74 @@
public DatagramChannel connect(SocketAddress sa) throws IOException {
int localPort = 0;
- synchronized(readLock) {
- synchronized(writeLock) {
- synchronized (stateLock) {
- ensureOpenAndUnconnected();
- InetSocketAddress isa = Net.checkAddress(sa);
- SecurityManager sm = System.getSecurityManager();
- if (sm != null)
- sm.checkConnect(isa.getAddress().getHostAddress(),
- isa.getPort());
- int n = Net.connect(family,
- fd,
- isa.getAddress(),
- isa.getPort());
- if (n <= 0)
- throw new Error(); // Can't happen
-
- // Connection succeeded; disallow further invocation
- state = ST_CONNECTED;
- remoteAddress = isa;
- sender = isa;
- cachedSenderInetAddress = isa.getAddress();
- cachedSenderPort = isa.getPort();
-
- // set or refresh local address
- localAddress = Net.localAddress(fd);
- }
- }
- }
- return this;
+ try {
+ readLock.lock();
+ writeLock.lock();
+ try {
+ stateLock.lock();
+ ensureOpenAndUnconnected();
+ InetSocketAddress isa = Net.checkAddress(sa);
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null)
+ sm.checkConnect(isa.getAddress().getHostAddress(),
+ isa.getPort());
+ int n = Net.connect(family,
+ fd,
+ isa.getAddress(),
+ isa.getPort());
+ if (n <= 0)
+ throw new Error(); // Can't happen
+
+ // Connection succeeded; disallow further invocation
+ state = ST_CONNECTED;
+ remoteAddress = isa;
+ sender = isa;
+ cachedSenderInetAddress = isa.getAddress();
+ cachedSenderPort = isa.getPort();
+
+ // set or refresh local address
+ localAddress = Net.localAddress(fd);
+ } finally {
+ stateLock.unlock();
+ }
+
+ return this;
+ } finally {
+ readLock.unlock();
+ writeLock.unlock();
+ }
+
}
public DatagramChannel disconnect() throws IOException {
- synchronized(readLock) {
- synchronized(writeLock) {
- synchronized (stateLock) {
- if (!isConnected() || !isOpen())
- return this;
- InetSocketAddress isa = remoteAddress;
- SecurityManager sm = System.getSecurityManager();
- if (sm != null)
- sm.checkConnect(isa.getAddress().getHostAddress(),
- isa.getPort());
- boolean isIPv6 = (family == StandardProtocolFamily.INET6);
- disconnect0(fd, isIPv6);
- remoteAddress = null;
- state = ST_UNCONNECTED;
-
- // refresh local address
- localAddress = Net.localAddress(fd);
- }
- }
- }
- return this;
+ try {
+ readLock.lock();
+ writeLock.lock();
+ stateLock.lock();
+
+
+ if (!isConnected() || !isOpen())
+ return this;
+ InetSocketAddress isa = remoteAddress;
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null)
+ sm.checkConnect(isa.getAddress().getHostAddress(),
+ isa.getPort());
+ boolean isIPv6 = (family == StandardProtocolFamily.INET6);
+ disconnect0(fd, isIPv6);
+ remoteAddress = null;
+ state = ST_UNCONNECTED;
+
+ // refresh local address
+ localAddress = Net.localAddress(fd);
+
+ return this;
+ } finally {
+ readLock.unlock();
+ writeLock.unlock();
+ stateLock.unlock();
+ }
+
}
/**
@@ -818,7 +910,8 @@
if (sm != null)
sm.checkMulticast(group);
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (!isOpen())
throw new ClosedChannelException();
@@ -874,6 +967,8 @@
registry.add(key);
return key;
+ } finally {
+ stateLock.unlock();
}
}
@@ -900,7 +995,8 @@
void drop(MembershipKeyImpl key) {
assert key.channel() == this;
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
if (!key.isValid())
return;
@@ -921,6 +1017,8 @@
key.invalidate();
registry.remove(key);
+ } finally {
+ stateLock.unlock();
}
}
@@ -934,7 +1032,8 @@
assert key.channel() == this;
assert key.sourceAddress() == null;
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
if (!key.isValid())
throw new IllegalStateException("key is no longer valid");
if (source.isAnyLocalAddress())
@@ -960,6 +1059,8 @@
// ancient kernel
throw new UnsupportedOperationException();
}
+ } finally {
+ stateLock.unlock();
}
}
@@ -970,7 +1071,8 @@
assert key.channel() == this;
assert key.sourceAddress() == null;
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
if (!key.isValid())
throw new IllegalStateException("key is no longer valid");
@@ -990,11 +1092,14 @@
// should not happen
throw new AssertionError(ioe);
}
+ } finally {
+ stateLock.unlock();
}
}
protected void implCloseSelectableChannel() throws IOException {
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (state != ST_KILLED)
nd.preClose(fd);
ResourceManager.afterUdpClose();
@@ -1010,11 +1115,14 @@
NativeThread.signal(th);
if (!isRegistered())
kill();
+ } finally {
+ stateLock.unlock();
}
}
public void kill() throws IOException {
- synchronized (stateLock) {
+ try{
+ stateLock.lock();
if (state == ST_KILLED)
return;
if (state == ST_UNINITIALIZED) {
@@ -1024,6 +1132,8 @@
assert !isOpen() && !isRegistered();
nd.close(fd);
state = ST_KILLED;
+ } finally {
+ stateLock.unlock();
}
}
@@ -1080,14 +1190,18 @@
int poll(int events, long timeout) throws IOException {
assert Thread.holdsLock(blockingLock()) && !isBlocking();
- synchronized (readLock) {
+ try{
+ readLock.lock();
int n = 0;
try {
begin();
- synchronized (stateLock) {
+ try {
+ stateLock.lock();
if (!isOpen())
return 0;
readerThread = NativeThread.current();
+ } finally {
+ stateLock.unlock();
}
n = Net.poll(fd, events, timeout);
} finally {
@@ -1095,7 +1209,10 @@
end(n > 0);
}
return n;
- }
+ } finally {
+ readLock.unlock();
+ }
+
}
/**
---------- END SOURCE ----------
CUSTOMER SUBMITTED WORKAROUND :
To use native libs but for cross platform development it is very unexpected