-
Bug
-
Resolution: Duplicate
-
P4
-
None
-
8
ADDITIONAL SYSTEM INFORMATION :
CentOS Linux release 7.5.1804 (Core)
A DESCRIPTION OF THE PROBLEM :
When one thread takes elements from a LinkedBlockingQueue while another thread traverses the same queue with LBQSpliterator (for example via stream()), the LBQSpliterator traversal may get stuck.
Let's see why?
First, look at how take() works: note that the removed Node is made to point to itself (self-linked) to help GC.
Second, the key code is in forEachRemaining, which LBQSpliterator uses to visit every Node. After it reads the item value from a Node, it releases the lock, so take() may be called at that moment.
The variable 'p' in forEachRemaining may then point to a Node that points to itself, and forEachRemaining will loop forever.
A more detailed discussion is at https://issues.apache.org/jira/browse/YARN-10642
STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
package com.zcy.test;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Test scenario that streams a {@link LinkedBlockingQueue} in one thread while
 * a second thread takes elements from it, and reports whether both threads
 * finished within the allotted time.
 */
public class MockForDeadLoop {

    /**
     * Fills a queue with five elements, starts a deliberately slow streaming
     * thread, starts a consuming thread shortly afterwards, then waits for both
     * and prints {@code stuck!!!} or {@code not stuck!!!} depending on how long
     * the whole run took.
     *
     * @throws Exception if the calling thread is interrupted while filling the
     *                   queue, sleeping, or joining the worker threads
     */
    @Test
    public void test34() throws Exception {
        final int LOOP = 3;
        final long sleepTime = 100;
        boolean print = true;
        long joinCoef = 3;

        LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
        for (String item : new String[] {"a", "b", "c", "d", "e"}) {
            eventQueue.put(item);
        }

        // Upper bound for the streaming work: every element costs 2 * sleepTime per pass.
        long expectMaxTime = eventQueue.size() * LOOP * sleepTime * 2;
        long joinExpectTime = expectMaxTime * joinCoef;
        System.out.println("expectTime is " + expectMaxTime);
        long start = System.currentTimeMillis();

        // Consumer: removes LOOP elements, pausing 1 ms after each take().
        Thread takeThread = new Thread(() -> {
            for (int round = 0; round < LOOP; round++) {
                try {
                    String taken = eventQueue.take();
                    if (print) {
                        System.out.println("take : " + taken);
                    }
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // Classifier that returns its argument unchanged after sleeping, so the
        // traversal stays on each element for 2 * sleepTime.
        Function<String, String> slowIdentity = s -> {
            try {
                Thread.sleep(sleepTime * 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s;
        };

        // Traverser: streams the queue LOOP times and counts occurrences per element.
        Thread iteThread = new Thread(() -> {
            for (int pass = 0; pass < LOOP; pass++) {
                Map<String, Long> counterMap = eventQueue.stream()
                        .collect(Collectors.groupingBy(slowIdentity, Collectors.counting()));
                if (!print) {
                    continue;
                }
                counterMap.forEach((key, value) ->
                        System.out.println("key = " + key + ", value = " + value));
            }
        });

        // Start the traversal first so the consumer runs while it is in progress.
        iteThread.start();
        Thread.sleep(sleepTime);
        takeThread.start();

        takeThread.join(joinExpectTime);
        iteThread.join(joinExpectTime);

        long interval = System.currentTimeMillis() - start;
        System.out.println("interval = " + interval);
        // A run that used up the whole join timeout is reported as stuck.
        System.out.println(interval >= joinExpectTime ? "stuck!!!" : "not stuck!!!");
    }
}
EXPECTED VERSUS ACTUAL BEHAVIOR :
EXPECTED -
program returned.
ACTUAL -
iteThread will loop forever in fact.
---------- BEGIN SOURCE ----------
package com.zcy.test;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Test scenario that streams a {@link LinkedBlockingQueue} in one thread while
 * a second thread takes elements from it, and reports whether both threads
 * finished within the allotted time.
 */
public class MockForDeadLoop {

    /**
     * Fills a queue with five elements, starts a deliberately slow streaming
     * thread, starts a consuming thread shortly afterwards, then waits for both
     * and prints {@code stuck!!!} or {@code not stuck!!!} depending on how long
     * the whole run took.
     *
     * @throws Exception if the calling thread is interrupted while filling the
     *                   queue, sleeping, or joining the worker threads
     */
    @Test
    public void test34() throws Exception {
        final int LOOP = 3;
        final long sleepTime = 100;
        boolean print = true;
        long joinCoef = 3;

        LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
        for (String item : new String[] {"a", "b", "c", "d", "e"}) {
            eventQueue.put(item);
        }

        // Upper bound for the streaming work: every element costs 2 * sleepTime per pass.
        long expectMaxTime = eventQueue.size() * LOOP * sleepTime * 2;
        long joinExpectTime = expectMaxTime * joinCoef;
        System.out.println("expectTime is " + expectMaxTime);
        long start = System.currentTimeMillis();

        // Consumer: removes LOOP elements, pausing 1 ms after each take().
        Thread takeThread = new Thread(() -> {
            for (int round = 0; round < LOOP; round++) {
                try {
                    String taken = eventQueue.take();
                    if (print) {
                        System.out.println("take : " + taken);
                    }
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // Classifier that returns its argument unchanged after sleeping, so the
        // traversal stays on each element for 2 * sleepTime.
        Function<String, String> slowIdentity = s -> {
            try {
                Thread.sleep(sleepTime * 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s;
        };

        // Traverser: streams the queue LOOP times and counts occurrences per element.
        Thread iteThread = new Thread(() -> {
            for (int pass = 0; pass < LOOP; pass++) {
                Map<String, Long> counterMap = eventQueue.stream()
                        .collect(Collectors.groupingBy(slowIdentity, Collectors.counting()));
                if (!print) {
                    continue;
                }
                counterMap.forEach((key, value) ->
                        System.out.println("key = " + key + ", value = " + value));
            }
        });

        // Start the traversal first so the consumer runs while it is in progress.
        iteThread.start();
        Thread.sleep(sleepTime);
        takeThread.start();

        takeThread.join(joinExpectTime);
        iteThread.join(joinExpectTime);

        long interval = System.currentTimeMillis() - start;
        System.out.println("interval = " + interval);
        // A run that used up the whole join timeout is reported as stuck.
        System.out.println(interval >= joinExpectTime ? "stuck!!!" : "not stuck!!!");
    }
}
---------- END SOURCE ----------
FREQUENCY : often
CentOS Linux release 7.5.1804 (Core)
A DESCRIPTION OF THE PROBLEM :
When one thread takes elements from a LinkedBlockingQueue while another thread traverses the same queue with LBQSpliterator (for example via stream()), the LBQSpliterator traversal may get stuck.
Let's see why?
First, look at how take() works: note that the removed Node is made to point to itself (self-linked) to help GC.
Second, the key code is in forEachRemaining, which LBQSpliterator uses to visit every Node. After it reads the item value from a Node, it releases the lock, so take() may be called at that moment.
The variable 'p' in forEachRemaining may then point to a Node that points to itself, and forEachRemaining will loop forever.
A more detailed discussion is at https://issues.apache.org/jira/browse/YARN-10642
STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
package com.zcy.test;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Test scenario that streams a {@link LinkedBlockingQueue} in one thread while
 * a second thread takes elements from it, and reports whether both threads
 * finished within the allotted time.
 */
public class MockForDeadLoop {

    /**
     * Fills a queue with five elements, starts a deliberately slow streaming
     * thread, starts a consuming thread shortly afterwards, then waits for both
     * and prints {@code stuck!!!} or {@code not stuck!!!} depending on how long
     * the whole run took.
     *
     * @throws Exception if the calling thread is interrupted while filling the
     *                   queue, sleeping, or joining the worker threads
     */
    @Test
    public void test34() throws Exception {
        final int LOOP = 3;
        final long sleepTime = 100;
        boolean print = true;
        long joinCoef = 3;

        LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
        for (String item : new String[] {"a", "b", "c", "d", "e"}) {
            eventQueue.put(item);
        }

        // Upper bound for the streaming work: every element costs 2 * sleepTime per pass.
        long expectMaxTime = eventQueue.size() * LOOP * sleepTime * 2;
        long joinExpectTime = expectMaxTime * joinCoef;
        System.out.println("expectTime is " + expectMaxTime);
        long start = System.currentTimeMillis();

        // Consumer: removes LOOP elements, pausing 1 ms after each take().
        Thread takeThread = new Thread(() -> {
            for (int round = 0; round < LOOP; round++) {
                try {
                    String taken = eventQueue.take();
                    if (print) {
                        System.out.println("take : " + taken);
                    }
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // Classifier that returns its argument unchanged after sleeping, so the
        // traversal stays on each element for 2 * sleepTime.
        Function<String, String> slowIdentity = s -> {
            try {
                Thread.sleep(sleepTime * 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s;
        };

        // Traverser: streams the queue LOOP times and counts occurrences per element.
        Thread iteThread = new Thread(() -> {
            for (int pass = 0; pass < LOOP; pass++) {
                Map<String, Long> counterMap = eventQueue.stream()
                        .collect(Collectors.groupingBy(slowIdentity, Collectors.counting()));
                if (!print) {
                    continue;
                }
                counterMap.forEach((key, value) ->
                        System.out.println("key = " + key + ", value = " + value));
            }
        });

        // Start the traversal first so the consumer runs while it is in progress.
        iteThread.start();
        Thread.sleep(sleepTime);
        takeThread.start();

        takeThread.join(joinExpectTime);
        iteThread.join(joinExpectTime);

        long interval = System.currentTimeMillis() - start;
        System.out.println("interval = " + interval);
        // A run that used up the whole join timeout is reported as stuck.
        System.out.println(interval >= joinExpectTime ? "stuck!!!" : "not stuck!!!");
    }
}
EXPECTED VERSUS ACTUAL BEHAVIOR :
EXPECTED -
program returned.
ACTUAL -
iteThread will loop forever in fact.
---------- BEGIN SOURCE ----------
package com.zcy.test;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Test scenario that streams a {@link LinkedBlockingQueue} in one thread while
 * a second thread takes elements from it, and reports whether both threads
 * finished within the allotted time.
 */
public class MockForDeadLoop {

    /**
     * Fills a queue with five elements, starts a deliberately slow streaming
     * thread, starts a consuming thread shortly afterwards, then waits for both
     * and prints {@code stuck!!!} or {@code not stuck!!!} depending on how long
     * the whole run took.
     *
     * @throws Exception if the calling thread is interrupted while filling the
     *                   queue, sleeping, or joining the worker threads
     */
    @Test
    public void test34() throws Exception {
        final int LOOP = 3;
        final long sleepTime = 100;
        boolean print = true;
        long joinCoef = 3;

        LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
        for (String item : new String[] {"a", "b", "c", "d", "e"}) {
            eventQueue.put(item);
        }

        // Upper bound for the streaming work: every element costs 2 * sleepTime per pass.
        long expectMaxTime = eventQueue.size() * LOOP * sleepTime * 2;
        long joinExpectTime = expectMaxTime * joinCoef;
        System.out.println("expectTime is " + expectMaxTime);
        long start = System.currentTimeMillis();

        // Consumer: removes LOOP elements, pausing 1 ms after each take().
        Thread takeThread = new Thread(() -> {
            for (int round = 0; round < LOOP; round++) {
                try {
                    String taken = eventQueue.take();
                    if (print) {
                        System.out.println("take : " + taken);
                    }
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        // Classifier that returns its argument unchanged after sleeping, so the
        // traversal stays on each element for 2 * sleepTime.
        Function<String, String> slowIdentity = s -> {
            try {
                Thread.sleep(sleepTime * 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s;
        };

        // Traverser: streams the queue LOOP times and counts occurrences per element.
        Thread iteThread = new Thread(() -> {
            for (int pass = 0; pass < LOOP; pass++) {
                Map<String, Long> counterMap = eventQueue.stream()
                        .collect(Collectors.groupingBy(slowIdentity, Collectors.counting()));
                if (!print) {
                    continue;
                }
                counterMap.forEach((key, value) ->
                        System.out.println("key = " + key + ", value = " + value));
            }
        });

        // Start the traversal first so the consumer runs while it is in progress.
        iteThread.start();
        Thread.sleep(sleepTime);
        takeThread.start();

        takeThread.join(joinExpectTime);
        iteThread.join(joinExpectTime);

        long interval = System.currentTimeMillis() - start;
        System.out.println("interval = " + interval);
        // A run that used up the whole join timeout is reported as stuck.
        System.out.println(interval >= joinExpectTime ? "stuck!!!" : "not stuck!!!");
    }
}
---------- END SOURCE ----------
FREQUENCY : often
- duplicates
-
JDK-8171051 LinkedBlockingQueue spliterator needs to support node self-linking
-
- Resolved
-