JDK-8323659

LinkedTransferQueue add and put methods call overridable offer

    • b08
    • 22
    • b06
    • Verified

        The refactoring in JDK-8301341 resulted in `put` and `add` calling the overridable `offer` method. This change, while technically correct, has an observable effect on existing code. For example, the code snippet below used to work with all previous JDK releases, but no longer successfully executes the tasks with JDK 22 EA.

        This snippet is a reduced, artificial reproducer that demonstrates the issue. In essence, the code tries to scale the executor up to its maximum pool size, and only queues tasks when there is no spare capacity.

        I'm adding the regression label, as the effect of the change appears as a regression when upgrading from JDK 21 to JDK 22 EA. However, I do understand and accept that LinkedTransferQueue::offer is not really supposed to return false!

        ---
        import java.util.concurrent.*;

        public class Test {

            public static void main(String... args) throws Exception {
                ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
                ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    0,
                    1,
                    60,
                    TimeUnit.SECONDS,
                    queue,
                    Executors.defaultThreadFactory(),
                    new ForceQueuePolicy()
                );
                queue.executor = executor;

                final CountDownLatch countDownLatch = new CountDownLatch(10);

                class TestTask implements Runnable {
                    @Override
                    public void run() {
                        countDownLatch.countDown();
                        if (countDownLatch.getCount() > 0) {
                            executor.execute(TestTask.this);
                        }
                    }
                }

                executor.execute(new TestTask());
                countDownLatch.await();
                executor.shutdown();
                boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
                assert terminated;
                System.out.println("FINISHED");
            }

            static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
                ThreadPoolExecutor executor;

                @Override
                public boolean offer(E e) {
                    // first try to transfer to a waiting worker thread
                    if (tryTransfer(e) == false) {
                        // check if there might be spare capacity in the thread pool executor
                        int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
                        if (left > 0) {
                            // reject queuing the task to force the thread pool
                            // executor to add a worker if it can; combined
                            // with ForceQueuePolicy, this causes the thread
                            // pool to always scale up to max pool size and we
                            // only queue when there is no spare capacity
                            return false;
                        } else {
                            return super.offer(e);
                        }
                    } else {
                        return true;
                    }
                }
            }

            static class ForceQueuePolicy implements RejectedExecutionHandler {

                @Override
                public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
                    if (executor.isShutdown()) {
                        throw new RejectedExecutionException();
                    } else {
                        put(executor, task);
                        // we need to check again the executor state as it might have been concurrently shut down; in this case
                        // the executor's workers are shutting down and might have already picked up the task for execution.
                        if (executor.isShutdown() && executor.remove(task)) {
                            throw new RejectedExecutionException();
                        }
                    }
                }

                void put(ThreadPoolExecutor executor, Runnable task) {
                    try {
                        executor.getQueue().put(task);
                    } catch (final InterruptedException e) {
                        throw new AssertionError(e);
                    }
                }
            }
        }
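
The mechanism behind the failure can be checked in isolation. The following is a minimal sketch and not part of the original report; `OfferRoutingProbe` and `RejectingQueue` are hypothetical names introduced for illustration. It overrides `offer` to record the call and return false, then reports whether `put` and `add` dispatched to that override on the running JDK. On a build where they do, `put` has no way to report the false result, so the element is presumably dropped silently, which would explain why the resubmitted task in the reproducer above never runs.

```java
import java.util.concurrent.LinkedTransferQueue;

class OfferRoutingProbe {

    /** Hypothetical queue: records whether offer(E) was invoked and always refuses the element. */
    static class RejectingQueue<E> extends LinkedTransferQueue<E> {
        boolean offerCalled;

        @Override
        public boolean offer(E e) {
            offerCalled = true;
            // Never delegates to super.offer, so nothing is enqueued via this path.
            return false;
        }
    }

    /** Returns true if put(E) on the running JDK dispatches to the overridden offer(E). */
    static boolean putRoutesThroughOffer() {
        RejectingQueue<String> q = new RejectingQueue<>();
        // LinkedTransferQueue.put declares no checked exception, so no try/catch is needed.
        q.put("task");
        return q.offerCalled;
    }

    /** Returns true if add(E) on the running JDK dispatches to the overridden offer(E). */
    static boolean addRoutesThroughOffer() {
        RejectingQueue<String> q = new RejectingQueue<>();
        try {
            q.add("task");
        } catch (IllegalStateException ignored) {
            // Defensive assumption: an add implementation may throw when offer refuses.
        }
        return q.offerCalled;
    }

    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("put routes through offer: " + putRoutesThroughOffer());
        System.out.println("add routes through offer: " + addRoutesThroughOffer());
    }
}
```

The probe deliberately prints what it observes rather than assuming a result, since the outcome depends on the JDK build it runs on.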

              chegar Chris Hegarty