Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-8263152

LinkedBlockingQueue can get stuck in multi-threaded scenarios.

    XMLWordPrintable

Details

    Description

      ADDITIONAL SYSTEM INFORMATION :
      CentOS Linux release 7.5.1804 (Core)

      A DESCRIPTION OF THE PROBLEM :
      When there are two threads, one that takes from the queue and one that uses LBQSpliterator to traverse the queue, the LBQSpliterator's traversal may get stuck.

      Let's see why?
      First, look at how take() works: we must know that the removed Node is made to point to itself to help GC!
      Secondly, the key code is in forEachRemaining, which LBQSpliterator uses to visit all Nodes. Once it has read the item values from the Nodes, it releases the lock. take() may be called at this moment.
      The variable 'p' in forEachRemaining may then point to a Node which points to itself, and forEachRemaining will loop forever.


      A more detailed discussion is in https://issues.apache.org/jira/browse/YARN-10642


      STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
      package com.zcy.test;

      import org.junit.Test;

      import java.util.Map;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.function.Function;
      import java.util.stream.Collectors;

      /**
       * Reproducer for the hang described in this report: one thread repeatedly
       * traverses a {@link LinkedBlockingQueue} through {@code stream()} while a
       * second thread removes elements with {@code take()}.
       *
       * <p>The test makes no assertion; it only prints "stuck!!!" or
       * "not stuck!!!" depending on how long the two timed joins took.
       */
      public class MockForDeadLoop {

        /**
         * Runs the traversing thread and the taking thread concurrently and
         * reports, by elapsed wall-clock time, whether they finished in time.
         *
         * @throws Exception if {@code put}, {@code sleep} or {@code join} is interrupted
         */
        @Test
        public void test34() throws Exception{
          LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
          // LOOP: number of take() calls and number of full stream traversals.
          final int LOOP = 3;
          // sleepTime: base delay in milliseconds used to pace both threads.
          final long sleepTime = 100;
          boolean print = true;
          eventQueue.put("a");
          eventQueue.put("b");
          eventQueue.put("c");
          eventQueue.put("d");
          eventQueue.put("e");

          long joinCoef = 3;
          // Upper bound for a healthy run: every element (5) is visited in each of
          // the LOOP traversals and the classifier sleeps sleepTime * 2 per element,
          // i.e. 5 * 3 * 100 * 2 = 3000 ms.
          long expectMaxTime = eventQueue.size() * LOOP * sleepTime * 2;
          // Timeout passed to each join: 3 * 3000 = 9000 ms.
          long joinExpectTime = expectMaxTime * joinCoef;
          System.out.println("expectTime is " + expectMaxTime);

          long start = System.currentTimeMillis();
          // Consumer: removes LOOP elements from the head of the queue, pausing
          // 1 ms after each take.
          Thread takeThread = new Thread(){
            @Override
            public void run() {
              int n = LOOP;
              for (int i=0;i<n;i++) {
                try {
                  String val = eventQueue.take();
                  if (print) {
                    System.out.println("take : " + val);
                  }
                  Thread.sleep(1);
                } catch (InterruptedException e) {
                  // Interruption is only printed; the loop continues.
                  e.printStackTrace();
                }
              }
            }
          };

          // Traverser: streams the whole queue LOOP times and counts occurrences
          // of each element.
          Thread iteThread = new Thread(){
            @Override
            public void run() {
              int n = LOOP;
              for (int i=0;i<n;i++) {
                Map<String, Long> counterMap = eventQueue.stream().
                  collect(Collectors.
                    groupingBy(new Function<String, String>() {
                      @Override
                      public String apply(String s) {
                        // The sleep makes each element slow to process so that
                        // takeThread gets time to remove nodes while the
                        // traversal is still in progress.
                        try {
                          Thread.sleep(sleepTime *2 );
                        } catch (InterruptedException e) {
                          e.printStackTrace();
                        }
                        return s;
                      }
                    }, Collectors.counting())
                  );
                if (print) {
                  for (Map.Entry<String, Long> entry : counterMap.entrySet()) {
                    System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());
                  }
                }
              }
            }
          };

          // Start the traversal first and give it a sleepTime head start before
          // the consumer begins removing elements.
          iteThread.start();
          Thread.sleep(sleepTime);
          takeThread.start();
          // Each join waits at most joinExpectTime ms and then returns even if
          // the thread is still running.
          takeThread.join(joinExpectTime);
          iteThread.join(joinExpectTime);
          // Elapsed time since before the threads were created, covering both joins.
          long interval = System.currentTimeMillis() - start;
          System.out.println("interval = " + interval);
          // Reaching the join timeout is taken as evidence that a thread hung.
          if (interval >= joinExpectTime) {
            System.out.println("stuck!!!");
          } else {
            System.out.println("not stuck!!!");
          }
        }
      }

      EXPECTED VERSUS ACTUAL BEHAVIOR :
      EXPECTED -
      program returned.
      ACTUAL -
      In fact, iteThread loops forever.

      ---------- BEGIN SOURCE ----------
      package com.zcy.test;

      import org.junit.Test;

      import java.util.Map;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.function.Function;
      import java.util.stream.Collectors;

      /**
       * Reproducer for the hang described in this report: one thread repeatedly
       * traverses a {@link LinkedBlockingQueue} through {@code stream()} while a
       * second thread removes elements with {@code take()}.
       *
       * <p>The test makes no assertion; it only prints "stuck!!!" or
       * "not stuck!!!" depending on how long the two timed joins took.
       */
      public class MockForDeadLoop {

        /**
         * Runs the traversing thread and the taking thread concurrently and
         * reports, by elapsed wall-clock time, whether they finished in time.
         *
         * @throws Exception if {@code put}, {@code sleep} or {@code join} is interrupted
         */
        @Test
        public void test34() throws Exception{
          LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
          // LOOP: number of take() calls and number of full stream traversals.
          final int LOOP = 3;
          // sleepTime: base delay in milliseconds used to pace both threads.
          final long sleepTime = 100;
          boolean print = true;
          eventQueue.put("a");
          eventQueue.put("b");
          eventQueue.put("c");
          eventQueue.put("d");
          eventQueue.put("e");

          long joinCoef = 3;
          // Upper bound for a healthy run: every element (5) is visited in each of
          // the LOOP traversals and the classifier sleeps sleepTime * 2 per element,
          // i.e. 5 * 3 * 100 * 2 = 3000 ms.
          long expectMaxTime = eventQueue.size() * LOOP * sleepTime * 2;
          // Timeout passed to each join: 3 * 3000 = 9000 ms.
          long joinExpectTime = expectMaxTime * joinCoef;
          System.out.println("expectTime is " + expectMaxTime);

          long start = System.currentTimeMillis();
          // Consumer: removes LOOP elements from the head of the queue, pausing
          // 1 ms after each take.
          Thread takeThread = new Thread(){
            @Override
            public void run() {
              int n = LOOP;
              for (int i=0;i<n;i++) {
                try {
                  String val = eventQueue.take();
                  if (print) {
                    System.out.println("take : " + val);
                  }
                  Thread.sleep(1);
                } catch (InterruptedException e) {
                  // Interruption is only printed; the loop continues.
                  e.printStackTrace();
                }
              }
            }
          };

          // Traverser: streams the whole queue LOOP times and counts occurrences
          // of each element.
          Thread iteThread = new Thread(){
            @Override
            public void run() {
              int n = LOOP;
              for (int i=0;i<n;i++) {
                Map<String, Long> counterMap = eventQueue.stream().
                  collect(Collectors.
                    groupingBy(new Function<String, String>() {
                      @Override
                      public String apply(String s) {
                        // The sleep makes each element slow to process so that
                        // takeThread gets time to remove nodes while the
                        // traversal is still in progress.
                        try {
                          Thread.sleep(sleepTime *2 );
                        } catch (InterruptedException e) {
                          e.printStackTrace();
                        }
                        return s;
                      }
                    }, Collectors.counting())
                  );
                if (print) {
                  for (Map.Entry<String, Long> entry : counterMap.entrySet()) {
                    System.out.println("key = " + entry.getKey() + ", value = " + entry.getValue());
                  }
                }
              }
            }
          };

          // Start the traversal first and give it a sleepTime head start before
          // the consumer begins removing elements.
          iteThread.start();
          Thread.sleep(sleepTime);
          takeThread.start();
          // Each join waits at most joinExpectTime ms and then returns even if
          // the thread is still running.
          takeThread.join(joinExpectTime);
          iteThread.join(joinExpectTime);
          // Elapsed time since before the threads were created, covering both joins.
          long interval = System.currentTimeMillis() - start;
          System.out.println("interval = " + interval);
          // Reaching the join timeout is taken as evidence that a thread hung.
          if (interval >= joinExpectTime) {
            System.out.println("stuck!!!");
          } else {
            System.out.println("not stuck!!!");
          }
        }
      }
      ---------- END SOURCE ----------

      FREQUENCY : often


      Attachments

        Issue Links

          Activity

            People

              Unassigned Unassigned
              webbuggrp Webbug Group
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: