A DESCRIPTION OF THE PROBLEM :
Using `java.nio.channels.DatagramChannel#receive` to receive UDP datagrams from multiple sources leads to `InetSocketAddress` being allocated for every packet received. The culprit is `sun.nio.ch.DatagramChannelImpl#sourceSocketAddress` method which caches last returned address. However, when there are multiple sources the cache value is useless, i.e. is constantly replaced on every received packet.
STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
Run attached code (DatagramChannelReceiveFromMultipleSources) with one sender (OK case) and greater than one (error case, default is 5). The former will succeed while the latter will fail with an exception.
EXPECTED VERSUS ACTUAL BEHAVIOR :
EXPECTED -
Test program completes without errors regarding to the number of senders used.
ACTUAL -
With one sender application succeeds, e.g.:
```
Sending 1,000,000 messages using 1 senders...
Duration: 3257 ms
```
Whereas with more than sender it fails with an error:
```
Sending 1,000,000 messages using 5 senders...
Duration: 3360 ms
Exception in thread "main" java.lang.IllegalStateException: Sender addresses not cached: expected=5, got=1000000
at test.DatagramChannelReceiveFromMultipleSources.main(DatagramChannelReceiveFromMultipleSources.java:119)
```
---------- BEGIN SOURCE ----------
package test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class DatagramChannelReceiveFromMultipleSources
{
private static final int MESSAGE_COUNT = 1_000_000;
public static void main(final String[] args) throws IOException
{
final int numSenders;
if (0 != args.length)
{
numSenders = Integer.parseInt(args[0]);
if (numSenders <= 0)
{
throw new IllegalArgumentException("Invalid number of senders: " + numSenders);
}
}
else
{
numSenders = 5;
}
final InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
final InetSocketAddress receiverAddress = new InetSocketAddress(loopbackAddress, 10000);
final DatagramChannel receiver = DatagramChannel.open(StandardProtocolFamily.INET);
receiver.bind(receiverAddress);
receiver.configureBlocking(false);
final DatagramChannel[] senders = new DatagramChannel[numSenders];
final long[] data = new long[senders.length];
for (int i = 0; i < senders.length; i++)
{
final DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET)
.bind(new InetSocketAddress(loopbackAddress, 10001 + i))
.connect(receiverAddress);
datagramChannel.configureBlocking(false);
senders[i] = datagramChannel;
}
final ByteBuffer buffer = ByteBuffer.allocateDirect(8);
final Set<SocketAddress> senderAddresses = Collections.newSetFromMap(new IdentityHashMap<>(numSenders));
final int iterations = MESSAGE_COUNT / numSenders;
System.out.printf("Sending %,d messages using %d senders...%n", MESSAGE_COUNT, numSenders);
final long startNs = System.nanoTime();
for (int iteration = 0; iteration < iterations; iteration++)
{
for (int i = 0; i < numSenders; i++)
{
final DatagramChannel sender = senders[i];
data[i] = ThreadLocalRandom.current().nextLong();
buffer.rewind();
buffer.putLong(data[i]);
buffer.flip();
while (buffer.hasRemaining())
{
if (0 == sender.send(buffer, receiverAddress))
{
Thread.yield();
}
}
}
for (int i = 0; i < numSenders; i++)
{
buffer.rewind();
SocketAddress socketAddress;
while (null == (socketAddress = receiver.receive(buffer)))
{
Thread.yield();
}
senderAddresses.add(socketAddress);
buffer.flip();
if (8 != buffer.remaining())
{
throw new IllegalStateException("Short read: " + buffer.remaining());
}
final long receivedValue = buffer.getLong();
boolean found = false;
for (final long sentValue : data)
{
if (receivedValue == sentValue)
{
found = true;
break;
}
}
if (!found)
{
throw new IllegalStateException("Unknown value: " + receivedValue + ", sentData=" +
Arrays.toString(data));
}
}
}
final long endNs = System.nanoTime();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(endNs - startNs) + " ms");
if (senderAddresses.size() != numSenders)
{
throw new IllegalStateException("Sender addresses not cached: expected=" + numSenders + ", got=" +
senderAddresses.size());
}
}
}
---------- END SOURCE ----------
Using `java.nio.channels.DatagramChannel#receive` to receive UDP datagrams from multiple sources leads to `InetSocketAddress` being allocated for every packet received. The culprit is `sun.nio.ch.DatagramChannelImpl#sourceSocketAddress` method which caches last returned address. However, when there are multiple sources the cache value is useless, i.e. is constantly replaced on every received packet.
STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
Run attached code (DatagramChannelReceiveFromMultipleSources) with one sender (OK case) and greater than one (error case, default is 5). The former will succeed while the latter will fail with an exception.
EXPECTED VERSUS ACTUAL BEHAVIOR :
EXPECTED -
Test program completes without errors regarding to the number of senders used.
ACTUAL -
With one sender application succeeds, e.g.:
```
Sending 1,000,000 messages using 1 senders...
Duration: 3257 ms
```
Whereas with more than sender it fails with an error:
```
Sending 1,000,000 messages using 5 senders...
Duration: 3360 ms
Exception in thread "main" java.lang.IllegalStateException: Sender addresses not cached: expected=5, got=1000000
at test.DatagramChannelReceiveFromMultipleSources.main(DatagramChannelReceiveFromMultipleSources.java:119)
```
---------- BEGIN SOURCE ----------
package test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class DatagramChannelReceiveFromMultipleSources
{
private static final int MESSAGE_COUNT = 1_000_000;
public static void main(final String[] args) throws IOException
{
final int numSenders;
if (0 != args.length)
{
numSenders = Integer.parseInt(args[0]);
if (numSenders <= 0)
{
throw new IllegalArgumentException("Invalid number of senders: " + numSenders);
}
}
else
{
numSenders = 5;
}
final InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
final InetSocketAddress receiverAddress = new InetSocketAddress(loopbackAddress, 10000);
final DatagramChannel receiver = DatagramChannel.open(StandardProtocolFamily.INET);
receiver.bind(receiverAddress);
receiver.configureBlocking(false);
final DatagramChannel[] senders = new DatagramChannel[numSenders];
final long[] data = new long[senders.length];
for (int i = 0; i < senders.length; i++)
{
final DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET)
.bind(new InetSocketAddress(loopbackAddress, 10001 + i))
.connect(receiverAddress);
datagramChannel.configureBlocking(false);
senders[i] = datagramChannel;
}
final ByteBuffer buffer = ByteBuffer.allocateDirect(8);
final Set<SocketAddress> senderAddresses = Collections.newSetFromMap(new IdentityHashMap<>(numSenders));
final int iterations = MESSAGE_COUNT / numSenders;
System.out.printf("Sending %,d messages using %d senders...%n", MESSAGE_COUNT, numSenders);
final long startNs = System.nanoTime();
for (int iteration = 0; iteration < iterations; iteration++)
{
for (int i = 0; i < numSenders; i++)
{
final DatagramChannel sender = senders[i];
data[i] = ThreadLocalRandom.current().nextLong();
buffer.rewind();
buffer.putLong(data[i]);
buffer.flip();
while (buffer.hasRemaining())
{
if (0 == sender.send(buffer, receiverAddress))
{
Thread.yield();
}
}
}
for (int i = 0; i < numSenders; i++)
{
buffer.rewind();
SocketAddress socketAddress;
while (null == (socketAddress = receiver.receive(buffer)))
{
Thread.yield();
}
senderAddresses.add(socketAddress);
buffer.flip();
if (8 != buffer.remaining())
{
throw new IllegalStateException("Short read: " + buffer.remaining());
}
final long receivedValue = buffer.getLong();
boolean found = false;
for (final long sentValue : data)
{
if (receivedValue == sentValue)
{
found = true;
break;
}
}
if (!found)
{
throw new IllegalStateException("Unknown value: " + receivedValue + ", sentData=" +
Arrays.toString(data));
}
}
}
final long endNs = System.nanoTime();
System.out.println("Duration: " + TimeUnit.NANOSECONDS.toMillis(endNs - startNs) + " ms");
if (senderAddresses.size() != numSenders)
{
throw new IllegalStateException("Sender addresses not cached: expected=" + numSenders + ", got=" +
senderAddresses.size());
}
}
}
---------- END SOURCE ----------