-
Bug
-
Resolution: Unresolved
-
P4
-
None
-
6u4
-
Cause Known
-
x86
-
windows_xp
SYNOPSIS:
The FutureTask.Sync.runner variable is not managed in the same way that it is for FutureTask.run().
OPERATING SYSTEM(S):
Windows
FULL JDK VERSION(S):
Sun Java 1.5.0_14 and Sun Java 6 update4
DESCRIPTION:
The problem is observed only on multiprocessor machines, not on single-processor machines. The program was run across several JDKs and the issue was observed.
If needed, the output of the program runs can be provided.
Sample Program:
//package testFutureTask;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Demonstrates that {@link FutureTask#set} doesn't safely publish the
* value. testSet failed on my Intellistation z-Pro (dual processor, each
* hyperthreaded) with the following message:
*
* Failure counts by method (of 1642125 attempts):
* get, with infinite wait: 8189
* get, with timeout: 6268
* get, with zero wait: 4194
*
* @author James Synge
*/
public class TestFutureTaskSubClass {

    /** Number of distinct strategies a reader cycles through when fetching the value. */
    protected static final int TOTAL_METHODS = 3;

    /** Main-loop run time used by {@link #test(boolean)}, in milliseconds (10 seconds). */
    private static final long DEFAULT_DURATION_MILLIS = 10 * 1000;

    /** Longest the main loop waits for every reader to report on one future, in milliseconds. */
    private static final long READER_TIMEOUT_MILLIS = 10 * 1000;

    /** Reader threads started by {@link #createTasksAndThreads()}. */
    private Thread[] threads;

    /** One task per reader thread; each yields that reader's per-method failure counts. */
    private FutureTask<int[]>[] tasks;

    /** The exact instance the readers must obtain from the currently published future. */
    protected volatile Integer mt_expectedV = null;

    /** The future currently published to the readers; {@code null} before the first and after the last. */
    protected volatile AFutureValue mt_fv = null;

    /** Counted down once by every reader after it has examined the current future. */
    protected volatile CountDownLatch mt_doneGetting = null;

    /** Set by a reader that obtained a non-null but wrong instance; ends the main loop. */
    protected volatile boolean mt_failed = false;

    /** Tells the readers to leave their polling loop. */
    protected volatile boolean mt_doStop = false;

    /**
     * Runs the harness twice on fresh instances: first completing the futures
     * through {@code set}, then through {@code run}.
     *
     * @param args ignored
     * @throws Exception whatever {@link #test(boolean)} throws
     */
    public static void main(String[] args) throws Exception {
        TestFutureTaskSubClass test = new TestFutureTaskSubClass();
        test.test(true);
        test = new TestFutureTaskSubClass();
        test.test(false);
    }

    /**
     * Test method for {@link java.util.concurrent.FutureTask#set(java.lang.Object)},
     * run for the default duration of 10 seconds.
     *
     * @param useSet {@code true} to complete each future through
     *        {@link AFutureValue#mySet(Integer)}, {@code false} to complete it
     *        through {@code run()}
     * @throws Exception if an assertion fails, a reader task failed, or the
     *         calling thread is interrupted
     */
    public void test(final boolean useSet) throws Exception {
        test(useSet, DEFAULT_DURATION_MILLIS);
    }

