The ConcurrentLinkedQueue<E> Class
The ConcurrentLinkedQueue class implements the Queue interface, as shown in Figure 23.6. No new methods are defined, and existing methods of the Queue interface (§15.6, p. 814) are implemented as shown in Table 23.7. Note that since it is an unbounded queue, the insert operations will always succeed.
The ConcurrentLinkedDeque<E> Class
The ConcurrentLinkedDeque class implements the Deque interface, as shown in Figure 23.6. No new methods are defined, and existing methods of the Deque interface (§15.7, p. 821) are implemented as shown in Table 23.8. Note that since it is an unbounded deque, the insert operations will always succeed. Methods inherited from the Queue interface are marked with an asterisk (*) in Table 23.8.
Table 23.7 Selected Methods in the ConcurrentLinkedQueue Class
Operation | Throws exception | Returns special value |
Insert at the tail | add(e) will never throw IllegalArgumentException | offer(e) will never return false |
Remove from the head | remove() can throw NoSuchElementException | poll() returns null if empty |
Examine element at the head | element() can throw NoSuchElementException | peek() returns null if empty |
Table 23.8 Selected Methods in the ConcurrentLinkedDeque Class
Insert at the head | Insert at the tail | Runtime behavior on failure |
offerFirst(e) | offerLast(e), offer(e)* | Never returns false |
addFirst(e) | addLast(e), add(e)* | Never throws IllegalStateException |
Remove from the head | Remove from the tail | Runtime behavior on failure |
pollFirst(), poll()* | pollLast() | Returns null if empty |
removeFirst(), remove()* | removeLast() | Throws NoSuchElementException |
Examine at the head | Examine at the tail | Runtime behavior on failure |
peekFirst(), peek()* | peekLast() | Returns null if empty |
getFirst(), element()* | getLast() | Throws NoSuchElementException |
Example 23.18 illustrates the iterator of a ConcurrentSkipListSet (declared at (1)) that is weakly consistent during traversal. The idea is to have two tasks operating on the concurrent sorted set: one repeatedly creating an iterator to traverse the elements of the collection and summing them (2), and another continuously removing elements from the set (5).
The utility class ConcUtil declares the auxiliary method snooze() that is used to put a thread to sleep. If interrupted, the exception is caught and the interrupt reinstated so that the caller of the snooze() method can take the appropriate action. This method is also used in other examples in this section.
The Runnable sumValues at (2) repeatedly creates an iterator at (3) to sum the values in the set at the time the iterator is created, and prints the result. It snoozes a little after each traversal of the set. The thread determines at (4) whether it has been interrupted. If interrupted, the infinite loop terminates, thereby terminating the thread.
The Runnable removeValues at (5) polls the set and prints the value, snoozing after each polling operation. It also detects whether it has been interrupted, terminating the infinite loop at (6) if that is the case.
The main() method instantiates and initializes a ConcurrentSkipListSet at (7) with random numbers between 0 and 1000. The two tasks are submitted to a service executor at (8). The main thread snoozes a little to allow the tasks to run. The call to the shutdownNow() method at (10) leads to any thread running to be interrupted, and the threads taking appropriate action to terminate their execution as described above.
The output from the program shows that only elements that are in the Concur-rentSkipListSet instance at the time the iterator is created contribute to the sum, and the sum drops correctly as elements are removed.
Example 23.18 Concurrent Collections
package concurrent;
import java.util.concurrent.TimeUnit;
public class ConcUtil {
public static void snooze(int timeout, TimeUnit unit) {
String threadName = Thread.currentThread().getName();
try {
unit.sleep(timeout);
} catch (InterruptedException ex) {
System.out.println(threadName + “: ” + ex);
Thread.currentThread().interrupt(); // Reinstate interrupt status.
}
}
}
package concurrent;
import java.util.*;
import java.util.concurrent.*;
public class ConcurrentSkipListSetDemo {
private static ConcurrentSkipListSet<Integer> set; // (1)
private static Runnable sumValues = () -> { // (2)
String threadName = Thread.currentThread().getName();
while (true) {
int sum = 0;
for (Integer v : set) { // (3)
sum += v;
}
System.out.printf(threadName + “: sum%9d%n”, sum);
ConcUtil.snooze(2, TimeUnit.SECONDS);
if (Thread.interrupted()) break; // (4)
}
};
private static Runnable removeValues = () -> { // (5)
String threadName = Thread.currentThread().getName();
while (true) {
Integer value = set.pollFirst();
if (value == null) continue;
System.out.printf(threadName + “: removed%5d%n”, value);
ConcUtil.snooze(2, TimeUnit.SECONDS);
if (Thread.interrupted()) break; // (6)
}
};
public static void main(String[] args) {
// Create and populate the set: (7)
set = new ConcurrentSkipListSet<>();
new Random().ints(10, 0, 1000).forEach(val -> set.add(val));
System.out.println(set);
// Create an executor service to execute two tasks: (8)
ExecutorService exs = Executors.newFixedThreadPool(2);
try {
exs.submit(sumValues);
exs.submit(removeValues);
ConcUtil.snooze(5, TimeUnit.SECONDS); // (9)
} finally {
System.out.println(“Shutting down now.”);
exs.shutdownNow(); // (10)
}
}
}
Probable output from the program:
[20, 100, 236, 263, 299, 359, 548, 552, 591, 686]
pool-1-thread-1: sum 3654
pool-1-thread-2: removed 20
pool-1-thread-1: sum 3634
pool-1-thread-2: removed 100
pool-1-thread-1: sum 3534
pool-1-thread-2: removed 236
Shutting down now.
pool-1-thread-1: java.lang.InterruptedException: sleep interrupted
pool-1-thread-2: java.lang.InterruptedException: sleep interrupted