The BlockingDeque<E> Interface
The java.util.concurrent.BlockingDeque interface extends both the java.util.Deque interface and the BlockingQueue interface, as shown in Figure 23.6. Compared to the operations shown for the ConcurrentLinkedDeque class in Table 23.8, the BlockingDeque interface provides new insert and remove operations, both at the head and tail of a deque, that can block or can time out. These are shown in the light-gray rows in Table 23.14 (the put, take, and timed offer and poll methods). Methods inherited from the BlockingQueue interface are marked with an asterisk (*) in Table 23.14. The LinkedBlockingDeque class that implements the BlockingDeque interface is shown in Table 23.11. This class is a general-purpose implementation of a blocking deque that can optionally be bounded.
Table 23.14 Selected Methods in the BlockingDeque Interface
Insert at the head | Insert at the tail | Runtime behavior on failure |
offerFirst(e) | offerLast(e), offer(e)* | Returns false if full |
addFirst(e) | addLast(e), add(e)* | Throws IllegalStateException if full |
putFirst(e) | putLast(e), put(e)* | Blocks if full |
offerFirst(e, time, unit) | offerLast(e, time, unit), offer(e, time, unit)* | Returns false if it times out while full |
Remove from the head | Remove from the tail | Runtime behavior on failure |
pollFirst(), poll()* | pollLast() | Returns null if empty |
removeFirst(), remove()* | removeLast() | Throws NoSuchElementException if empty |
takeFirst(), take()* | takeLast() | Blocks if empty |
pollFirst(time, unit), poll(time, unit)* | pollLast(time, unit) | Returns null if it times out while empty |
Examine at the head | Examine at the tail | Runtime behavior on failure |
peekFirst(), peek()* | peekLast() | Returns null if empty |
getFirst(), element()* | getLast() | Throws NoSuchElementException if empty |
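The failure behavior listed in Table 23.14 can be observed on a small bounded deque. The following is a minimal sketch, not part of the book's examples: the class name BlockingDequeSketch, the method demo(), the capacity of 2, and the 50 ms timeouts are all assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name and the capacity are illustrative only.
class BlockingDequeSketch {

  // Exercises head and tail operations on a deque bounded to two elements
  // and records what each call reports.
  static List<String> demo() throws InterruptedException {
    List<String> log = new ArrayList<>();
    BlockingDeque<String> deque = new LinkedBlockingDeque<>(2);

    log.add("offerFirst(b): " + deque.offerFirst("b"));  // Succeeds.
    log.add("offerLast(c): " + deque.offerLast("c"));    // Succeeds, deque is now full.
    log.add("offerFirst(a): " + deque.offerFirst("a"));  // Full: returns false.

    // The timed insert waits up to 50 ms for space and then gives up.
    log.add("timed offerLast(d): "
        + deque.offerLast("d", 50, TimeUnit.MILLISECONDS));

    try {
      deque.addFirst("a");                               // Full: throws.
    } catch (IllegalStateException e) {
      log.add("addFirst(a): IllegalStateException");
    }

    log.add("peekFirst(): " + deque.peekFirst());        // Examines the head.
    log.add("takeLast(): " + deque.takeLast());          // Does not block: not empty.
    log.add("pollFirst(): " + deque.pollFirst());        // Removes the last element.
    log.add("pollLast(): " + deque.pollLast());          // Empty: returns null.

    // The timed remove waits up to 50 ms for an element and then gives up.
    log.add("timed pollFirst(): "
        + deque.pollFirst(50, TimeUnit.MILLISECONDS));
    return log;
  }

  public static void main(String[] args) throws InterruptedException {
    demo().forEach(System.out::println);
  }
}
```

The sketch deliberately avoids calling putFirst(), putLast(), takeFirst(), or takeLast() in a state where they would block, since a single thread would then wait indefinitely.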
Example 23.20 demonstrates using the LinkedBlockingQueue class that implements the BlockingQueue interface. A producer is defined at (2) that puts a fixed number of random values (from 0 to 99) in the blocking queue. Note that the put() call at (3) can block. The thread sleeps for a little while after each put() operation. When done, it puts a poison value (which is not a legal value) in the queue at (4) to signal that no more values will follow.
A consumer is defined at (5) that repeatedly takes a value from the blocking queue at (7) and prints it. The take() call can block if the queue is empty, and the thread sleeps for a little while after each take() operation. Before taking a value, the consumer checks at (6) whether the value at the head of the queue is the poison value. If so, the infinite loop is terminated, and thereby the consumer as well.
The LinkedBlockingQueue class is instantiated at (8). Note that the blocking queue is bounded, but the number of values to put in the queue is greater than its capacity, which can cause the put() operation to block when the queue is full. An executor service is created at (9). One producer and two consumers are submitted to the executor service at (10). The call to the shutdown() method at (11) stops the executor service from accepting new tasks, but allows the submitted tasks to complete execution.
The output from the program shows that the producer puts the values in the blocking queue and the two consumers take the values and process them.
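The blocking behavior of a bounded queue and the effect of shutdown() described above can be isolated in a smaller sketch. The class name BoundedQueueSketch, the capacity of 3, and the 5-second wait are assumptions made for illustration. The sketch also calls awaitTermination(), which Example 23.20 does not use, so that the final state of the queue can be inspected after both tasks have finished.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and sizes are illustrative only.
class BoundedQueueSketch {

  static String run() throws InterruptedException {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
    for (int i = 0; i < 3; i++) {
      queue.put(i);                       // Does not block: space is available.
    }
    boolean accepted = queue.offer(99);   // Queue is full: returns false at once.

    ExecutorService exs = Executors.newFixedThreadPool(2);
    try {
      // Each lambda returns a value, so it is submitted as a Callable, which
      // may throw the InterruptedException declared by put() and take().
      exs.submit(() -> { queue.put(3); return null; });  // Waits for a free slot.
      exs.submit(() -> { queue.take(); return null; });  // Frees a slot.
    } finally {
      exs.shutdown();                     // No new tasks; submitted tasks still run.
    }
    // Wait for the two submitted tasks to complete.
    boolean terminated = exs.awaitTermination(5, TimeUnit.SECONDS);
    return "offer accepted: " + accepted
        + ", terminated: " + terminated
        + ", queue: " + queue;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run());
    // prints: offer accepted: false, terminated: true, queue: [1, 2, 3]
  }
}
```

Whichever task runs first, the put(3) call can only complete after take() has removed the head element 0, so the final contents of the queue do not depend on thread scheduling.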
Example 23.20 Linked Blocking Queue
package concurrent;

import java.util.concurrent.*;

public class LinkedBlockingQueueDemo {
  public static final int UPPER_BOUND = 3;
  public static final int NUM_OF_VALUES = 5;
  public static final int STOP_VALUE = -1;

  private static BlockingQueue<Integer> queue;                     // (1)

  private static Runnable producer = () -> {                       // (2)
    String threadName = Thread.currentThread().getName();
    ThreadLocalRandom tlrng = ThreadLocalRandom.current();
    try {
      for (int i = 0; i < NUM_OF_VALUES; i++) {
        Integer value = tlrng.nextInt(100);
        queue.put(value);                                          // (3)
        System.out.println(threadName + ": put " + value);
        Thread.sleep(tlrng.nextInt(200));
      }
      queue.put(STOP_VALUE);                                       // (4)
      System.out.println(threadName + ": done.");
    } catch (InterruptedException ie) {
      ie.printStackTrace();
    }
  };

  private static Runnable consumer = () -> {                       // (5)
    String threadName = Thread.currentThread().getName();
    ThreadLocalRandom tlrng = ThreadLocalRandom.current();
    while (true) {
      try {
        Integer head = queue.peek();                               // (6)
        if (head != null && head.equals(STOP_VALUE)) {
          System.out.println(threadName + ": done.");
          break;
        }
        Integer value = queue.take();                              // (7)
        System.out.println(threadName + ": processing " + value);
        Thread.sleep(tlrng.nextInt(1000));
      } catch (InterruptedException ie) {
        ie.printStackTrace();
      }
    }
  };

  public static void main(String[] args) {
    queue = new LinkedBlockingQueue<>(UPPER_BOUND);                // (8)
    ExecutorService exs = Executors.newFixedThreadPool(3);         // (9)
    try {
      exs.submit(producer);                                        // (10)
      exs.submit(consumer);
      exs.submit(consumer);
    } finally {
      exs.shutdown();                                              // (11)
    }
  }
}
Probable output from the program:
pool-1-thread-1: put 43
pool-1-thread-2: processing 43
pool-1-thread-1: put 84
pool-1-thread-3: processing 84
pool-1-thread-1: put 60
pool-1-thread-3: processing 60
pool-1-thread-1: put 55
pool-1-thread-1: put 17
pool-1-thread-3: processing 55
pool-1-thread-1: done.
pool-1-thread-3: processing 17
pool-1-thread-2: done.
pool-1-thread-3: done.
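In Example 23.20, the peek() call at (6) and the take() call at (7) are two separate operations. With two consumers, another consumer can in principle remove the head between the two calls, in which case a consumer may take the poison value itself and both consumers can then block in take() on an empty queue. One common alternative, sketched below, is to take first and check afterward, putting the poison value back so that the remaining consumers also see it. This is a variation on the example, not the book's code. The class name PoisonPillSketch, the method run(), and its parameters are assumptions, and the sleeps and printing are omitted to keep the sketch short.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical variation on Example 23.20; all names are illustrative only.
class PoisonPillSketch {
  static final int STOP_VALUE = -1;

  // Produces the values 0..numValues-1 and returns the values that the
  // consumers processed, in the order in which they were recorded.
  static List<Integer> run(int numValues, int numConsumers)
      throws InterruptedException {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);
    List<Integer> processed = new CopyOnWriteArrayList<>();  // Thread-safe list.

    Callable<Void> producer = () -> {
      for (int i = 0; i < numValues; i++) {
        queue.put(i);                     // Can block if the queue is full.
      }
      queue.put(STOP_VALUE);              // Signal that no more values follow.
      return null;
    };

    Callable<Void> consumer = () -> {
      while (true) {
        int value = queue.take();         // Removes the head atomically.
        if (value == STOP_VALUE) {
          queue.put(STOP_VALUE);          // Leave the marker for other consumers.
          return null;
        }
        processed.add(value);
      }
    };

    ExecutorService exs = Executors.newFixedThreadPool(numConsumers + 1);
    try {
      exs.submit(producer);
      for (int i = 0; i < numConsumers; i++) {
        exs.submit(consumer);
      }
    } finally {
      exs.shutdown();
    }
    if (!exs.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Tasks did not finish in time");
    }
    return processed;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Processed: " + run(5, 2));
  }
}
```

Because the poison value is the last element the producer inserts, the queue is empty when a consumer takes it, so putting it back does not block. The order of the processed values can vary between runs, since it depends on how the consumer threads are scheduled.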