-
Enhancement
-
Resolution: Won't Fix
-
P5
-
None
-
7
-
x86
-
windows_xp
A DESCRIPTION OF THE REQUEST :
Is it possible to add a bulk take operation to java.util.concurrent.BlockingQueue (ArrayBlockingQueue)?
Only the single-item 'take' is currently supported. If ArrayBlockingQueue is used in a high-volume producer/consumer scenario, a bulk take will result in less contention between consumer and producer than repeated calls to 'take'.
'takeAll' is essentially 'take' and 'drainTo' done atomically. I have pasted 'takeAll' proposed method below.
JUSTIFICATION :
I have prototyped a BlockingQueue implementation with takeAll, based on ArrayBlockingQueue. It performs much better than ArrayBlockingQueue, with much less contention.
---------- BEGIN SOURCE ----------
/**
 * Retrieves and removes all items in the queue, waiting if necessary
 * until an element becomes available.
 * @param c list to drain items to
 * @throws InterruptedException
 */
public void takeAll(Collection<? super E> c) throws InterruptedException {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        try {
            while (count == 0)
                notEmpty.await();
        } catch (InterruptedException ie) {
            notEmpty.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        final E[] items = this.items;
        int i = takeIndex;
        int n = 0;
        int max = count;
        while (n < max) {
            c.add(items[i]);
            items[i] = null;
            i = inc(i);
            ++n;
        }
        if (n > 0) {
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        }
    } finally {
        lock.unlock();
    }
}
---------- END SOURCE ----------
CUSTOMER SUBMITTED WORKAROUND :
The only workaround now is to reimplement BlockingQueue based on ArrayBlockingQueue.
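For comparison, the 'take' followed by 'drainTo' behavior described in the request can be approximated with the existing public API alone. The sketch below is only an illustration: the class name TakeAllSketch and the static helper takeAll are hypothetical and not part of java.util.concurrent. Because the two calls acquire the queue's lock separately, the combination is not atomic, so it would not be expected to give the contention benefit of the proposed method.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class TakeAllSketch {

    // Hypothetical helper (not a JDK method): blocks until at least one
    // element is available, then drains whatever else is queued.
    // NOTE: take() and drainTo() lock the queue separately, so this is
    // NOT atomic, unlike the takeAll method proposed in the request.
    static <E> int takeAll(BlockingQueue<E> queue, Collection<? super E> c)
            throws InterruptedException {
        if (c == null)
            throw new NullPointerException();
        if (c == queue)
            throw new IllegalArgumentException();
        E first = queue.take();      // waits while the queue is empty
        c.add(first);
        return 1 + queue.drainTo(c); // non-blocking; may add zero elements
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(8);
        queue.put("a");
        queue.put("b");
        queue.put("c");

        List<String> batch = new ArrayList<String>();
        int n = takeAll(queue, batch);
        System.out.println(n + " " + batch); // prints "3 [a, b, c]"
    }
}
```

A consumer loop would call the helper repeatedly and process each batch; another consumer may remove elements between the two calls, in which case the batch simply contains fewer items.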