Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-8216330

All of threads are in waiting State and not getting a chance to execute

XMLWordPrintable

    • x86_64
    • windows_7

      ADDITIONAL SYSTEM INFORMATION :
      Windows Any OS, JRE 1.8.171,

      A DESCRIPTION OF THE PROBLEM :
      All thread are in waiting state and not getting a chance to execute. Not understanding what is the problem and how to resolve it. If we restart service then again the process is getting started. However again it gets accumlated and the respective functionality is stuck

      REGRESSION : Last worked in version 8u172

      STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
      It is my application and transferring file from temp storage to actual storage. For that we are using ThreadPoolExecutor with LinkedBlockingQueue<Runnable>(); Looks like at take() method it is stopped and not proceeding further.

      EXPECTED VERSUS ACTUAL BEHAVIOR :
      EXPECTED -
      Process should not be blocked.
      ACTUAL -
      All threads are in waiting state.

      ---------- BEGIN SOURCE ----------
      package com.mpi.framework.task.impl.juc;

      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.Collection;
      import java.util.Collections;
      import java.util.Iterator;
      import java.util.List;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.Callable;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.Future;
      import java.util.concurrent.FutureTask;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.RejectedExecutionException;
      import java.util.concurrent.ThreadFactory;
      import java.util.concurrent.ThreadPoolExecutor;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicInteger;

      import org.apache.log4j.LogManager;
      import org.apache.log4j.Logger;
      import org.springframework.core.OrderComparator;

      import com.mpi.framework.task.CancelableTask;
      import com.mpi.framework.task.JobDetailInfo;
      import com.mpi.framework.task.QueueFullException;
      import com.mpi.framework.task.Task;
      import com.mpi.framework.task.TaskExecutionListener;
      import com.mpi.framework.task.ThreadPoolDescriptor;
      import com.mpi.framework.task.impl.ThreadPool;

      public class JUCThreadPool implements ThreadPool, JUCThreadPoolMBean {
      private static final Logger log = LogManager.getLogger(JUCThreadPool.class);

      private static final Object NULL = new Object();

      private final ThreadPoolExecutor executor;
      private final ThreadPoolDescriptor descriptor;
      private final List<TaskExecutionListener> preListeners;
      private final List<TaskExecutionListener> postListeners;

      private volatile long executedJobsCount;
      private volatile long totalWaitTime;

      private ConcurrentHashMap<InternalFutureTask, Object> activeJobs = new ConcurrentHashMap<InternalFutureTask, Object>();

      public static final int DEFAULT_PROCESSOR_COUNT = 4; //based on our typical appliance

      @SuppressWarnings("unchecked")
      public JUCThreadPool(ThreadPoolDescriptor desc, Collection<TaskExecutionListener> globalListeners) {
      this.descriptor = desc;
      this.preListeners = new ArrayList<TaskExecutionListener>();
      preListeners.addAll(globalListeners);
      preListeners.addAll(Arrays.asList(desc.getListeners()));
      Collections.sort(preListeners, new OrderComparator());

      this.postListeners = new ArrayList<TaskExecutionListener>(preListeners);
      Collections.reverse(postListeners);

      BlockingQueue<Runnable> queue;
      if(desc.getMaxQueueSize() > -1) {
      queue = new LinkedBlockingQueue<Runnable>(desc.getMaxQueueSize());
      } else {
      queue = new LinkedBlockingQueue<Runnable>();
      }

      int processorCount = Runtime.getRuntime().availableProcessors();
      if (processorCount < 1){
      processorCount = DEFAULT_PROCESSOR_COUNT;
      }
      int maxPoolSize = desc.getMaxPoolSize();
      if (maxPoolSize < 0){
      maxPoolSize = Math.abs(maxPoolSize) * processorCount;
      if (maxPoolSize < 1){
      maxPoolSize = 1;
      }
      }
      int corePoolSize = desc.getCorePoolSize();
      if (corePoolSize < 0){
      corePoolSize = Math.abs(corePoolSize) * processorCount;
      if (corePoolSize < 1){
      corePoolSize = 1;
      }
      }
      if (maxPoolSize < corePoolSize){
      maxPoolSize = corePoolSize;
      }
      log.info("Creating thread pool executor " + descriptor.getId() + " with a core size of " + corePoolSize + " and a max pool size of " + maxPoolSize);
      executor = new ThreadPoolExecutor(
      corePoolSize,
      maxPoolSize,
      desc.getMaxThreadKeepAliveTimeInMillis(),
      TimeUnit.MILLISECONDS,
      queue,
      new ThreadFactory() {
      final ThreadGroup group = new ThreadGroup(descriptor.getThreadPrefix());
      final AtomicInteger threadNumber = new AtomicInteger(1);
      @Override
      public Thread newThread(Runnable r) {
      Thread t = new Thread(group, r, descriptor.getThreadPrefix() + "-" + threadNumber.getAndIncrement(), 0);
      t.setDaemon(true);
      return t;
      }
      }
      ) {
      @Override
      protected void beforeExecute(Thread t, Runnable r) {
      InternalFutureTask ft = ((InternalFutureTask<?>)r);
      executedJobsCount++;
      totalWaitTime += System.currentTimeMillis() - ft.getCreationTime();
      Task<?> task = ft.getOriginalTask();
      try {
      for(TaskExecutionListener listener : preListeners) {
      listener.beforeExecution(task);
      }
      ft.getListener().beforeExecution(task);
      } catch (Exception e) {
      ((InternalFutureTask<?>)r).markInvalid();
      log.warn("Will not execute task " + task + " due to error: ", e);
      }

      activeJobs.put(ft, NULL);
      }

      @Override
      protected void afterExecute(Runnable r, Throwable t) {
      InternalFutureTask ft = ((InternalFutureTask<?>)r);
      activeJobs.remove(ft);
      Task<?> task = ft.getOriginalTask();
      Object value = null;
      if(null == t && !ft.isCancelled()) {
      try {
      value = ft.get();
      } catch (InterruptedException e) {
      //Should not happen
      } catch (ExecutionException e) {
      t = e.getCause();
      }
      }

      if (t != null) {
      log.warn("Error executing background task", t);
      }

      try {
      for(TaskExecutionListener listener : postListeners) {
      listener.afterExecution(task, value, ft.isCancelled(), t);
      }
      ft.getListener().afterExecution(task, value, ft.isCancelled(), t);
      } catch (Exception e) {
      log.warn("Error executing listeners for task " + task, e);
      }
      }

      protected <T extends Object> java.util.concurrent.RunnableFuture<T> newTaskFor(java.util.concurrent.Callable<T> callable) {
      TaskWithListener<T> internal = (TaskWithListener<T>)callable;
      return new InternalFutureTask<T>((Task<T>)internal.getCallable(), internal.getListener(), System.currentTimeMillis());
      }

      protected <T extends Object> java.util.concurrent.RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
      throw new UnsupportedOperationException();
      }
      };
      executor.prestartAllCoreThreads();
      }

      @Override
      public boolean hasAvailableThreads() {
      return executor.getActiveCount() < executor.getMaximumPoolSize();
      }

      @Override
      public void destroy() {
      executor.shutdownNow();
      }

      @Override
      public <T> Future<T> submitTask(Task<T> task, TaskExecutionListener listener) throws QueueFullException {
      descriptor.validateTask(task);
      try {
      return executor.submit(new TaskWithListener<T>(task, listener));
      } catch (RejectedExecutionException e) {
      throw new QueueFullException();
      }
      }

      @Override
      public int getActiveThreadCount() {
      return executor.getActiveCount();
      }

      public long getAverageWaitTime() {
      return totalWaitTime / executedJobsCount;
      };

      public String getExecutingJobs() {
      StringBuilder ret = new StringBuilder();
      for(InternalFutureTask ft : activeJobs.keySet()) {
      Task t = ft.getOriginalTask();
      String name = t.getTaskName();
      int id = System.identityHashCode(t);
      ret.append("[id: ");
      ret.append(id);
      ret.append(", name: ");
      ret.append(name);
      ret.append("] ");
      }
      return ret.toString();
      };

      @Override
      public List<JobDetailInfo> getExecutingJobsList() {
      List<JobDetailInfo> executingJobsList = new ArrayList<JobDetailInfo>();
      Iterator<InternalFutureTask> itr = activeJobs.keySet().iterator();
      while(itr.hasNext()) {
      InternalFutureTask ft = itr.next();
      Task t = ft.getOriginalTask();
      String name = t.getTaskName();
      int id = System.identityHashCode(t);

      JobDetailInfo jobDetailInfo = new JobDetailInfo();
      jobDetailInfo.setId(id);
      jobDetailInfo.setName(name);
      jobDetailInfo.setDetails(t.toString());
      executingJobsList.add(jobDetailInfo);
      }
      return executingJobsList;
      }

      @Override
      public int getJobQueueSize() {
      return executor.getQueue().size();
      }

      @Override
      public int getMaxThreadCount() {
      return executor.getMaximumPoolSize();
      }

      @Override
      public long getNumberOfExecutedJobs() {
      return executedJobsCount;
      }

      @Override
      public String getQueuedJobs() {
      StringBuilder ret = new StringBuilder();
      Iterator<Runnable> itr = executor.getQueue().iterator();
      while(itr.hasNext()) {
      InternalFutureTask ft = (InternalFutureTask)itr.next();
      Task t = ft.getOriginalTask();
      String name = t.getTaskName();
      int id = System.identityHashCode(t);
      ret.append("[id: ");
      ret.append(id);
      ret.append(", name: ");
      ret.append(name);
      ret.append("] ");
      }
      return ret.toString();
      }

      @Override
      public List<JobDetailInfo> getQueuedJobsList() {
      List<JobDetailInfo> queuedJobsList = new ArrayList<JobDetailInfo>();
      Iterator<Runnable> itr = executor.getQueue().iterator();
      while(itr.hasNext()) {
      InternalFutureTask ft = (InternalFutureTask)itr.next();
      Task t = ft.getOriginalTask();
      String name = t.getTaskName();
      int id = System.identityHashCode(t);

      JobDetailInfo jobDetailInfo = new JobDetailInfo();
      jobDetailInfo.setId(id);
      jobDetailInfo.setName(name);
      jobDetailInfo.setDetails(t.toString());
      queuedJobsList.add(jobDetailInfo);
      }
      return queuedJobsList;
      }

      @Override
      public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("Job Queue Size: ").append(this.getJobQueueSize())
      .append("Max Thread Count: ").append(this.getMaxThreadCount()).append(" ").append(System.lineSeparator())
      .append("Active Thread Count: ").append(getActiveThreadCount()).append(" ").append(System.lineSeparator())
      .append("hasAvailableThreads: ").append(hasAvailableThreads()).append(" ").append(System.lineSeparator())
      .append("executedJobsCount: ").append(getNumberOfExecutedJobs()).append(" ").append(System.lineSeparator())
      .append("Executing Jobs: ").append(getExecutingJobs()).append(" ").append(System.lineSeparator())
      .append("Queued Jobs: ").append(getQueuedJobs());
      return sb.toString();
      }

      @Override
      public void cancelActiveTask(int id) {
      for(InternalFutureTask ft : activeJobs.keySet()) {
      if(System.identityHashCode(ft.getOriginalTask()) == id) {
      ft.cancel(true);
      break;
      }
      }
      }

      public void cancelQueueTask(int id) {
      Iterator<Runnable> itr = executor.getQueue().iterator();
      while(itr.hasNext()) {
      InternalFutureTask ft = (InternalFutureTask)itr.next();
      if(System.identityHashCode(ft) == id) {
      ft.cancel(true);
      break;
      }
      }
      };

      private static class TaskWithListener<T> implements Callable<T> {
      private final Callable<T> callable;
      private final TaskExecutionListener listener;

      public TaskWithListener(Callable<T> callable,
      TaskExecutionListener listener) {
      super();
      this.callable = callable;
      this.listener = listener;
      }

      @Override
      public T call() throws Exception {
      return callable.call();
      }

      public Callable<T> getCallable() {
      return callable;
      }

      public TaskExecutionListener getListener() {
      return listener;
      }
      }

      private static class InternalFutureTask<T> extends FutureTask<T> {
      private final Task<T> originalTask;
      private final TaskExecutionListener listener;
      private final long creationTime;
      private boolean invalid = false;

      public InternalFutureTask(Task<T> callable, TaskExecutionListener listener, long creationTime) {
      super(callable);
      this.originalTask = callable;
      this.listener = listener;
      this.creationTime = creationTime;
      }

      public Task<T> getOriginalTask() {
      return originalTask;
      }

      public TaskExecutionListener getListener() {
      return listener;
      }

      public long getCreationTime() {
      return creationTime;
      }

      public void markInvalid() {
      this.invalid = true;
      }

      @Override
      @SuppressWarnings("unchecked")
      public boolean cancel(boolean mayInterruptIfRunning) {
      if(originalTask instanceof CancelableTask) {
      ((CancelableTask)originalTask).cancel();
      }
      return super.cancel(mayInterruptIfRunning);
      }

      @Override
      public void run() {
      if(!invalid) {
      super.run();
      }
      }
      }

      }

      ---------- END SOURCE ----------

      CUSTOMER SUBMITTED WORKAROUND :
      no work around

      FREQUENCY : always


            psonal Pallavi Sonal (Inactive)
            webbuggrp Webbug Group
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated:
              Resolved: