package ru.ok.bug8202252;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * AIO server accepts 1000 connections and responses with a large message.
 * The buffers should be available for reclamation as soon as the server
 * finishes writing. Since the client connections come sequentially,
 * there should be no memory leak. However, due to the JDK bug,
 * AsynchronousSocketChannel holds the reference to the CompletionHandler.
 * <p/>
 * Run with <b>-Xmx256m -XX:MaxDirectMemorySize=1g</b>.
 *
 * @author incubos
 * @link https://bugs.openjdk.java.net/browse/JDK-8202252
 */
public class AsyncChannelLeak {
    private static final int CLIENTS = 100;
    private static final int MESSAGE_SIZE = 16 * 1024 * 1024;

    public void runServer() throws Exception {
        List<AsynchronousSocketChannel> clients = new ArrayList<>();
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(1, Thread::new);

        try (AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group)) {
            listener.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            SocketAddress serverAddress = listener.getLocalAddress();

            new Thread(() -> runClients(serverAddress)).start();

            for (int i = 0; i < CLIENTS; i++) {
                AsynchronousSocketChannel client = listener.accept().get();
                clients.add(client);

                byte[] data = new byte[MESSAGE_SIZE];
                ThreadLocalRandom.current().nextBytes(data);
                ByteBuffer buffer = ByteBuffer.wrap(data);

                client.write(buffer, i, new CompletionHandler<Integer, Integer>() {
                    @Override
                    public void completed(Integer result, Integer clientId) {
                        // The callback keeps the reference to data array
                        if (buffer.hasRemaining()) {
                            client.write(buffer, clientId, this);
                            return;
                        }

                        System.out.println("Sent message #" + clientId);
                    }

                    @Override
                    public void failed(Throwable exc, Integer clientId) {
                        exc.printStackTrace();
                    }
                });
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        } finally {
            group.shutdown();
        }
    }

    public void runClients(SocketAddress serverAddress) {
        try {
            ByteBuffer buf = ByteBuffer.allocateDirect(MESSAGE_SIZE);

            for (int i = 0; i < CLIENTS; i++) {
                SocketChannel ch = SocketChannel.open(serverAddress);

                while (buf.hasRemaining()) {
                    ch.read(buf);
                }
                buf.clear();

                System.out.println("Received message #" + i);
                ch.close();
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(1);
        }

        System.out.println("Test passed");
        System.exit(0);
    }

    public static void main(final String[] args) throws Exception {
        new AsyncChannelLeak().runServer();
    }
}
