JDK-6918690

Is it possible to add a bulk take operation to java.util.concurrent.BlockingQueue?


    • Type: Enhancement
    • Resolution: Won't Fix
    • Priority: P5
    • None
    • 7
    • core-libs

      A DESCRIPTION OF THE REQUEST :
      Is it possible to add a bulk take operation to java.util.concurrent.BlockingQueue (ArrayBlockingQueue)?
      Only a single-item 'take' is currently supported. If ArrayBlockingQueue is used in a high-volume producer/consumer scenario, a 'bulk take' would cause less contention between consumer and producer than repeated calls to 'take', because the consumer acquires the queue lock once per batch instead of once per element.
      'takeAll' is essentially 'take' and 'drainTo' done atomically. I have pasted the proposed 'takeAll' method below.
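
For comparison, the non-atomic combination of 'take' and 'drainTo' that is already possible with the existing BlockingQueue API can be sketched as follows. The class name TakeThenDrain and the helper takeBatch are hypothetical names used only for illustration. Unlike the proposed 'takeAll', this acquires the queue lock twice per batch, so it only approximates the requested behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TakeThenDrain {
    /**
     * Blocks until at least one element is available, then drains whatever
     * else is queued at that moment. NOT atomic: take() and drainTo() each
     * acquire the queue lock separately (hypothetical helper, not a JDK method).
     */
    public static <E> List<E> takeBatch(BlockingQueue<E> queue) throws InterruptedException {
        List<E> batch = new ArrayList<>();
        batch.add(queue.take());   // blocks while the queue is empty
        queue.drainTo(batch);      // non-blocking; appends any remaining elements
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
        queue.put("a");
        queue.put("b");
        queue.put("c");
        List<String> batch = takeBatch(queue);
        System.out.println(batch + " remaining=" + queue.size()); // prints "[a, b, c] remaining=0"
    }
}
```

With a single consumer this yields the same batches as 'takeAll' would; with several consumers, another consumer may remove elements between the two calls, so a batch can end up containing only the one element returned by 'take'.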

      JUSTIFICATION :
      I have prototyped a BlockingQueue implementation with takeAll, based on ArrayBlockingQueue. It performs much better than ArrayBlockingQueue, with much less contention.


      ---------- BEGIN SOURCE ----------
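      /**
       * Retrieves and removes all elements currently in the queue, waiting
       * if necessary until at least one element becomes available.
       * @param c the collection to transfer elements into
       * @throws InterruptedException if interrupted while waiting
       * @throws NullPointerException if c is null
       * @throws IllegalArgumentException if c is this queue
       */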
      public void takeAll(Collection<? super E> c) throws InterruptedException {
          if (c == null)
              throw new NullPointerException();
          if (c == this)
              throw new IllegalArgumentException();
          
          final ReentrantLock lock = this.lock;
          lock.lockInterruptibly();
          try {
              try {
                  while (count == 0)
                      notEmpty.await();
              } catch (InterruptedException ie) {
                  notEmpty.signal(); // propagate to non-interrupted thread
                  throw ie;
              }
              
              final E[] items = this.items;
              
              int i = takeIndex;
              int n = 0;
              int max = count;
              while (n < max) {
                  c.add(items[i]);
                  items[i] = null;
                  i = inc(i);
                  ++n;
              }
              if (n > 0) {
                  count = 0;
                  putIndex = 0;
                  takeIndex = 0;
                  notFull.signalAll();
              }
          } finally {
              lock.unlock();
          }
      }

      ---------- END SOURCE ----------

      CUSTOMER SUBMITTED WORKAROUND :
      The only workaround now is to reimplement BlockingQueue based on ArrayBlockingQueue.
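
A minimal sketch of what such a reimplementation might look like is given below. It is not the submitter's prototype: the class name BatchQueue, the ArrayDeque-backed storage, and the int return value of takeAll are assumptions made for illustration, and the class implements only put and takeAll rather than the full BlockingQueue interface.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical minimal bounded queue with a blocking bulk take. */
public class BatchQueue<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public BatchQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    /** Inserts the element, waiting while the queue is full. */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        lock.lockInterruptibly();
        try {
            while (items.size() == capacity)
                notFull.await();
            items.addLast(e);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Waits until at least one element is present, then moves every queued
     * element into c under a single lock acquisition.
     * Returns the number of elements transferred (an assumption; the
     * proposed method in this report returns void).
     */
    public int takeAll(Collection<? super E> c) throws InterruptedException {
        if (c == null) throw new NullPointerException();
        lock.lockInterruptibly();
        try {
            while (items.isEmpty())
                notEmpty.await();
            int n = items.size();
            c.addAll(items);      // preserves FIFO order
            items.clear();
            notFull.signalAll();  // every blocked producer may now proceed
            return n;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BatchQueue<Integer> q = new BatchQueue<>(4);
        q.put(1);
        q.put(2);
        List<Integer> out = new ArrayList<>();
        int n = q.takeAll(out);
        System.out.println(n + " " + out); // prints "2 [1, 2]"
    }
}
```

The wait and the drain happen inside the same critical section, which is what makes this a single atomic operation from the producers' point of view.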

            Assignee: Unassigned
            Reporter: Nelson Dcosta (ndcosta, Inactive)