The ConcurrentLinkedQueue Class – Concurrency: Part II

The ConcurrentLinkedQueue<E> Class

The ConcurrentLinkedQueue class implements the Queue interface, as shown in Figure 23.6. No new methods are defined, and existing methods of the Queue interface (§15.6, p. 814) are implemented as shown in Table 23.7. Note that since it is an unbounded queue, the insert operations will always succeed.

The ConcurrentLinkedDeque<E> Class

The ConcurrentLinkedDeque class implements the Deque interface, as shown in Figure 23.6. No new methods are defined, and existing methods of the Deque interface (§15.7, p. 821) are implemented as shown in Table 23.8. Note that since it is an unbounded deque, the insert operations will always succeed. Methods inherited from the Queue interface are marked with an asterisk (*) in Table 23.8.

Table 23.7 Selected Methods in the ConcurrentLinkedQueue Class

Operation                   | Throws exception                              | Returns special value
Insert at the tail          | add(e) will never throw IllegalStateException | offer(e) will never return false
Remove from the head        | remove() can throw NoSuchElementException     | poll() returns null if empty
Examine element at the head | element() can throw NoSuchElementException    | peek() returns null if empty
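
The behavior summarized in Table 23.7 can be tried out with a short sketch. The class name QueueMethodsSketch and the helper method demo() are hypothetical names chosen for illustration; only the Queue methods themselves come from the table.

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class QueueMethodsSketch {
  // Hypothetical helper: exercises each method in Table 23.7 and logs the result.
  static String demo() {
    Queue<String> queue = new ConcurrentLinkedQueue<>();
    StringBuilder log = new StringBuilder();
    // On an empty queue, poll() and peek() return the special value null.
    log.append(queue.poll()).append(' ');
    log.append(queue.peek()).append(' ');
    // On an empty queue, remove() and element() throw NoSuchElementException.
    try { queue.remove(); } catch (NoSuchElementException e) { log.append("NSEE "); }
    try { queue.element(); } catch (NoSuchElementException e) { log.append("NSEE "); }
    // The queue is unbounded, so both insert operations succeed.
    log.append(queue.offer("a")).append(' ');
    log.append(queue.add("b")).append(' ');
    // Elements are examined and removed at the head in FIFO order.
    log.append(queue.peek()).append(' ');
    log.append(queue.remove()).append(' ');
    log.append(queue.poll());
    return log.toString();
  }

  public static void main(String[] args) {
    System.out.println(demo());   // null null NSEE NSEE true true a a b
  }
}
```

Since null is the special value that signals an empty queue, a ConcurrentLinkedQueue does not accept null elements; an attempt to insert one throws a NullPointerException.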

Table 23.8 Selected Methods in the ConcurrentLinkedDeque Class

Insert at the head       | Insert at the tail      | Runtime behavior on failure
offerFirst(e)            | offerLast(e), offer(e)* | Never returns false
addFirst(e)              | addLast(e), add(e)*     | Never throws IllegalStateException

Remove from the head     | Remove from the tail    | Runtime behavior on failure
pollFirst(), poll()*     | pollLast()              | Returns null if empty
removeFirst(), remove()* | removeLast()            | Throws NoSuchElementException

Examine at the head      | Examine at the tail     | Runtime behavior on failure
peekFirst(), peek()*     | peekLast()              | Returns null if empty
getFirst(), element()*   | getLast()               | Throws NoSuchElementException
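
A minimal sketch of the operations in Table 23.8, working at both ends of a ConcurrentLinkedDeque, is given below. The class name DequeMethodsSketch and the helper method demo() are hypothetical names used only for this illustration.

```java
import java.util.Deque;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedDeque;

class DequeMethodsSketch {
  // Hypothetical helper: inserts, examines, and removes at both ends of a deque.
  static String demo() {
    Deque<Integer> deque = new ConcurrentLinkedDeque<>();
    deque.offerLast(2);    // [2]
    deque.addLast(3);      // [2, 3]
    deque.offerFirst(1);   // [1, 2, 3]
    deque.add(4);          // [1, 2, 3, 4]  Queue method, inserts at the tail.
    StringBuilder log = new StringBuilder(deque.toString());
    // Examine both ends without removing.
    log.append(' ').append(deque.peekFirst()).append(' ').append(deque.peekLast());
    // Remove from both ends; the deque is then [2, 3].
    log.append(' ').append(deque.pollFirst()).append(' ').append(deque.pollLast());
    // remove() is a Queue method and removes from the head; the deque is then empty.
    log.append(' ').append(deque.remove()).append(' ').append(deque.removeLast());
    // On an empty deque, pollLast() returns null, but getFirst() throws.
    log.append(' ').append(deque.pollLast());
    try { deque.getFirst(); } catch (NoSuchElementException e) { log.append(" NSEE"); }
    return log.toString();
  }

  public static void main(String[] args) {
    System.out.println(demo());   // [1, 2, 3, 4] 1 4 1 4 2 3 null NSEE
  }
}
```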

Example 23.18 illustrates the iterator of a ConcurrentSkipListSet (declared at (1)) that is weakly consistent during traversal. The idea is to have two tasks operating on the concurrent sorted set: one repeatedly creating an iterator to traverse the elements of the collection and summing them (2), and another continuously removing elements from the set (5).

The utility class ConcUtil declares the auxiliary method snooze() that is used to put a thread to sleep. If interrupted, the exception is caught and the interrupt reinstated so that the caller of the snooze() method can take the appropriate action. This method is also used in other examples in this section.
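
Why the interrupt is reinstated can be seen in a single-threaded sketch. It assumes a local copy of the snooze() logic without the printing; the class name InterruptSketch and the method interruptSurvivesSnooze() are hypothetical. The sleep() method clears the interrupt status when it throws an InterruptedException, so without the call to interrupt() in the catch block the caller would have no way of detecting the interrupt afterward.

```java
import java.util.concurrent.TimeUnit;

class InterruptSketch {
  // Local copy of the snooze() logic described above, without the printing.
  static void snooze(long timeout, TimeUnit unit) {
    try {
      unit.sleep(timeout);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();   // Reinstate interrupt status.
    }
  }

  // Hypothetical helper: returns true if the caller can still detect the interrupt.
  static boolean interruptSurvivesSnooze() {
    Thread.currentThread().interrupt();     // An interrupt is pending before the sleep.
    snooze(10, TimeUnit.SECONDS);           // sleep() throws at once and clears the status.
    return Thread.interrupted();            // Reads the reinstated status and clears it.
  }

  public static void main(String[] args) {
    System.out.println(interruptSurvivesSnooze());   // true
  }
}
```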

The Runnable sumValues at (2) repeatedly creates an iterator at (3) to sum the values in the set at the time the iterator is created, and prints the result. It snoozes a little after each traversal of the set. The thread determines at (4) whether it has been interrupted. If interrupted, the infinite loop terminates, thereby terminating the thread.

The Runnable removeValues at (5) polls the set and prints the value, snoozing after each polling operation. It also detects whether it has been interrupted, terminating the infinite loop at (6) if that is the case.

The main() method instantiates a ConcurrentSkipListSet at (7) and populates it with 10 random numbers from 0 (inclusive) to 1000 (exclusive); any duplicates are dropped by the set. The two tasks are submitted to an executor service at (8). The main thread snoozes at (9) to allow the tasks to run. The call to the shutdownNow() method at (10) interrupts any running threads, which then terminate as described above.

The output from the program shows that only elements that are in the ConcurrentSkipListSet instance at the time the iterator is created contribute to the sum, and the sum drops correctly as elements are removed.

Example 23.18 Concurrent Collections

package concurrent;
import java.util.concurrent.TimeUnit;
public class ConcUtil {
  public static void snooze(int timeout, TimeUnit unit) {
    String threadName = Thread.currentThread().getName();
    try {
      unit.sleep(timeout);
    } catch (InterruptedException ex) {
      System.out.println(threadName + ": " + ex);
      Thread.currentThread().interrupt();         // Reinstate interrupt status.
    }
  }
}

package concurrent;
import java.util.*;
import java.util.concurrent.*;
public class ConcurrentSkipListSetDemo {
  private static ConcurrentSkipListSet<Integer> set;                      // (1)
  private static Runnable sumValues = () -> {                             // (2)
    String threadName = Thread.currentThread().getName();
    while (true) {
      int sum = 0;
      for (Integer v : set) {                                             // (3)
        sum += v;
      }
      System.out.printf(threadName + ": sum%9d%n", sum);
      ConcUtil.snooze(2, TimeUnit.SECONDS);
      if (Thread.interrupted()) break;                                    // (4)
    }
  };
  private static Runnable removeValues = () -> {                          // (5)
    String threadName = Thread.currentThread().getName();
    while (true) {
      Integer value = set.pollFirst();
      if (value != null) {               // Nothing to print if the set is empty.
        System.out.printf(threadName + ": removed%5d%n", value);
      }
      ConcUtil.snooze(2, TimeUnit.SECONDS);
      if (Thread.interrupted()) break;                                    // (6)
    }
  };
  public static void main(String[] args) {
    // Create and populate the set:                                          (7)
    set = new ConcurrentSkipListSet<>();
    new Random().ints(10, 0, 1000).forEach(val -> set.add(val));
    System.out.println(set);
    // Create an executor service to execute two tasks:                      (8)
    ExecutorService exs = Executors.newFixedThreadPool(2);
    try {
      exs.submit(sumValues);
      exs.submit(removeValues);
      ConcUtil.snooze(5, TimeUnit.SECONDS);                               // (9)
    } finally {
      System.out.println("Shutting down now.");
      exs.shutdownNow();                                                  // (10)
    }
  }
}

Probable output from the program:

[20, 100, 236, 263, 299, 359, 548, 552, 591, 686]
pool-1-thread-1: sum     3654
pool-1-thread-2: removed   20
pool-1-thread-1: sum     3634
pool-1-thread-2: removed  100
pool-1-thread-1: sum     3534
pool-1-thread-2: removed  236
Shutting down now.
pool-1-thread-1: java.lang.InterruptedException: sleep interrupted
pool-1-thread-2: java.lang.InterruptedException: sleep interrupted
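
The difference between a weakly consistent iterator and a fail-fast iterator can also be observed without threads. The following is a minimal sketch, not part of Example 23.18: the class name WeaklyConsistentSketch and the method traverseWhileRemoving() are hypothetical. The method removes the first element of the set after each step of a traversal and reports whether the traversal ran to completion.

```java
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;

class WeaklyConsistentSketch {
  // Hypothetical helper: modifies the set while an iterator is traversing it.
  // Returns true if the traversal completes, false if the iterator fails fast.
  static boolean traverseWhileRemoving(NavigableSet<Integer> set) {
    try {
      for (Integer value : set) {
        set.pollFirst();                 // Structural modification during traversal.
      }
      return true;
    } catch (ConcurrentModificationException ex) {
      return false;
    }
  }

  public static void main(String[] args) {
    List<Integer> values = List.of(10, 20, 30, 40);
    // The weakly consistent iterator tolerates the removals.
    System.out.println(traverseWhileRemoving(new ConcurrentSkipListSet<>(values)));  // true
    // The fail-fast iterator of a TreeSet throws ConcurrentModificationException.
    System.out.println(traverseWhileRemoving(new TreeSet<>(values)));                // false
  }
}
```

The TreeSet iterator detects the modification on its next call to the next() method, whereas the iterator of the ConcurrentSkipListSet never throws a ConcurrentModificationException, which is what allows the two tasks in Example 23.18 to share the set safely.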
