diff --git a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java --- a/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java +++ b/src/java.base/share/classes/sun/nio/ch/DatagramChannelImpl.java @@ -37,6 +37,7 @@ import java.net.ProtocolFamily; import java.net.SocketAddress; import java.net.SocketOption; +import java.net.SocketTimeoutException; import java.net.StandardProtocolFamily; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; @@ -45,6 +46,7 @@ import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ClosedChannelException; import java.nio.channels.DatagramChannel; +import java.nio.channels.IllegalBlockingModeException; import java.nio.channels.MembershipKey; import java.nio.channels.NotYetConnectedException; import java.nio.channels.SelectionKey; @@ -401,61 +403,32 @@ public SocketAddress receive(ByteBuffer dst) throws IOException { if (dst.isReadOnly()) throw new IllegalArgumentException("Read-only buffer"); - readLock.lock(); try { boolean blocking = isBlocking(); int n = 0; - ByteBuffer bb = null; try { SocketAddress remote = beginRead(blocking, false); boolean connected = (remote != null); SecurityManager sm = System.getSecurityManager(); if (connected || (sm == null)) { // connected or no security manager - n = receive(fd, dst, connected); + n = receive(dst, connected); if (blocking) { while (IOStatus.okayToRetry(n) && isOpen()) { park(Net.POLLIN); - n = receive(fd, dst, connected); + n = receive(dst, connected); } - } else if (n == IOStatus.UNAVAILABLE) { - return null; } } else { - // Cannot receive into user's buffer when running with a - // security manager and not connected - bb = Util.getTemporaryDirectBuffer(dst.remaining()); - for (;;) { - n = receive(fd, bb, connected); - if (blocking) { - while (IOStatus.okayToRetry(n) && isOpen()) { - park(Net.POLLIN); - n = receive(fd, bb, connected); - } - } else if (n == IOStatus.UNAVAILABLE) { - return null; - } - InetSocketAddress isa = (InetSocketAddress)sender; - try { - sm.checkAccept(isa.getAddress().getHostAddress(), - isa.getPort()); - } catch (SecurityException se) { - // Ignore packet - bb.clear(); - n = 0; - continue; - } - bb.flip(); - dst.put(bb); - break; - } + // security manager and unconnected + n = untrustedReceive(dst, blocking); } + if (n == IOStatus.UNAVAILABLE) + return null; assert sender != null; return sender; } finally { - if (bb != null) - Util.releaseTemporaryDirectBuffer(bb); endRead(blocking, n > 0); assert IOStatus.check(n); } @@ -464,15 +437,122 @@ } } - private int receive(FileDescriptor fd, ByteBuffer dst, boolean connected) + /** + * Receives a datagram into the given buffer. + * + * @apiNote This method is for use by the socket adaptor. + * + * @throws IllegalBlockingModeException if the channel is non-blocking + * @throws SocketTimeoutException if the timeout elapses + */ + SocketAddress blockingReceive(ByteBuffer dst, long nanos) throws IOException { + readLock.lock(); + try { + ensureOpen(); + if (!isBlocking()) + throw new IllegalBlockingModeException(); + if (nanos == 0) { + return receive(dst); + } else { + return timedReceive(dst, nanos); + } + } finally { + readLock.unlock(); + } + } + + private SocketAddress timedReceive(ByteBuffer dst, long nanos) throws IOException { + assert readLock.isHeldByCurrentThread() && isBlocking(); + int n = 0; + try { + SocketAddress remote = beginRead(true, false); + boolean connected = (remote != null); + + // change socket to non-blocking + lockedConfigureBlocking(false); + try { + long startNanos = System.nanoTime(); + n = tryReceive(dst, connected); + while (n == IOStatus.UNAVAILABLE && isOpen()) { + long remainingNanos = nanos - (System.nanoTime() - startNanos); + if (remainingNanos <= 0) { + throw new SocketTimeoutException("Receive timed out"); + } + park(Net.POLLIN, remainingNanos); + n = tryReceive(dst, connected); + } + return sender; + } finally { + // restore socket to blocking mode + lockedConfigureBlocking(true); + } + } finally { + endRead(true, n > 0); + assert IOStatus.check(n); + } + } + + private int tryReceive(ByteBuffer dst, boolean connected) + throws IOException + { + if (connected || (System.getSecurityManager() == null)) { + // connected or no security manager + return receive(dst, connected); + } else { + // unconnected and security manager set + return untrustedReceive(dst, false); + } + } + + /** + * Receives a datagram in blocking or non-blocking mode. The datagram is + * copied into the given buffer when the sender address is accepted by the + * security manager. Returns the size of the datagram or IOStatus.UNAVAILABLE + * if the socket is non-blocking and there is no datagram available. + */ + private int untrustedReceive(ByteBuffer dst, boolean blocking) throws IOException { + SecurityManager sm = System.getSecurityManager(); + assert sm != null && remoteAddress == null; + + ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining()); + try { + int n; + for (;;) { + n = receive(bb, false); + if (blocking) { + while (IOStatus.okayToRetry(n) && isOpen()) { + park(Net.POLLIN); + n = receive(bb, false); + } + } else if (n == IOStatus.UNAVAILABLE) { + return n; + } + assert n >= 0; + InetSocketAddress isa = (InetSocketAddress) sender; + try { + sm.checkAccept(isa.getAddress().getHostAddress(), isa.getPort()); + bb.flip(); + dst.put(bb); + return n; + } catch (SecurityException se) { + // Ignore packet + bb.clear(); + } + } + } finally { + Util.releaseTemporaryDirectBuffer(bb); + } + } + + private int receive(ByteBuffer dst, boolean connected) throws IOException { int pos = dst.position(); int lim = dst.limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); if (dst instanceof DirectBuffer && rem > 0) - return receiveIntoNativeBuffer(fd, dst, rem, pos, connected); + return receiveIntoNativeBuffer(dst, rem, pos, connected); // Substitute a native buffer. If the supplied buffer is empty // we must instead use a nonempty buffer, otherwise the call @@ -480,7 +560,7 @@ int newSize = Math.max(rem, 1); ByteBuffer bb = Util.getTemporaryDirectBuffer(newSize); try { - int n = receiveIntoNativeBuffer(fd, bb, newSize, 0, connected); + int n = receiveIntoNativeBuffer(bb, newSize, 0, connected); bb.flip(); if (n > 0 && rem > 0) dst.put(bb); @@ -490,8 +570,8 @@ } } - private int receiveIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, - int rem, int pos, boolean connected) + private int receiveIntoNativeBuffer(ByteBuffer bb, int rem, int pos, + boolean connected) throws IOException { int n = receive0(fd, ((DirectBuffer)bb).address() + pos, rem, connected); @@ -556,6 +636,25 @@ } } + /** + * Sends a datagram from the bytes in given buffer. + * + * @apiNote This method is for use by the socket adaptor. + * + * @throws IllegalBlockingModeException if the channel is non-blocking + */ + void blockingSend(ByteBuffer src, SocketAddress target) throws IOException { + writeLock.lock(); + try { + ensureOpen(); + if (!isBlocking()) + throw new IllegalBlockingModeException(); + send(src, target); + } finally { + writeLock.unlock(); + } + } + private int send(FileDescriptor fd, ByteBuffer src, InetSocketAddress target) throws IOException { @@ -790,6 +889,17 @@ } } + /** + * Adjust the blocking mode while holding the readLock or writeLock. + */ + private void lockedConfigureBlocking(boolean block) throws IOException { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); + synchronized (stateLock) { + ensureOpen(); + IOUtil.configureBlocking(fd, block); + } + } + InetSocketAddress localAddress() { synchronized (stateLock) { return localAddress; @@ -854,6 +964,16 @@ @Override public DatagramChannel connect(SocketAddress sa) throws IOException { + return connect(sa, true); + } + + /** + * Connects the channel's socket. + * + * @param sa the remote address to which this channel is to be connected + * @param check true to check if the channel is already connected. + */ + DatagramChannel connect(SocketAddress sa, boolean check) throws IOException { InetSocketAddress isa = Net.checkAddress(sa, family); SecurityManager sm = System.getSecurityManager(); if (sm != null) { @@ -872,7 +992,7 @@ try { synchronized (stateLock) { ensureOpen(); - if (state == ST_CONNECTED) + if (check && state == ST_CONNECTED) throw new AlreadyConnectedException(); // ensure that the socket is bound @@ -901,7 +1021,7 @@ } try { ByteBuffer buf = ByteBuffer.allocate(100); - while (receive(fd, buf, false) > 0) { + while (receive(buf, false) > 0) { buf.clear(); } } finally { @@ -1332,30 +1452,6 @@ } /** - * Poll this channel's socket for reading up to the given timeout. - * @return {@code true} if the socket is polled - */ - boolean pollRead(long timeout) throws IOException { - boolean blocking = isBlocking(); - assert Thread.holdsLock(blockingLock()) && blocking; - - readLock.lock(); - try { - boolean polled = false; - try { - beginRead(blocking, false); - int events = Net.poll(fd, Net.POLLIN, timeout); - polled = (events != 0); - } finally { - endRead(blocking, polled); - } - return polled; - } finally { - readLock.unlock(); - } - } - - /** * Translates an interest operation set into a native poll event set */ public int translateInterestOps(int ops) { diff --git a/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java b/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java --- a/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java +++ b/src/java.base/share/classes/sun/nio/ch/DatagramSocketAdaptor.java @@ -26,6 +26,9 @@ package sun.nio.ch; import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodHandles.Lookup; +import java.lang.invoke.VarHandle; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.DatagramSocketImpl; @@ -35,15 +38,15 @@ import java.net.SocketAddress; import java.net.SocketException; import java.net.SocketOption; -import java.net.SocketTimeoutException; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.DatagramChannel; -import java.nio.channels.IllegalBlockingModeException; -import java.util.Objects; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.Set; +import static java.util.concurrent.TimeUnit.MILLISECONDS; // Make a datagram-socket channel look like a datagram socket. // @@ -61,13 +64,9 @@ // Timeout "option" value for receives private volatile int timeout; - // ## super will create a useless impl + // create DatagramSocket with useless impl private DatagramSocketAdaptor(DatagramChannelImpl dc) { - // Invoke the DatagramSocketAdaptor(SocketAddress) constructor, - // passing a dummy DatagramSocketImpl object to avoid any native - // resource allocation in super class and invoking our bind method - // before the dc field is initialized. - super(dummyDatagramSocket); + super(new DummyDatagramSocketImpl()); this.dc = dc; } @@ -75,9 +74,7 @@ return new DatagramSocketAdaptor(dc); } - private void connectInternal(SocketAddress remote) - throws SocketException - { + private void connectInternal(SocketAddress remote) throws SocketException { InetSocketAddress isa = Net.asInetSocketAddress(remote); int port = isa.getPort(); if (port < 0 || port > 0xFFFF) @@ -85,7 +82,7 @@ if (remote == null) throw new IllegalArgumentException("connect: null address"); try { - dc.connect(remote); + dc.connect(remote, false); // does not check if already connected } catch (ClosedChannelException e) { // ignore } catch (Exception x) { @@ -106,16 +103,19 @@ @Override public void connect(InetAddress address, int port) { + if (address == null) + throw new IllegalArgumentException("Address can't be null"); try { connectInternal(new InetSocketAddress(address, port)); } catch (SocketException x) { - // Yes, j.n.DatagramSocket really does this + throw new Error(x); } } @Override public void connect(SocketAddress remote) throws SocketException { - Objects.requireNonNull(remote, "Address can't be null"); + if (remote == null) + throw new IllegalArgumentException("Address can't be null"); connectInternal(remote); } @@ -157,80 +157,72 @@ @Override public SocketAddress getLocalSocketAddress() { - return dc.localAddress(); + if (isClosed()) + return null; + return Net.getRevealedLocalAddress(dc.localAddress()); } @Override public void send(DatagramPacket p) throws IOException { - synchronized (dc.blockingLock()) { - if (!dc.isBlocking()) - throw new IllegalBlockingModeException(); - try { - synchronized (p) { - ByteBuffer bb = ByteBuffer.wrap(p.getData(), - p.getOffset(), - p.getLength()); - if (dc.isConnected()) { - if (p.getAddress() == null) { - // Legacy DatagramSocket will send in this case - // and set address and port of the packet - InetSocketAddress isa = dc.remoteAddress(); - p.setPort(isa.getPort()); - p.setAddress(isa.getAddress()); - dc.write(bb); - } else { - // Target address may not match connected address - dc.send(bb, p.getSocketAddress()); - } - } else { - // Not connected so address must be valid or throw - dc.send(bb, p.getSocketAddress()); + ByteBuffer bb = null; + try { + InetSocketAddress target; + synchronized (p) { + // copy bytes to temporary direct buffer + int len = p.getLength(); + bb = Util.getTemporaryDirectBuffer(len); + bb.put(p.getData(), p.getOffset(), len); + bb.flip(); + + // target address + if (p.getAddress() == null) { + InetSocketAddress remote = dc.remoteAddress(); + if (remote == null) { + // not specified by DatagramSocket + throw new IllegalArgumentException("Address not set"); } + // set address/port to maintain compatibility with DatagramSocket + p.setAddress(remote.getAddress()); + p.setPort(remote.getPort()); + target = remote; + } else { + // throws IllegalArgumentException if port not set + target = (InetSocketAddress) p.getSocketAddress(); } - } catch (IOException x) { - Net.translateException(x); } - } - } - - private SocketAddress receive(ByteBuffer bb) throws IOException { - assert Thread.holdsLock(dc.blockingLock()) && dc.isBlocking(); - - long to = this.timeout; - if (to == 0) { - return dc.receive(bb); - } else { - for (;;) { - if (!dc.isOpen()) - throw new ClosedChannelException(); - long st = System.currentTimeMillis(); - if (dc.pollRead(to)) { - return dc.receive(bb); - } - to -= System.currentTimeMillis() - st; - if (to <= 0) - throw new SocketTimeoutException(); + // send datagram + dc.blockingSend(bb, target); + } catch (IOException x) { + Net.translateException(x); + } finally { + if (bb != null) { + Util.offerFirstTemporaryDirectBuffer(bb); } } } @Override public void receive(DatagramPacket p) throws IOException { - synchronized (dc.blockingLock()) { - if (!dc.isBlocking()) - throw new IllegalBlockingModeException(); - try { - synchronized (p) { - ByteBuffer bb = ByteBuffer.wrap(p.getData(), - p.getOffset(), - p.getLength()); - SocketAddress sender = receive(bb); - p.setSocketAddress(sender); - p.setLength(bb.position() - p.getOffset()); - } - } catch (IOException x) { - Net.translateException(x); + // get temporary direct buffer with a capacity of p.bufLength + int bufLength = DatagramPackets.getBufLength(p); + ByteBuffer bb = Util.getTemporaryDirectBuffer(bufLength); + try { + long nanos = MILLISECONDS.toNanos(timeout); + SocketAddress sender = dc.blockingReceive(bb, nanos); + bb.flip(); + synchronized (p) { + // copy bytes to the DatagramPacket and set length + int len = Math.min(bb.limit(), DatagramPackets.getBufLength(p)); + bb.get(p.getData(), p.getOffset(), len); + DatagramPackets.setLength(p, len); + + // sender address + p.setSocketAddress(sender); } + } catch (IOException x) { + Net.translateException(x); + } finally { + Util.offerFirstTemporaryDirectBuffer(bb); } } @@ -257,19 +249,16 @@ public int getLocalPort() { if (isClosed()) return -1; - try { - InetSocketAddress local = dc.localAddress(); - if (local != null) { - return local.getPort(); - } - } catch (Exception x) { + InetSocketAddress local = dc.localAddress(); + if (local != null) { + return local.getPort(); } return 0; } @Override public void setSoTimeout(int timeout) throws SocketException { - if (!dc.isOpen()) + if (isClosed()) throw new SocketException("Socket is closed"); if (timeout < 0) throw new IllegalArgumentException("timeout < 0"); @@ -278,7 +267,7 @@ @Override public int getSoTimeout() throws SocketException { - if (!dc.isOpen()) + if (isClosed()) throw new SocketException("Socket is closed"); return timeout; } @@ -353,7 +342,6 @@ @Override public boolean getReuseAddress() throws SocketException { return getBooleanOption(StandardSocketOptions.SO_REUSEADDR); - } @Override @@ -410,51 +398,157 @@ public Set> supportedOptions() { return dc.supportedOptions(); } +} - /* - * A dummy implementation of DatagramSocketImpl that can be passed to the - * DatagramSocket constructor so that no native resources are allocated in - * super class. - */ - private static final DatagramSocketImpl dummyDatagramSocket - = new DatagramSocketImpl() - { - protected void create() throws SocketException {} +/** + * DatagramSocketImpl implementation where all methods throw an error. + */ +class DummyDatagramSocketImpl extends DatagramSocketImpl { + private static T shouldNotGetHere() { + throw new InternalError("Should not get here"); + } - protected void bind(int lport, InetAddress laddr) throws SocketException {} + @Override + protected void create() { + shouldNotGetHere(); + } - protected void send(DatagramPacket p) throws IOException {} + @Override + protected void bind(int lport, InetAddress laddr) { + shouldNotGetHere(); + } - protected int peek(InetAddress i) throws IOException { return 0; } + @Override + protected void send(DatagramPacket p) { + shouldNotGetHere(); + } - protected int peekData(DatagramPacket p) throws IOException { return 0; } + @Override + protected int peek(InetAddress address) { + return shouldNotGetHere(); + } - protected void receive(DatagramPacket p) throws IOException {} + @Override + protected int peekData(DatagramPacket p) { + return shouldNotGetHere(); + } - @Deprecated - protected void setTTL(byte ttl) throws IOException {} + @Override + protected void receive(DatagramPacket p) { + shouldNotGetHere(); + } - @Deprecated - protected byte getTTL() throws IOException { return 0; } + @Deprecated + protected void setTTL(byte ttl) { + shouldNotGetHere(); + } - protected void setTimeToLive(int ttl) throws IOException {} + @Deprecated + protected byte getTTL() { + return shouldNotGetHere(); + } - protected int getTimeToLive() throws IOException { return 0;} + @Override + protected void setTimeToLive(int ttl) { + shouldNotGetHere(); + } - protected void join(InetAddress inetaddr) throws IOException {} + @Override + protected int getTimeToLive() { + return shouldNotGetHere(); + } - protected void leave(InetAddress inetaddr) throws IOException {} + @Override + protected void join(InetAddress group) { + shouldNotGetHere(); + } - protected void joinGroup(SocketAddress mcastaddr, - NetworkInterface netIf) throws IOException {} + @Override + protected void leave(InetAddress inetaddr) { + shouldNotGetHere(); + } - protected void leaveGroup(SocketAddress mcastaddr, - NetworkInterface netIf) throws IOException {} + @Override + protected void joinGroup(SocketAddress group, NetworkInterface netIf) { + shouldNotGetHere(); + } - protected void close() {} + @Override + protected void leaveGroup(SocketAddress mcastaddr, NetworkInterface netIf) { + shouldNotGetHere(); + } - public Object getOption(int optID) throws SocketException { return null;} + @Override + protected void close() { + shouldNotGetHere(); + } - public void setOption(int optID, Object value) throws SocketException {} - }; + @Override + public Object getOption(int optID) { + return shouldNotGetHere(); + } + + @Override + public void setOption(int optID, Object value) { + shouldNotGetHere(); + } + + @Override + protected void setOption(SocketOption name, T value) { + shouldNotGetHere(); + } + + @Override + protected T getOption(SocketOption name) { + return shouldNotGetHere(); + } + + @Override + protected Set> supportedOptions(){ + return shouldNotGetHere(); + } } + +/** + * Defines static methods to get/set DatagramPacket fields and workaround + * DatagramPacket deficiencies. + */ +class DatagramPackets { + private static final VarHandle LENGTH; + private static final VarHandle BUF_LENGTH; + static { + try { + PrivilegedAction pa = () -> { + try { + return MethodHandles.privateLookupIn(DatagramPacket.class, MethodHandles.lookup()); + } catch (Exception e) { + throw new ExceptionInInitializerError(e); + } + }; + MethodHandles.Lookup l = AccessController.doPrivileged(pa); + LENGTH = l.findVarHandle(DatagramPacket.class, "length", int.class); + BUF_LENGTH = l.findVarHandle(DatagramPacket.class, "bufLength", int.class); + } catch (Exception e) { + throw new ExceptionInInitializerError(e); + } + } + + /** + * Sets the DatagramPacket.length field. DatagramPacket.setLength cannot be + * used at this time because it sets both the length and bufLength fields. + */ + static void setLength(DatagramPacket p, int value) { + synchronized (p) { + LENGTH.set(p, value); + } + } + + /** + * Returns the value of the DatagramPacket.bufLength field. + */ + static int getBufLength(DatagramPacket p) { + synchronized (p) { + return (int) BUF_LENGTH.get(p); + } + } +} \ No newline at end of file diff --git a/src/java.base/share/classes/sun/nio/ch/DummySocketImpl.java b/src/java.base/share/classes/sun/nio/ch/DummySocketImpl.java --- a/src/java.base/share/classes/sun/nio/ch/DummySocketImpl.java +++ b/src/java.base/share/classes/sun/nio/ch/DummySocketImpl.java @@ -31,8 +31,6 @@ import java.net.SocketAddress; import java.net.SocketImpl; import java.net.SocketOption; -import java.security.AccessController; -import java.security.PrivilegedAction; import java.util.Set; /** @@ -41,12 +39,10 @@ */ class DummySocketImpl extends SocketImpl { - private static final PrivilegedAction NEW = DummySocketImpl::new; - private DummySocketImpl() { } static SocketImpl create() { - return AccessController.doPrivileged(NEW); + return new DummySocketImpl(); } private static T shouldNotGetHere() { diff --git a/test/jdk/java/nio/channels/DatagramChannel/AdaptDatagramSocket.java b/test/jdk/java/nio/channels/DatagramChannel/AdaptDatagramSocket.java --- a/test/jdk/java/nio/channels/DatagramChannel/AdaptDatagramSocket.java +++ b/test/jdk/java/nio/channels/DatagramChannel/AdaptDatagramSocket.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2001, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2001, 2019, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -24,6 +24,7 @@ /* @test * @bug 4313882 4981129 8143610 * @summary Unit test for datagram-socket-channel adaptors + * @modules java.base/java.net:+open * @library .. /test/lib * @build jdk.test.lib.Utils TestServers * @run main AdaptDatagramSocket @@ -33,6 +34,7 @@ import java.net.*; import java.nio.channels.*; import java.util.*; +import java.lang.reflect.Field; public class AdaptDatagramSocket { @@ -46,13 +48,19 @@ + "]"); } - static void test(DatagramSocket ds, InetSocketAddress dst, - boolean shouldTimeout) + static int getBufLength(DatagramPacket p) throws Exception { + Field f = DatagramPacket.class.getDeclaredField("bufLength"); + f.setAccessible(true); + return (int) f.get(p); + } + + static void test(DatagramSocket ds, InetSocketAddress dst, boolean shouldTimeout) throws Exception { DatagramPacket op = new DatagramPacket(new byte[100], 13, 42, dst); rand.nextBytes(op.getData()); - DatagramPacket ip = new DatagramPacket(new byte[100], 19, 100 - 19); + int bufLength = 100 - 19; + DatagramPacket ip = new DatagramPacket(new byte[100], 19, bufLength); out.println("pre op: " + toString(op) + " ip: " + toString(ip)); long start = System.currentTimeMillis(); @@ -61,10 +69,6 @@ for (;;) { try { ds.receive(ip); - if (ip.getLength() == 0) { // ## Not sure why this happens - ip.setLength(100 - 19); - continue; - } } catch (SocketTimeoutException x) { if (shouldTimeout) { out.println("Receive timed out, as expected"); @@ -88,6 +92,10 @@ throw new Exception("Incorrect sender address, expected: " + dst + " actual: " + ip.getSocketAddress()); } + + if (getBufLength(ip) != bufLength) { + throw new Exception("DatagramPacket bufLength changed by receive!!!"); + } } static void test(InetSocketAddress dst, diff --git a/test/jdk/java/nio/channels/DatagramChannel/ConcurrentSendReceive.java b/test/jdk/java/nio/channels/DatagramChannel/ConcurrentSendReceive.java new file mode 100644 --- /dev/null +++ b/test/jdk/java/nio/channels/DatagramChannel/ConcurrentSendReceive.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/** + * @test + * @summary Test the DatagramChannel socket adaptor can send a datagram while + * another thread is blocked in receive + */ + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.channels.DatagramChannel; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class ConcurrentSendReceive { + + public static void main(String[] args) throws Exception { + testConcurrentSendReceive(0); + testConcurrentSendReceive(60_000); + } + + static void testConcurrentSendReceive(int timeout) throws Exception { + try (DatagramChannel dc = DatagramChannel.open()) { + InetAddress lb = InetAddress.getLoopbackAddress(); + dc.bind(new InetSocketAddress(lb, 0)); + DatagramSocket s = dc.socket(); + s.setSoTimeout(timeout); + + ExecutorService pool = Executors.newSingleThreadExecutor(); + try { + Future result = pool.submit(() -> { + byte[] data = new byte[100]; + DatagramPacket p = new DatagramPacket(data, 0, data.length); + s.receive(p); + return new String(p.getData(), p.getOffset(), p.getLength(), "UTF-8"); + }); + + Thread.sleep(200); // give chance for thread to block + + byte[] data = "hello".getBytes("UTF-8"); + DatagramPacket p = new DatagramPacket(data, 0, data.length); + p.setSocketAddress(s.getLocalSocketAddress()); + s.send(p); + + String msg = result.get(); + if (!msg.equals("hello")) + throw new RuntimeException("Unexpected message: " + msg); + } finally { + pool.shutdown(); + } + } + } +}