    /**
     * Repeatedly publishes a fresh future to the background readers, completes
     * it, and waits for every reader to report, until {@code durationMillis}
     * has elapsed or a reader flags a wrong instance. Afterwards the readers
     * are stopped and their failure counts are summed and printed.
     *
     * @param useSet {@code true} to complete each future through
     *        {@link AFutureValue#mySet(Integer)}, {@code false} to complete it
     *        through {@code run()}
     * @param durationMillis how long to keep publishing futures, in
     *        milliseconds; a value of zero or less skips the main loop
     * @throws Exception if an assertion fails, a reader task failed, or the
     *         calling thread is interrupted
     */
    public void test(final boolean useSet, final long durationMillis) throws Exception {
        System.out.println("--------------------------------------------");
        if (useSet) {
            System.out.println("Testing FutureValue.set");
        } else {
            System.out.println("Testing FutureValue.run");
        }

        // Background threads that will get the value of each future
        // provided by this foreground thread.
        createTasksAndThreads();
        System.out.println("Started " + tasks.length + " threads to read from the FutureValue");

        System.out.println("Starting main loop...");
        final long endTime = System.currentTimeMillis() + durationMillis;
        int counter = 0;
        mt_failed = false;
        try {
            while (!mt_failed) {
                long now = System.currentTimeMillis();
                if (now >= endTime) {
                    System.out.println("Main loop duration reached");
                    break;
                }

                // Objects the readers need after they get. A fresh Integer is
                // allocated on purpose: readers compare by identity, so a
                // cached or shared instance must not be used here.
                mt_expectedV = new Integer((int) (now & 0xffffff));
                mt_doneGetting = new CountDownLatch(threads.length);

                // Publish the future the readers will try to get the value from.
                if (useSet) {
                    mt_fv = new AFutureValue();
                } else {
                    mt_fv = new AFutureValue(new FixedResult<Integer>(mt_expectedV));
                }

                // Yield before setting the value... sometimes.
                if (counter % 10 == 0) {
                    Thread.yield();
                }

                // Complete the future.
                if (useSet) {
                    assertTrue(mt_fv.mySet(mt_expectedV));
                } else {
                    mt_fv.run();
                    assertTrue(mt_fv.isDone());
                    assertTrue(!mt_fv.isCancelled());
                }
                assertSame(mt_expectedV, mt_fv.get());

                // Wait for every reader to finish with this future.
                if (!mt_doneGetting.await(READER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    throw new AssertionError("waited too long for reader threads to read");
                }
                counter++;
                if (counter % 100000 == 0) {
                    System.out.println("Completed " + counter + " loops");
                }
            }
        } finally {
            // Release the readers even when the loop exits with an exception;
            // otherwise the non-daemon reader threads spin forever and keep
            // the JVM alive.
            mt_doStop = true;
            mt_fv = null;
        }

        System.out.println("Waiting for reader threads to finish");
        for (Thread thread : threads) {
            thread.join();
        }

        // Did any of the readers record a failure?
        boolean didFail = false;
        int[] sumFailuresByMethod = new int[TOTAL_METHODS];
        for (FutureTask<int[]> task : tasks) {
            int[] failuresByMethod = task.get();
            for (int method = 0; method < TOTAL_METHODS; method++) {
                sumFailuresByMethod[method] += failuresByMethod[method];
                if (failuresByMethod[method] != 0) {
                    didFail = true;
                }
            }
        }
        if (!didFail) {
            System.out.println("No problems encountered");
            return;
        }
        int attempts = threads.length * counter;
        String msg = String.format(
                "Failure counts by method (of %d attempts):\n" +
                " get, with infinite wait:\t%d\n" +
                " get, with timeout:\t%d\n" +
                " get, with zero wait:\t%d",
                attempts,
                sumFailuresByMethod[0],
                sumFailuresByMethod[1],
                sumFailuresByMethod[2]);
        System.err.println(msg);
    }

    /**
     * Throws an {@link AssertionError} unless both references denote the very
     * same object (identity, not {@code equals}).
     */
    private void assertSame(Integer expected, Integer actual) {
        if (expected == actual) {
            return;
        }
        String msg = String.format(
                "Expected instances to be the same, but they aren't\n" +
                " expected identityHashCode: %d\n" +
                " actual identityHashCode: %d\n" +
                " expected value: %s\n" +
                " actual value: %s",
                System.identityHashCode(expected),
                System.identityHashCode(actual),
                expected, actual);
        throw new AssertionError(msg);
    }

    /** Throws an {@link AssertionError} when {@code v} is {@code false}. */
    private void assertTrue(boolean v) {
        if (!v) {
            throw new AssertionError("Expected true, as false");
        }
    }

    /**
     * Creates and starts the reader threads. Need at least one background
     * thread, and ideally want one thread (including the foreground testing
     * thread) on each processor.
     */
    @SuppressWarnings("unchecked") // generic array creation; only FutureTask<int[]> is ever stored
    private void createTasksAndThreads() {
        int numProcessors = Runtime.getRuntime().availableProcessors();
        final int numThreads = (numProcessors > 1) ? (numProcessors - 1) : numProcessors;
        threads = new Thread[numThreads];
        tasks = new FutureTask[numThreads];
        for (int i = 0; i < numThreads; i++) {
            FutureTask<int[]> task = new FutureTask<int[]>(new BackgroundReader());
            tasks[i] = task;
            Thread thread = new Thread(task);
            thread.start();
            threads[i] = thread;
        }
    }

    /**
     * The block executed by each background thread: it polls {@code mt_fv}
     * for a new future, gets its value with one of {@link #TOTAL_METHODS}
     * strategies (rotating after every future), and checks the result.
     */
    class BackgroundReader implements Callable<int[]> {

        /**
         * @return the number of {@code null} results seen, indexed by strategy
         *         (0 = untimed get, 1 = get with 100 ms timeout, 2 = zero-wait polling)
         * @throws Exception if a get fails or times out; an
         *         {@link AssertionError} is thrown when a non-null but wrong
         *         instance is returned
         */
        public int[] call() throws Exception {
            int gets = 0;
            int spinLoops = 0;
            FutureTask<Integer> prevFV = null;
            int method = new Random().nextInt(TOTAL_METHODS);
            int[] failuresByMethod = new int[TOTAL_METHODS];
            while (!mt_doStop) {
                // Spin until the main thread publishes the next future.
                FutureTask<Integer> fv = mt_fv;
                if (fv == prevFV) {
                    spinLoops++;
                    if (spinLoops % 10000000 == 0) {
                        System.out.println(
                                "Long FV update loop in thread " +
                                Thread.currentThread().getName() +
                                "; count = " +
                                spinLoops +
                                "; gets = " +
                                gets);
                    }
                    continue;
                }
                if (fv == null) {
                    break;
                }
                prevFV = fv;
                spinLoops = 0;
                gets++;

                // Try several methods to get the value.
                Integer value = null;
                if (method == 0) {
                    // Wait until value is set.
                    value = fv.get();
                } else if (method == 1) {
                    // Wait a reasonable amount of time.
                    value = fv.get(100, TimeUnit.MILLISECONDS);
                } else if (method == 2) {
                    // Poll for the value (i.e. don't wait, but instead spin).
                    while (true) {
                        try {
                            value = fv.get(0, TimeUnit.SECONDS);
                            break;
                        } catch (TimeoutException ex) {
                            continue;
                        }
                    }
                }

                Integer expectedV = mt_expectedV;
                try {
                    if (value == null) {
                        // A completed future handed back null: count it, keep going.
                        failuresByMethod[method]++;
                    } else if (expectedV != value) {
                        mt_failed = true;
                        throw new AssertionError(
                                "Gets #" + gets +
                                ", Method #" + method +
                                ": wrong instance returned; expected object " +
                                System.identityHashCode(expectedV) +
                                ", but got " +
                                System.identityHashCode(value));
                    }
                } finally {
                    // Always report, so the main loop is not left waiting on the latch.
                    mt_doneGetting.countDown();
                }

                // Rotate to the next strategy.
                method = (method + 1) % TOTAL_METHODS;
            }
            return failuresByMethod;
        }
    }

    /**
     * A sub-class of {@link FutureTask} that attempts to use FutureTask.set.
     * It remembers the thread that constructed it and rejects gets from that
     * thread while the task is not yet done.
     */
    class AFutureValue extends FutureTask<Integer> {

        /** The constructing thread; cleared once {@link #mySet(Integer)} has run. */
        private volatile Thread creatorThread = Thread.currentThread();

        /** Creates a future whose callable must never run; complete it with {@link #mySet(Integer)}. */
        public AFutureValue() {
            super(new NeverCalled<Integer>());
        }

        /** Creates a future that is completed by running {@code callable}. */
        public AFutureValue(FixedResult<Integer> callable) {
            super(callable);
        }

        /**
         * Completes this future with {@code value} through {@code FutureTask.set}.
         *
         * @param value the result to publish
         * @return {@code false} if the task was already done, or is reported
         *         as cancelled after the set; {@code true} otherwise
         * @throws IllegalStateException if called from a thread other than
         *         the one that created this future
         */
        public boolean mySet(Integer value) {
            if (isDone()) {
                return false;
            }
            if (creatorThread != Thread.currentThread()) {
                throw new IllegalStateException("Must only call from creator's thread");
            }
            try {
                super.set(value);
                return !isCancelled();
            } finally {
                creatorThread = null;
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return super.cancel(mayInterruptIfRunning);
        }

        /**
         * Like {@code FutureTask.get()}, but an {@link ExecutionException} is
         * rethrown wrapped in an {@link IllegalStateException}.
         *
         * @throws IllegalStateException if the creator thread calls this
         *         before the task is done
         */
        @Override
        public Integer get() throws InterruptedException {
            if (creatorThread == Thread.currentThread() && !isDone()) {
                throw new IllegalStateException("Must not call from creator's thread");
            }
            try {
                return super.get();
            } catch (ExecutionException e) {
                // Can't happen.
                throw new IllegalStateException(e);
            }
        }

        /**
         * Like {@code FutureTask.get(long, TimeUnit)}, but an
         * {@link ExecutionException} is rethrown wrapped in an
         * {@link IllegalStateException}.
         *
         * @throws IllegalStateException if the creator thread calls this
         *         before the task is done
         */
        @Override
        public Integer get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
            if (creatorThread == Thread.currentThread() && !isDone()) {
                throw new IllegalStateException("Must not call from creator's thread");
            }
            try {
                return super.get(timeout, unit);
            } catch (ExecutionException e) {
                // Can't happen.
                throw new IllegalStateException(e);
            }
        }
    }

    /** A callable that always throws {@link UnsupportedOperationException} when called. */
    class NeverCalled<V> implements Callable<V> {
        public V call() throws Exception {
            throw new UnsupportedOperationException();
        }
    }

    /** A callable that returns the instance it was constructed with. */
    class FixedResult<V> implements Callable<V> {
        private final V result;

        FixedResult(V result) {
            this.result = result;
        }

        public V call() throws Exception {
            return result;
        }
    }
}
The FutureTask.Sync.runner variable is not managed in the same way that it is for FutureTask.run().
OPERATING SYSTEM(S):
Windows
FULL JDK VERSION(S):
Sun Java 1.5.0_14 and Sun Java 6 update4
DESCRIPTION:
The problem is observed only on multiprocessor machines, not on single-processor machines. The program was run across several JDKs and the issue was observed.
If needed, the output of the program runs can be provided.
Sample Program:
//package testFutureTask;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Demonstrates that {@link FutureTask#set} doesn't safely publish the
* value. testSet failed on my Intellistation z-Pro (dual processor, each
* hyperthreaded) with the following message:
*
* Failure counts by method (of 1642125 attempts):
* get, with infinite wait: 8189
* get, with timeout: 6268
* get, with zero wait: 4194
*
* @author James Synge
*/
public class TestFutureTaskSubClass {

