import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;

public class JdkSchedulerIssue {

    @Test
    public void futureNeverStartsNorFinishesIfSubmittedOnScheduledExecutorBeforeShutdownNow() throws Exception {
        int threadCount = 3;

        for (int retryIndex = 0; retryIndex < 100_000; retryIndex++) {

            System.out.println((retryIndex + 1) + ". attempt");

            ExecutorService executor = Executors.newScheduledThreadPool(threadCount); // problematic executor
// ExecutorService executor = Executors.newFixedThreadPool(threadCount); // fixed thread pool executor has no such issue

            List<AtomicBoolean> hasStartedFlags = range(0, threadCount).mapToObj(__ -> new AtomicBoolean(false)).collect(toList());
            List<Future<Integer>> futures = new ArrayList<>();

            for (int threadIndex = 0; threadIndex < threadCount; threadIndex++) {
                int index = threadIndex;

                Callable<Integer> callable = () -> {
                    hasStartedFlags.get(index).set(true);

                    do {
                        Thread.sleep(randomInt(500, 750));
                    } while (!Thread.interrupted());

                    return 0;
                };

                futures.add(
                        executor.submit(callable)
                );
            }

// Thread.sleep(200); // this fixes it

            List<Runnable> drainedRunnables = executor.shutdownNow();

            executor.awaitTermination(10, TimeUnit.SECONDS); // so this should guarantee that all submitted tasks have ended

            if (drainedRunnables.size() < threadCount) { // this is not necessary

                System.out.println("drained runnables count = " + drainedRunnables.size());

                for (int threadIndex = 0; threadIndex < threadCount; threadIndex++) {
                    Future<Integer> future = futures.get(threadIndex);
                    try {

                        future.get(15, TimeUnit.SECONDS); // 15 seconds is an overkill, just to make sure that there wasn't some delay while cancelling or finishing the Callable

                    } catch (TimeoutException e) { // There should be no timeout
                        boolean hasStarted = hasStartedFlags.get(threadIndex).get();
                        boolean isCancelled = future.isCancelled();
                        boolean isDone = future.isDone();

                        if (!hasStarted && !isCancelled && !isDone) {

                            System.out.println("hasStarted = " + hasStarted);
                            System.out.println("isCancelled = " + isCancelled);
                            System.out.println("isDone = " + isDone);
                            throw new IllegalStateException();
                        }
                    } catch (Exception e) {
                        // do nothing
                    }
                }
            }
        }
    }

    public static int randomInt(int from, int to, Function<Integer, Boolean>... validityConditions) {
        return (int) ((long) (Math.random() * ((long) to - (long) from + 1L)) + (long) from);
    }
} 