Uploaded image for project: 'JDK'
  1. JDK
  2. JDK-8343250

ArrayBlockingQueue serialization not thread safe

XMLWordPrintable

      ADDITIONAL SYSTEM INFORMATION :
      This issue has been in Java since the ArrayBlockingQueue was added in Java 1.5.

      A DESCRIPTION OF THE PROBLEM :
      The ArrayBlockingQueue has had a readObject() method since Java 7, which checks invariants of the deserialized object. However, it does not have a writeObject() method. This means that the ArrayBlockingQueue could be modified whilst it is being written, resulting in broken invariants. The readObject() method's invariant checking is not exhaustive, which means that it is possible to end up with ArrayBlockingQueue instances that contain null values, leading to a difference between "size()" and how many objects would be returned with "poll()".

      The ABQ should get a writeObject() method that is locking on the same locks as the rest of the class.

      STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
      We run the demo code below to show what happens if we serialize whilst the ArrayBlockingQueue is being changed.

      EXPECTED VERSUS ACTUAL BEHAVIOR :
      EXPECTED -
      it should have output either [42, 42, 42, 42] or [42, 42, 42, 42, 42]
      ACTUAL -
      It throws a bunch of exceptions and sometimes returns queues that contain null in the middle, at which point our class exits.

      ---------- BEGIN SOURCE ----------
      import java.io.*;
      import java.util.*;
      import java.util.concurrent.*;
      import java.util.concurrent.atomic.*;

      public class ArrayBlockingQueueSerializationBug {
          public static void main(String... args)
                  throws Exception {
              final BlockingQueue<Integer> queue =
                      // new LinkedBlockingQueue<Integer>();
                      new ArrayBlockingQueue<Integer>(10);
              for (int i = 0; i < 5; i++) queue.add(42);

              final AtomicBoolean modifying = new AtomicBoolean(true);
              Thread thread = new Thread(new Runnable() {
                  public void run() {
                      while (modifying.get()) {
                          queue.poll();
                          queue.add(42);
                      }
                  }
              });
              thread.start();

              Collection<ByteArrayOutputStream> outs = new ArrayList<ByteArrayOutputStream>();
              for (int i = 0; i < 10000; i++) {
                  ByteArrayOutputStream baos = new ByteArrayOutputStream();
                  ObjectOutputStream oos = new ObjectOutputStream(baos);
                  oos.writeObject(queue);
                  oos.close();
                  outs.add(baos);
              }
              modifying.set(false);

              for (ByteArrayOutputStream out : outs) {
                  ObjectInputStream ois = new ObjectInputStream(
                          new ByteArrayInputStream(out.toByteArray()));
                  try {
                      BlockingQueue<Integer> read = (BlockingQueue<Integer>) ois.readObject();
                      System.out.println("read = " + read);
                      check(read);
                  } catch (IOException e) {
                      System.out.println(e);
                  }
                  ois.close();
              }
          }

          private static void check(BlockingQueue<Integer> read) {
              if (read.toString().contains("null")) {
                  System.out.println("read.size() = " + read.size());
                  int count = 0;
                  for (Integer i : read) {
                      count++;
                  }
                  System.out.println("count = " + count);

                  throw new AssertionError();
              }
          }
      }

      ---------- END SOURCE ----------

      CUSTOMER SUBMITTED WORKAROUND :
      It is an easy fix. There is no quick solution with ArrayBlockingQueue, but LinkedBlockingQueue is thread-safe and can be used instead.

      Add this:

          /**
           * Writes the queue to the stream whilst holding the lock to prevent
           * broken invariants.
           *
           * @param s the stream
           * @throws java.io.IOException if an I/O error occurs
           */
          private void writeObject(java.io.ObjectOutputStream s) throws IOException {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  s.defaultWriteObject();
              } finally {
                  lock.unlock();
              }
          }


      FREQUENCY : always


            hkabutz Heinz Kabutz
            webbuggrp Webbug Group
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

              Created:
              Updated:
              Resolved: