The BlockingDeque Interface – Concurrency: Part II

The BlockingDeque<E> Interface

The java.util.concurrent.BlockingDeque interface extends both the java.util.Deque interface and the BlockingQueue interface, as shown in Figure 23.6. Compared to the operations shown for the ConcurrentLinkedDeque class in Table 23.8, the BlockingDeque interface provides new insert and remove operations, both at the head and tail of a deque, that can block or can time out (shown in the light-gray rows in Table 23.14). Methods inherited from the BlockingQueue interface are marked with an asterisk (*) in Table 23.14. The LinkedBlockingDeque class that implements the BlockingDeque interface is shown in Table 23.11. This class represents an all-round implementation of a blocking deque.

Table 23.14 Selected Methods in the BlockingDeque Interface

Insert at the head | Insert at the tail | Runtime behavior on failure
offerFirst(e) | offerLast(e), offer(e)* | Returns false if full
addFirst(e) | addLast(e), add(e)* | Throws IllegalStateException
putFirst(e) | putLast(e), put(e)* | Blocks if full
offerFirst(e, time, unit) | offerLast(e, time, unit), offer(e, time, unit)* | Times out

Remove from the head | Remove from the tail | Runtime behavior on failure
pollFirst(), poll()* | pollLast() | Returns null if empty
removeFirst(), remove()* | removeLast() | Throws NoSuchElementException
takeFirst(), take()* | takeLast() | Blocks if empty
pollFirst(time, unit), poll(time, unit)* | pollLast(time, unit) | Times out

Examine at the head | Examine at the tail | Runtime behavior on failure
peekFirst(), peek()* | peekLast() | Returns null if empty
getFirst(), element()* | getLast() | Throws NoSuchElementException
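
The failure behaviors listed in Table 23.14 can be tried out on a small bounded deque. The following is a minimal sketch, not one of the book's examples: the class name BlockingDequeFailureModes, the method demo(), the capacity of 2, and the 50 ms timeouts are all chosen purely for illustration. Only operations that return promptly are exercised on a full or empty deque, since putFirst() and takeFirst() would block in those states.

```java
import java.util.NoSuchElementException;
import java.util.StringJoiner;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class (an assumption for illustration only).
class BlockingDequeFailureModes {

  // Exercises one operation of each kind and records what happened.
  static String demo() {
    StringJoiner log = new StringJoiner("; ");
    BlockingDeque<String> deque = new LinkedBlockingDeque<>(2); // capacity 2
    try {
      deque.putFirst("b");                     // deque: [b]
      deque.putLast("c");                      // deque: [b, c], now full

      // Inserting into a full deque.
      log.add("offerFirst -> " + deque.offerFirst("a"));
      try {
        deque.addFirst("a");
      } catch (IllegalStateException e) {
        log.add("addFirst -> IllegalStateException");
      }
      log.add("timed offerLast -> "
          + deque.offerLast("d", 50, TimeUnit.MILLISECONDS));

      // Removing from both ends; neither call blocks, as elements exist.
      log.add("takeFirst -> " + deque.takeFirst());
      log.add("takeLast -> " + deque.takeLast());

      // Removing from and examining an empty deque.
      log.add("pollLast -> " + deque.pollLast());
      try {
        deque.removeFirst();
      } catch (NoSuchElementException e) {
        log.add("removeFirst -> NoSuchElementException");
      }
      log.add("timed pollFirst -> "
          + deque.pollFirst(50, TimeUnit.MILLISECONDS));
      log.add("peekFirst -> " + deque.peekFirst());
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();      // preserve interrupt status
      log.add("interrupted");
    }
    return log.toString();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In this sketch the timed operations wait for the given time and then report failure in the same way as their untimed offer and poll counterparts: the timed offerLast() returns false and the timed pollFirst() returns null.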

Example 23.20 demonstrates the LinkedBlockingQueue class, which implements the BlockingQueue interface. The producer defined at (2) puts a fixed number of random values (from 0 to 99) in the blocking queue. Note that the put() call at (3) can block. The thread sleeps for a short random interval after each put() operation. When done, it puts a poison value (which is not a legal data value) in the queue at (4) to signal that no more values will follow.

The consumer defined at (5) repeatedly takes a value from the blocking queue at (7) and prints it. The take() call can block if the queue is empty. The thread sleeps for a short random interval after each take() operation. Before taking a value, the consumer checks at (6) whether the value at the head of the queue is the poison value. If so, it breaks out of the infinite loop and the consumer terminates, leaving the poison value in the queue for the other consumer to find.

The LinkedBlockingQueue class is instantiated at (8). Note that the blocking queue is bounded, and the number of values to put in the queue is greater than its capacity, so the put() operation can block when the queue is full. An executor service with a pool of three threads is created at (9). One producer and two consumers are submitted to the executor service at (10). The call to the shutdown() method at (11) stops the executor service from accepting new tasks but allows the already submitted tasks to complete execution.

The output from the program shows that the producer puts the values in the blocking queue and the two consumers take the values and process them.

Example 23.20 Linked Blocking Queue


package concurrent;
import java.util.concurrent.*;
public class LinkedBlockingQueueDemo {
  public static final int UPPER_BOUND = 3;
  public static final int NUM_OF_VALUES = 5;
  public static final int STOP_VALUE = -1;
  private static BlockingQueue<Integer> queue;                            // (1)
  private static Runnable producer = () -> {                              // (2)
    String threadName = Thread.currentThread().getName();
    ThreadLocalRandom tlrng = ThreadLocalRandom.current();
    try {
      for (int i = 0; i < NUM_OF_VALUES; i++) {
        Integer value = tlrng.nextInt(100);
        queue.put(value);                                                 // (3)
        System.out.println(threadName + ": put " + value);
        Thread.sleep(tlrng.nextInt(200));
      }
      queue.put(STOP_VALUE);                                              // (4)
      System.out.println(threadName + ": done.");
    } catch (InterruptedException ie) {
      ie.printStackTrace();
    }
  };
  private static Runnable consumer = () -> {                              // (5)
    String threadName = Thread.currentThread().getName();
    ThreadLocalRandom tlrng = ThreadLocalRandom.current();
    while (true) {
      try {
        Integer head = queue.peek();                                      // (6)
        if (head != null && head.equals(STOP_VALUE)) {
          System.out.println(threadName + ": done.");
          break;
        }
        Integer value = queue.take();                                     // (7)
        System.out.println(threadName + ": processing " + value);
        Thread.sleep(tlrng.nextInt(1000));
      } catch (InterruptedException ie) {
        ie.printStackTrace();
      }
    }
  };
  public static void main(String[] args) {
    queue = new LinkedBlockingQueue<>(UPPER_BOUND);                       // (8)
    ExecutorService exs = Executors.newFixedThreadPool(3);                // (9)
    try {
      exs.submit(producer);                                               // (10)
      exs.submit(consumer);
      exs.submit(consumer);
    } finally {
      exs.shutdown();                                                     // (11)
    }
  }
}

Probable output from the program:


pool-1-thread-1: put 43
pool-1-thread-2: processing 43
pool-1-thread-1: put 84
pool-1-thread-3: processing 84
pool-1-thread-1: put 60
pool-1-thread-3: processing 60
pool-1-thread-1: put 55
pool-1-thread-1: put 17
pool-1-thread-3: processing 55
pool-1-thread-1: done.
pool-1-thread-3: processing 17
pool-1-thread-2: done.
pool-1-thread-3: done.
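
The consumer in Example 23.20 checks the head of the queue at (6) and takes from it at (7) in two separate steps. With two consumers, these steps can interleave: both consumers may peek the same last data value, one takes it, and the other then takes the poison value and processes it as data, after which both consumers block on an empty queue. A possible alternative, sketched below under stated assumptions, is to take first and test afterward, putting the poison value back for the other consumers. The class name PoisonPillSketch, the method run(), the collecting list, and the 10-second wait are illustrative choices and are not part of the original example; the sleeps are omitted and the values are sequential to keep the sketch short.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of Example 23.20 (an assumption for illustration only).
class PoisonPillSketch {
  static final int STOP_VALUE = -1;            // poison value, as in the example

  // Runs one producer and two consumers; returns the values that were processed.
  static List<Integer> run(int numOfValues, int capacity) {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
    List<Integer> processed = new CopyOnWriteArrayList<>();

    Runnable producer = () -> {
      try {
        for (int i = 0; i < numOfValues; i++) {
          queue.put(i);                        // can block if the queue is full
        }
        queue.put(STOP_VALUE);                 // signal: no more values
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    };

    Runnable consumer = () -> {
      try {
        while (true) {
          Integer value = queue.take();        // removal is one atomic step
          if (value == STOP_VALUE) {
            queue.put(value);                  // leave it for the other consumers
            break;
          }
          processed.add(value);
        }
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    };

    ExecutorService exs = Executors.newFixedThreadPool(3);
    exs.submit(producer);
    exs.submit(consumer);
    exs.submit(consumer);
    exs.shutdown();                            // no new tasks; submitted ones finish
    try {
      if (!exs.awaitTermination(10, TimeUnit.SECONDS)) {
        exs.shutdownNow();
        throw new IllegalStateException("tasks did not finish in time");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ie);
    }
    return processed;
  }

  public static void main(String[] args) {
    System.out.println("processed: " + run(5, 3));
  }
}
```

Because the poison value is the last element put in the queue, the queue is empty once a consumer has taken it, so putting it back does not block. The order in which the two consumers record values can vary from run to run.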