    /** Number of distinct strategies a reader cycles through when fetching the value. */
    protected static final int TOTAL_METHODS = 3;

    /** Main-loop run time used by {@link #test(boolean)}, in milliseconds (10 seconds). */
    private static final long DEFAULT_DURATION_MILLIS = 10 * 1000;

    /** Longest the main loop waits for every reader to report on one future, in milliseconds. */
    private static final long READER_TIMEOUT_MILLIS = 10 * 1000;

    /** Reader threads started by {@link #createTasksAndThreads()}. */
    private Thread[] threads;

    /** One task per reader thread; each yields that reader's per-method failure counts. */
    private FutureTask<int[]>[] tasks;

    /** The exact instance the readers must obtain from the currently published future. */
    protected volatile Integer mt_expectedV = null;

    /** The future currently published to the readers; {@code null} before the first and after the last. */
    protected volatile AFutureValue mt_fv = null;

    /** Counted down once by every reader after it has examined the current future. */
    protected volatile CountDownLatch mt_doneGetting = null;

    /** Set by a reader that obtained a non-null but wrong instance; ends the main loop. */
    protected volatile boolean mt_failed = false;

    /** Tells the readers to leave their polling loop. */
    protected volatile boolean mt_doStop = false;

    /**
     * Runs the harness twice on fresh instances: first completing the futures
     * through {@code set}, then through {@code run}.
     *
     * @param args ignored
     * @throws Exception whatever {@link #test(boolean)} throws
     */
    public static void main(String[] args) throws Exception {
        TestFutureTaskSubClass test = new TestFutureTaskSubClass();
        test.test(true);
        test = new TestFutureTaskSubClass();
        test.test(false);
    }

    /**
     * Test method for {@link java.util.concurrent.FutureTask#set(java.lang.Object)},
     * run for the default duration of 10 seconds.
     *
     * @param useSet {@code true} to complete each future through
     *        {@link AFutureValue#mySet(Integer)}, {@code false} to complete it
     *        through {@code run()}
     * @throws Exception if an assertion fails, a reader task failed, or the
     *         calling thread is interrupted
     */
    public void test(final boolean useSet) throws Exception {
        test(useSet, DEFAULT_DURATION_MILLIS);
    }

    /**
     * Repeatedly publishes a fresh future to the background readers, completes
     * it, and waits for every reader to report, until {@code durationMillis}
     * has elapsed or a reader flags a wrong instance. Afterwards the readers
     * are stopped and their failure counts are summed and printed.
     *
     * @param useSet {@code true} to complete each future through
     *        {@link AFutureValue#mySet(Integer)}, {@code false} to complete it
     *        through {@code run()}
     * @param durationMillis how long to keep publishing futures, in
     *        milliseconds; a value of zero or less skips the main loop
     * @throws Exception if an assertion fails, a reader task failed, or the
     *         calling thread is interrupted
     */
    public void test(final boolean useSet, final long durationMillis) throws Exception {
        System.out.println("--------------------------------------------");
        if (useSet) {
            System.out.println("Testing FutureValue.set");
        } else {
            System.out.println("Testing FutureValue.run");
        }

        // Background threads that will get the value of each future
        // provided by this foreground thread.
        createTasksAndThreads();
        System.out.println("Started " + tasks.length + " threads to read from the FutureValue");

        System.out.println("Starting main loop...");
        final long endTime = System.currentTimeMillis() + durationMillis;
        int counter = 0;
        mt_failed = false;
        try {
            while (!mt_failed) {
                long now = System.currentTimeMillis();
                if (now >= endTime) {
                    System.out.println("Main loop duration reached");
                    break;
                }

                // Objects the readers need after they get. A fresh Integer is
                // allocated on purpose: readers compare by identity, so a
                // cached or shared instance must not be used here.
                mt_expectedV = new Integer((int) (now & 0xffffff));
                mt_doneGetting = new CountDownLatch(threads.length);

                // Publish the future the readers will try to get the value from.
                if (useSet) {
                    mt_fv = new AFutureValue();
                } else {
                    mt_fv = new AFutureValue(new FixedResult<Integer>(mt_expectedV));
                }

                // Yield before setting the value... sometimes.
                if (counter % 10 == 0) {
                    Thread.yield();
                }

                // Complete the future.
                if (useSet) {
                    assertTrue(mt_fv.mySet(mt_expectedV));
                } else {
                    mt_fv.run();
                    assertTrue(mt_fv.isDone());
                    assertTrue(!mt_fv.isCancelled());
                }
                assertSame(mt_expectedV, mt_fv.get());

                // Wait for every reader to finish with this future.
                if (!mt_doneGetting.await(READER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    throw new AssertionError("waited too long for reader threads to read");
                }
                counter++;
                if (counter % 100000 == 0) {
                    System.out.println("Completed " + counter + " loops");
                }
            }
        } finally {
            // Release the readers even when the loop exits with an exception;
            // otherwise the non-daemon reader threads spin forever and keep
            // the JVM alive.
            mt_doStop = true;
            mt_fv = null;
        }

        System.out.println("Waiting for reader threads to finish");
        for (Thread thread : threads) {
            thread.join();
        }

        // Did any of the readers record a failure?
        boolean didFail = false;
        int[] sumFailuresByMethod = new int[TOTAL_METHODS];
        for (FutureTask<int[]> task : tasks) {
            int[] failuresByMethod = task.get();
            for (int method = 0; method < TOTAL_METHODS; method++) {
                sumFailuresByMethod[method] += failuresByMethod[method];
                if (failuresByMethod[method] != 0) {
                    didFail = true;
                }
            }
        }
        if (!didFail) {
            System.out.println("No problems encountered");
            return;
        }
        int attempts = threads.length * counter;
        String msg = String.format(
                "Failure counts by method (of %d attempts):\n" +
                " get, with infinite wait:\t%d\n" +
                " get, with timeout:\t%d\n" +
                " get, with zero wait:\t%d",
                attempts,
                sumFailuresByMethod[0],
                sumFailuresByMethod[1],
                sumFailuresByMethod[2]);
        System.err.println(msg);
    }

    /**
     * Throws an {@link AssertionError} unless both references denote the very
     * same object (identity, not {@code equals}).
     */
    private void assertSame(Integer expected, Integer actual) {
        if (expected == actual) {
            return;
        }
        String msg = String.format(
                "Expected instances to be the same, but they aren't\n" +
                " expected identityHashCode: %d\n" +
                " actual identityHashCode: %d\n" +
                " expected value: %s\n" +
                " actual value: %s",
                System.identityHashCode(expected),
                System.identityHashCode(actual),
                expected, actual);
        throw new AssertionError(msg);
    }

    /** Throws an {@link AssertionError} when {@code v} is {@code false}. */
    private void assertTrue(boolean v) {
        if (!v) {
            throw new AssertionError("Expected true, as false");
        }
    }

    /**
     * Creates and starts the reader threads. Need at least one background
     * thread, and ideally want one thread (including the foreground testing
     * thread) on each processor.
     */
    @SuppressWarnings("unchecked") // generic array creation; only FutureTask<int[]> is ever stored
    private void createTasksAndThreads() {
        int numProcessors = Runtime.getRuntime().availableProcessors();
        final int numThreads = (numProcessors > 1) ? (numProcessors - 1) : numProcessors;
        threads = new Thread[numThreads];
        tasks = new FutureTask[numThreads];
        for (int i = 0; i < numThreads; i++) {
            FutureTask<int[]> task = new FutureTask<int[]>(new BackgroundReader());
            tasks[i] = task;
            Thread thread = new Thread(task);
            thread.start();
            threads[i] = thread;
        }
    }

    /**
     * The block executed by each background thread: it polls {@code mt_fv}
     * for a new future, gets its value with one of {@link #TOTAL_METHODS}
     * strategies (rotating after every future), and checks the result.
     */
    class BackgroundReader implements Callable<int[]> {

        /**
         * @return the number of {@code null} results seen, indexed by strategy
         *         (0 = untimed get, 1 = get with 100 ms timeout, 2 = zero-wait polling)
         * @throws Exception if a get fails or times out; an
         *         {@link AssertionError} is thrown when a non-null but wrong
         *         instance is returned
         */
        public int[] call() throws Exception {
            int gets = 0;
            int spinLoops = 0;
            FutureTask<Integer> prevFV = null;
            int method = new Random().nextInt(TOTAL_METHODS);
            int[] failuresByMethod = new int[TOTAL_METHODS];
            while (!mt_doStop) {
                // Spin until the main thread publishes the next future.
                FutureTask<Integer> fv = mt_fv;
                if (fv == prevFV) {
                    spinLoops++;
                    if (spinLoops % 10000000 == 0) {
                        System.out.println(
                                "Long FV update loop in thread " +
                                Thread.currentThread().getName() +
                                "; count = " +
                                spinLoops +
                                "; gets = " +
                                gets);
                    }
                    continue;
                }
                if (fv == null) {
                    break;
                }
                prevFV = fv;
                spinLoops = 0;
                gets++;

                // Try several methods to get the value.
                Integer value = null;
                if (method == 0) {
                    // Wait until value is set.
                    value = fv.get();
                } else if (method == 1) {
                    // Wait a reasonable amount of time.
                    value = fv.get(100, TimeUnit.MILLISECONDS);
                } else if (method == 2) {
                    // Poll for the value (i.e. don't wait, but instead spin).
                    while (true) {
                        try {
                            value = fv.get(0, TimeUnit.SECONDS);
                            break;
                        } catch (TimeoutException ex) {
                            continue;
                        }
                    }
                }

                Integer expectedV = mt_expectedV;
                try {
                    if (value == null) {
                        // A completed future handed back null: count it, keep going.
                        failuresByMethod[method]++;
                    } else if (expectedV != value) {
                        mt_failed = true;
                        throw new AssertionError(
                                "Gets #" + gets +
                                ", Method #" + method +
                                ": wrong instance returned; expected object " +
                                System.identityHashCode(expectedV) +
                                ", but got " +
                                System.identityHashCode(value));
                    }
                } finally {
                    // Always report, so the main loop is not left waiting on the latch.
                    mt_doneGetting.countDown();
                }

                // Rotate to the next strategy.
                method = (method + 1) % TOTAL_METHODS;
            }
            return failuresByMethod;
        }
    }

    /**
     * A sub-class of {@link FutureTask} that attempts to use FutureTask.set.
     * It remembers the thread that constructed it and rejects gets from that
     * thread while the task is not yet done.
     */
    class AFutureValue extends FutureTask<Integer> {

        /** The constructing thread; cleared once {@link #mySet(Integer)} has run. */
        private volatile Thread creatorThread = Thread.currentThread();

        /** Creates a future whose callable must never run; complete it with {@link #mySet(Integer)}. */
        public AFutureValue() {
            super(new NeverCalled<Integer>());
        }

        /** Creates a future that is completed by running {@code callable}. */
        public AFutureValue(FixedResult<Integer> callable) {
            super(callable);
        }

        /**
         * Completes this future with {@code value} through {@code FutureTask.set}.
         *
         * @param value the result to publish
         * @return {@code false} if the task was already done, or is reported
         *         as cancelled after the set; {@code true} otherwise
         * @throws IllegalStateException if called from a thread other than
         *         the one that created this future
         */
        public boolean mySet(Integer value) {
            if (isDone()) {
                return false;
            }
            if (creatorThread != Thread.currentThread()) {
                throw new IllegalStateException("Must only call from creator's thread");
            }
            try {
                super.set(value);
                return !isCancelled();
            } finally {
                creatorThread = null;
            }
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return super.cancel(mayInterruptIfRunning);
        }

        /**
         * Like {@code FutureTask.get()}, but an {@link ExecutionException} is
         * rethrown wrapped in an {@link IllegalStateException}.
         *
         * @throws IllegalStateException if the creator thread calls this
         *         before the task is done
         */
        @Override
        public Integer get() throws InterruptedException {
            if (creatorThread == Thread.currentThread() && !isDone()) {
                throw new IllegalStateException("Must not call from creator's thread");
            }
            try {
                return super.get();
            } catch (ExecutionException e) {
                // Can't happen.
                throw new IllegalStateException(e);
            }
        }

        /**
         * Like {@code FutureTask.get(long, TimeUnit)}, but an
         * {@link ExecutionException} is rethrown wrapped in an
         * {@link IllegalStateException}.
         *
         * @throws IllegalStateException if the creator thread calls this
         *         before the task is done
         */
        @Override
        public Integer get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
            if (creatorThread == Thread.currentThread() && !isDone()) {
                throw new IllegalStateException("Must not call from creator's thread");
            }
            try {
                return super.get(timeout, unit);
            } catch (ExecutionException e) {
                // Can't happen.
                throw new IllegalStateException(e);
            }
        }
    }

    /** A callable that always throws {@link UnsupportedOperationException} when called. */
    class NeverCalled<V> implements Callable<V> {
        public V call() throws Exception {
            throw new UnsupportedOperationException();
        }
    }

    /** A callable that returns the instance it was constructed with. */
    class FixedResult<V> implements Callable<V> {
        private final V result;

        FixedResult(V result) {
            this.result = result;
        }

        public V call() throws Exception {
            return result;
        }
    }
}
- relates to
-
JDK-6665818 FutureTask bullet-proofing
-
- Closed
-