Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-8195742

(dc) DatagramChannelImpl#receive(java.nio.ByteBuffer) performance issue

XMLWordPrintable

      A DESCRIPTION OF THE REQUEST :
      1.
      sun.nio.ch.DatagramChannelImpl#receive(java.nio.ByteBuffer) makes an extra copy of the ByteBuffer contents when receiving data into a direct byte buffer.

      It calls
      bb = Util.getTemporaryDirectBuffer(dst.remaining()); and, as a result, then performs
       n = receive(fd, bb);
      bb.flip();
      dst.put(bb);
      2. DatagramChannelImpl uses synchronized blocks, which perform very slowly. I suggest using ReentrantLock instead of synchronized.

      JUSTIFICATION :
      When a UDP socket channel is used in either non-blocking or blocking mode, the system performs poorly.
      Without the improvements it can process 160 UDP messages per second; with the patch this grows to 500.
      I can send patches for DatagramChannelImpl and SocketChannelImpl.


      ---------- BEGIN SOURCE ----------
      patch for DatagramChannelImpl
      --- D:\work\jdk-trunk\jdk\src\share\classes\sun\nio\ch\DatagramChannelImpl.java
      +++ D:\work\jdk8\jdk\src\share\classes\sun\nio\ch\DatagramChannelImpl.java
      @@ -25,14 +25,21 @@
       
       package sun.nio.ch;
       
      +import sun.net.ResourceManager;
      +
       import java.io.FileDescriptor;
       import java.io.IOException;
       import java.net.*;
       import java.nio.ByteBuffer;
       import java.nio.channels.*;
      -import java.nio.channels.spi.*;
      -import java.util.*;
      -import sun.net.ResourceManager;
      +import java.nio.channels.spi.SelectorProvider;
      +import java.util.Collections;
      +import java.util.HashSet;
      +import java.util.Set;
      +import java.util.concurrent.locks.Lock;
      +import java.util.concurrent.locks.ReadWriteLock;
      +import java.util.concurrent.locks.ReentrantLock;
      +import java.util.concurrent.locks.ReentrantReadWriteLock;
       
       /**
        * An implementation of DatagramChannels.
      @@ -66,14 +73,14 @@
           private int cachedSenderPort;
       
           // Lock held by current reading or connecting thread
      - private final Object readLock = new Object();
      + private final Lock readLock = new ReentrantLock();
       
           // Lock held by current writing or connecting thread
      - private final Object writeLock = new Object();
      + private final Lock writeLock = new ReentrantLock();
       
           // Lock held by any thread that modifies the state fields declared below
           // DO NOT invoke a blocking I/O operation while holding this lock!
      - private final Object stateLock = new Object();
      + private final Lock stateLock = new ReentrantLock();
       
           // -- The following fields are protected by stateLock
       
      @@ -156,29 +163,40 @@
           }
       
           public DatagramSocket socket() {
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                   if (socket == null)
                       socket = DatagramSocketAdaptor.create(this);
                   return socket;
      + } finally {
      + stateLock.unlock();
               }
           }
       
           @Override
           public SocketAddress getLocalAddress() throws IOException {
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                   if (!isOpen())
                       throw new ClosedChannelException();
                   // Perform security check before returning address
                   return Net.getRevealedLocalAddress(localAddress);
      + } finally {
      + stateLock.unlock();
               }
           }
       
           @Override
           public SocketAddress getRemoteAddress() throws IOException {
      - synchronized (stateLock) {
      - if (!isOpen())
      - throw new ClosedChannelException();
      - return remoteAddress;
      + try {
      + stateLock.lock();
      + {
      + if (!isOpen())
      + throw new ClosedChannelException();
      + return remoteAddress;
      +
      + } } finally{
      + stateLock.unlock();
               }
           }
       
      @@ -191,7 +209,8 @@
               if (!supportedOptions().contains(name))
                   throw new UnsupportedOperationException("'" + name + "' not supported");
       
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                   ensureOpen();
       
                   if (name == StandardSocketOptions.IP_TOS) {
      @@ -239,6 +258,8 @@
                   // remaining options don't need any special handling
                   Net.setSocketOption(fd, Net.UNSPEC, name, value);
                   return this;
      + } finally {
      + stateLock.unlock();
               }
           }
       
      @@ -252,7 +273,8 @@
               if (!supportedOptions().contains(name))
                   throw new UnsupportedOperationException("'" + name + "' not supported");
       
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                   ensureOpen();
       
                   if (name == StandardSocketOptions.IP_TOS) {
      @@ -301,6 +323,8 @@
       
                   // no special handling
                   return (T) Net.getSocketOption(fd, Net.UNSPEC, name);
      + } finally {
      + stateLock.unlock();
               }
           }
       
      @@ -338,7 +362,8 @@
                   throw new IllegalArgumentException("Read-only buffer");
               if (dst == null)
                   throw new NullPointerException();
      - synchronized (readLock) {
      + try {
      + readLock.lock();
                   ensureOpen();
                   // Socket was not bound before attempting receive
                   if (localAddress() == null)
      @@ -358,37 +383,33 @@
                           if (n == IOStatus.UNAVAILABLE)
                               return null;
                       } else {
      - bb = Util.getTemporaryDirectBuffer(dst.remaining());
      - for (;;) {
      + for (; ; ) {
                               do {
      - n = receive(fd, bb);
      + n = receive(fd, dst);
                               } while ((n == IOStatus.INTERRUPTED) && isOpen());
                               if (n == IOStatus.UNAVAILABLE)
                                   return null;
      - InetSocketAddress isa = (InetSocketAddress)sender;
      + InetSocketAddress isa = (InetSocketAddress) sender;
                               try {
                                   security.checkAccept(
      - isa.getAddress().getHostAddress(),
      - isa.getPort());
      + isa.getAddress().getHostAddress(),
      + isa.getPort());
                               } catch (SecurityException se) {
                                   // Ignore packet
      - bb.clear();
                                   n = 0;
                                   continue;
                               }
      - bb.flip();
      - dst.put(bb);
                               break;
                           }
                       }
                       return sender;
                   } finally {
      - if (bb != null)
      - Util.releaseTemporaryDirectBuffer(bb);
                       readerThread = 0;
                       end((n > 0) || (n == IOStatus.UNAVAILABLE));
                       assert IOStatus.check(n);
                   }
      + }finally {
      + readLock.unlock();
               }
           }
       
      @@ -435,13 +456,15 @@
               if (src == null)
                   throw new NullPointerException();
       
      - synchronized (writeLock) {
      + try{
      + writeLock.lock();
                   ensureOpen();
                   InetSocketAddress isa = Net.checkAddress(target);
                   InetAddress ia = isa.getAddress();
                   if (ia == null)
                       throw new IOException("Target address not resolved");
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                       if (!isConnected()) {
                           if (target == null)
                               throw new NullPointerException();
      @@ -461,6 +484,8 @@
                           }
                           return write(src);
                       }
      + } finally {
      + stateLock.unlock();
                   }
       
                   int n = 0;
      @@ -473,10 +498,13 @@
                           n = send(fd, src, isa);
                       } while ((n == IOStatus.INTERRUPTED) && isOpen());
       
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                           if (isOpen() && (localAddress == null)) {
                               localAddress = Net.localAddress(fd);
                           }
      + } finally {
      + stateLock.unlock();
                       }
                       return IOStatus.normalize(n);
                   } finally {
      @@ -484,7 +512,10 @@
                       end((n > 0) || (n == IOStatus.UNAVAILABLE));
                       assert IOStatus.check(n);
                   }
      - }
      + } finally {
      + writeLock.unlock();
      + }
      +
           }
       
           private int send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target)
      @@ -544,11 +575,15 @@
           public int read(ByteBuffer buf) throws IOException {
               if (buf == null)
                   throw new NullPointerException();
      - synchronized (readLock) {
      - synchronized (stateLock) {
      + try {
      + readLock.lock();
      + try {
      + stateLock.lock();
                       ensureOpen();
                       if (!isConnected())
                           throw new NotYetConnectedException();
      + } finally {
      + stateLock.unlock();
                   }
                   int n = 0;
                   try {
      @@ -565,19 +600,24 @@
                       end((n > 0) || (n == IOStatus.UNAVAILABLE));
                       assert IOStatus.check(n);
                   }
      + } finally {
      + readLock.unlock();
               }
           }
       
           public long read(ByteBuffer[] dsts, int offset, int length)
      - throws IOException
      - {
      + throws IOException {
               if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
                   throw new IndexOutOfBoundsException();
      - synchronized (readLock) {
      - synchronized (stateLock) {
      + try {
      + readLock.lock();
      + try {
      + stateLock.lock();
                       ensureOpen();
                       if (!isConnected())
                           throw new NotYetConnectedException();
      + } finally {
      + stateLock.unlock();
                   }
                   long n = 0;
                   try {
      @@ -594,17 +634,26 @@
                       end((n > 0) || (n == IOStatus.UNAVAILABLE));
                       assert IOStatus.check(n);
                   }
      + } finally
      +
      + {
      + readLock.unlock();
               }
           }
       
           public int write(ByteBuffer buf) throws IOException {
               if (buf == null)
                   throw new NullPointerException();
      - synchronized (writeLock) {
      - synchronized (stateLock) {
      + try {
      + writeLock.lock();
      +
      + try{
      + stateLock.lock();
                       ensureOpen();
                       if (!isConnected())
                           throw new NotYetConnectedException();
      + } finally {
      + stateLock.unlock();
                   }
                   int n = 0;
                   try {
      @@ -621,6 +670,8 @@
                       end((n > 0) || (n == IOStatus.UNAVAILABLE));
                       assert IOStatus.check(n);
                   }
      + } finally {
      + writeLock.unlock();
               }
           }
       
      @@ -629,11 +680,15 @@
           {
               if ((offset < 0) || (length < 0) || (offset > srcs.length - length))
                   throw new IndexOutOfBoundsException();
      - synchronized (writeLock) {
      - synchronized (stateLock) {
      + try{
      + writeLock.lock();
      + try{
      + stateLock.lock();
                       ensureOpen();
                       if (!isConnected())
                           throw new NotYetConnectedException();
      + } finally {
      + stateLock.unlock();
                   }
                   long n = 0;
                   try {
      @@ -650,6 +705,8 @@
                       end((n > 0) || (n == IOStatus.UNAVAILABLE));
                       assert IOStatus.check(n);
                   }
      + } finally {
      + writeLock.unlock();
               }
           }
       
      @@ -658,67 +715,87 @@
           }
       
           public SocketAddress localAddress() {
      - synchronized (stateLock) {
      + try{
      + stateLock.lock();
                   return localAddress;
      + }finally {
      + stateLock.unlock();
               }
           }
       
           public SocketAddress remoteAddress() {
      - synchronized (stateLock) {
      + try{
      + stateLock.lock();
                   return remoteAddress;
      - }
      + }finally {
      + stateLock.unlock();
      + }
      +
           }
       
           @Override
           public DatagramChannel bind(SocketAddress local) throws IOException {
      - synchronized (readLock) {
      - synchronized (writeLock) {
      - synchronized (stateLock) {
      - ensureOpen();
      - if (localAddress != null)
      - throw new AlreadyBoundException();
      - InetSocketAddress isa;
      - if (local == null) {
      - // only Inet4Address allowed with IPv4 socket
      - if (family == StandardProtocolFamily.INET) {
      - isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
      - } else {
      - isa = new InetSocketAddress(0);
      - }
      + try {
      + readLock.lock();
      + writeLock.lock();
      + try {
      + stateLock.lock();
      + ensureOpen();
      + if (localAddress != null)
      + throw new AlreadyBoundException();
      + InetSocketAddress isa;
      + if (local == null) {
      + // only Inet4Address allowed with IPv4 socket
      + if (family == StandardProtocolFamily.INET) {
      + isa = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
                           } else {
      - isa = Net.checkAddress(local);
      -
      - // only Inet4Address allowed with IPv4 socket
      - if (family == StandardProtocolFamily.INET) {
      - InetAddress addr = isa.getAddress();
      - if (!(addr instanceof Inet4Address))
      - throw new UnsupportedAddressTypeException();
      - }
      + isa = new InetSocketAddress(0);
                           }
      - SecurityManager sm = System.getSecurityManager();
      - if (sm != null) {
      - sm.checkListen(isa.getPort());
      + } else {
      + isa = Net.checkAddress(local);
      +
      + // only Inet4Address allowed with IPv4 socket
      + if (family == StandardProtocolFamily.INET) {
      + InetAddress addr = isa.getAddress();
      + if (!(addr instanceof Inet4Address))
      + throw new UnsupportedAddressTypeException();
                           }
      - Net.bind(family, fd, isa.getAddress(), isa.getPort());
      - localAddress = Net.localAddress(fd);
                       }
      - }
      - }
      - return this;
      - }
      + SecurityManager sm = System.getSecurityManager();
      + if (sm != null) {
      + sm.checkListen(isa.getPort());
      + }
      + Net.bind(family, fd, isa.getAddress(), isa.getPort());
      + localAddress = Net.localAddress(fd);
      + } finally {
      + stateLock.unlock();
      + }
      + return this;
      + } finally {
      + readLock.unlock();
      + writeLock.unlock();
      + }
      +
      +}
       
           public boolean isConnected() {
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                   return (state == ST_CONNECTED);
      + } finally {
      + stateLock.unlock();
               }
           }
       
           void ensureOpenAndUnconnected() throws IOException { // package-private
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                   if (!isOpen())
                       throw new ClosedChannelException();
                   if (state != ST_UNCONNECTED)
                       throw new IllegalStateException("Connect already invoked");
      + } finally {
      + stateLock.unlock();
               }
           }
       
      @@ -726,59 +803,74 @@
           public DatagramChannel connect(SocketAddress sa) throws IOException {
               int localPort = 0;
       
      - synchronized(readLock) {
      - synchronized(writeLock) {
      - synchronized (stateLock) {
      - ensureOpenAndUnconnected();
      - InetSocketAddress isa = Net.checkAddress(sa);
      - SecurityManager sm = System.getSecurityManager();
      - if (sm != null)
      - sm.checkConnect(isa.getAddress().getHostAddress(),
      - isa.getPort());
      - int n = Net.connect(family,
      - fd,
      - isa.getAddress(),
      - isa.getPort());
      - if (n <= 0)
      - throw new Error(); // Can't happen
      -
      - // Connection succeeded; disallow further invocation
      - state = ST_CONNECTED;
      - remoteAddress = isa;
      - sender = isa;
      - cachedSenderInetAddress = isa.getAddress();
      - cachedSenderPort = isa.getPort();
      -
      - // set or refresh local address
      - localAddress = Net.localAddress(fd);
      - }
      - }
      - }
      - return this;
      + try {
      + readLock.lock();
      + writeLock.lock();
      + try {
      + stateLock.lock();
      + ensureOpenAndUnconnected();
      + InetSocketAddress isa = Net.checkAddress(sa);
      + SecurityManager sm = System.getSecurityManager();
      + if (sm != null)
      + sm.checkConnect(isa.getAddress().getHostAddress(),
      + isa.getPort());
      + int n = Net.connect(family,
      + fd,
      + isa.getAddress(),
      + isa.getPort());
      + if (n <= 0)
      + throw new Error(); // Can't happen
      +
      + // Connection succeeded; disallow further invocation
      + state = ST_CONNECTED;
      + remoteAddress = isa;
      + sender = isa;
      + cachedSenderInetAddress = isa.getAddress();
      + cachedSenderPort = isa.getPort();
      +
      + // set or refresh local address
      + localAddress = Net.localAddress(fd);
      + } finally {
      + stateLock.unlock();
      + }
      +
      + return this;
      + } finally {
      + readLock.unlock();
      + writeLock.unlock();
      + }
      +
           }
       
           public DatagramChannel disconnect() throws IOException {
      - synchronized(readLock) {
      - synchronized(writeLock) {
      - synchronized (stateLock) {
      - if (!isConnected() || !isOpen())
      - return this;
      - InetSocketAddress isa = remoteAddress;
      - SecurityManager sm = System.getSecurityManager();
      - if (sm != null)
      - sm.checkConnect(isa.getAddress().getHostAddress(),
      - isa.getPort());
      - boolean isIPv6 = (family == StandardProtocolFamily.INET6);
      - disconnect0(fd, isIPv6);
      - remoteAddress = null;
      - state = ST_UNCONNECTED;
      -
      - // refresh local address
      - localAddress = Net.localAddress(fd);
      - }
      - }
      - }
      - return this;
      + try {
      + readLock.lock();
      + writeLock.lock();
      + stateLock.lock();
      +
      +
      + if (!isConnected() || !isOpen())
      + return this;
      + InetSocketAddress isa = remoteAddress;
      + SecurityManager sm = System.getSecurityManager();
      + if (sm != null)
      + sm.checkConnect(isa.getAddress().getHostAddress(),
      + isa.getPort());
      + boolean isIPv6 = (family == StandardProtocolFamily.INET6);
      + disconnect0(fd, isIPv6);
      + remoteAddress = null;
      + state = ST_UNCONNECTED;
      +
      + // refresh local address
      + localAddress = Net.localAddress(fd);
      +
      + return this;
      + } finally {
      + readLock.unlock();
      + writeLock.unlock();
      + stateLock.unlock();
      + }
      +
           }
       
           /**
      @@ -818,7 +910,8 @@
               if (sm != null)
                   sm.checkMulticast(group);
       
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                   if (!isOpen())
                       throw new ClosedChannelException();
       
      @@ -874,6 +967,8 @@
       
                   registry.add(key);
                   return key;
      + } finally {
      + stateLock.unlock();
               }
           }
       
      @@ -900,7 +995,8 @@
           void drop(MembershipKeyImpl key) {
               assert key.channel() == this;
       
      - synchronized (stateLock) {
      + try{
      + stateLock.lock();
                   if (!key.isValid())
                       return;
       
      @@ -921,6 +1017,8 @@
       
                   key.invalidate();
                   registry.remove(key);
      + } finally {
      + stateLock.unlock();
               }
           }
       
      @@ -934,7 +1032,8 @@
               assert key.channel() == this;
               assert key.sourceAddress() == null;
       
      - synchronized (stateLock) {
      + try{
      + stateLock.lock();
                   if (!key.isValid())
                       throw new IllegalStateException("key is no longer valid");
                   if (source.isAnyLocalAddress())
      @@ -960,6 +1059,8 @@
                       // ancient kernel
                       throw new UnsupportedOperationException();
                   }
      + } finally {
      + stateLock.unlock();
               }
           }
       
      @@ -970,7 +1071,8 @@
               assert key.channel() == this;
               assert key.sourceAddress() == null;
       
      - synchronized (stateLock) {
      + try{
      + stateLock.lock();
                   if (!key.isValid())
                       throw new IllegalStateException("key is no longer valid");
       
      @@ -990,11 +1092,14 @@
                       // should not happen
                       throw new AssertionError(ioe);
                   }
      + } finally {
      + stateLock.unlock();
               }
           }
       
           protected void implCloseSelectableChannel() throws IOException {
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                   if (state != ST_KILLED)
                       nd.preClose(fd);
                   ResourceManager.afterUdpClose();
      @@ -1010,11 +1115,14 @@
                       NativeThread.signal(th);
                   if (!isRegistered())
                       kill();
      + } finally {
      + stateLock.unlock();
               }
           }
       
           public void kill() throws IOException {
      - synchronized (stateLock) {
      + try{
      + stateLock.lock();
                   if (state == ST_KILLED)
                       return;
                   if (state == ST_UNINITIALIZED) {
      @@ -1024,6 +1132,8 @@
                   assert !isOpen() && !isRegistered();
                   nd.close(fd);
                   state = ST_KILLED;
      + } finally {
      + stateLock.unlock();
               }
           }
       
      @@ -1080,14 +1190,18 @@
           int poll(int events, long timeout) throws IOException {
               assert Thread.holdsLock(blockingLock()) && !isBlocking();
       
      - synchronized (readLock) {
      + try{
      + readLock.lock();
                   int n = 0;
                   try {
                       begin();
      - synchronized (stateLock) {
      + try {
      + stateLock.lock();
                           if (!isOpen())
                               return 0;
                           readerThread = NativeThread.current();
      + } finally {
      + stateLock.unlock();
                       }
                       n = Net.poll(fd, events, timeout);
                   } finally {
      @@ -1095,7 +1209,10 @@
                       end(n > 0);
                   }
                   return n;
      - }
      + } finally {
      + readLock.unlock();
      + }
      +
           }
       
           /**

      ---------- END SOURCE ----------

      CUSTOMER SUBMITTED WORKAROUND :
      Use native libraries, although having to do so is very unexpected for cross-platform development.

            alanb Alan Bateman
            webbuggrp Webbug Group
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

              Created:
              Updated:
              Resolved: