BlockingQueue in Java

What is a BlockingQueue?

A blocking queue is a queue that blocks the calling thread when it tries to dequeue from an empty queue or enqueue into a full queue. A thread trying to dequeue from an empty queue is blocked until another thread inserts an item. A thread trying to enqueue an item into a full queue is blocked until another thread makes space available, either by dequeuing one or more items or by clearing the queue completely. In other words, it is a queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

A BlockingQueue may be capacity bounded. At any given time it may have a remainingCapacity beyond which no additional elements can be put without blocking. A BlockingQueue without any intrinsic capacity constraints always reports a remaining capacity of Integer.MAX_VALUE.
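
To make the capacity behaviour concrete, here is a minimal sketch, assuming Java 8 or later. The class name CapacityDemo and its helper methods are made up for illustration. It fills a bounded ArrayBlockingQueue and reads remainingCapacity(), then checks what an empty LinkedBlockingQueue created without an explicit bound reports.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; the names are chosen for illustration only.
class CapacityDemo {

    // Remaining capacity of a bounded queue after trying to add `count` elements.
    static int remainingAfter(int capacity, int count) {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < count; i++) {
            // offer() never blocks; it simply returns false once the queue is full
            bounded.offer("item-" + i);
        }
        return bounded.remainingCapacity();
    }

    // Remaining capacity of an empty LinkedBlockingQueue created without a bound.
    static int unboundedRemaining() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(remainingAfter(3, 1));                        // prints 2
        System.out.println(remainingAfter(3, 3));                        // prints 0
        System.out.println(unboundedRemaining() == Integer.MAX_VALUE);   // prints true
    }
}
```

Once remainingCapacity() reaches 0, a call to put() would block until a consumer removes an element, which is why the sketch uses the non-blocking offer() instead.
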
A BlockingQueue does not intrinsically support any kind of “close” or “shutdown” operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.
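
The poison object tactic can be sketched roughly as follows, assuming Java 8 or later and a single consumer. The class PoisonPillDemo, the POISON constant and the run method are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; one producer (the calling thread) and one consumer.
class PoisonPillDemo {

    // Sentinel compared by identity, so a real message with the same text is never mistaken for it
    private static final String POISON = new String("POISON");

    static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String msg = queue.take();
                    if (msg == POISON) {
                        break; // end-of-stream marker: stop consuming
                    }
                    consumed.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String m : messages) {
                queue.put(m);
            }
            queue.put(POISON);   // tell the consumer that nothing more will arrive
            consumer.join();     // after join() the consumed list is safe to read here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(java.util.Arrays.asList("a", "b", "c"))); // prints [a, b, c]
    }
}
```

With several consumers, one poison object per consumer would have to be inserted, because each take() removes the sentinel from the queue.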

Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a BlockingQueue happen-before actions subsequent to the access or removal of that element from the BlockingQueue in another thread.
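
As a small illustration of this guarantee, the sketch below (assuming Java 8 or later) hands an object with a plain, non-volatile field from one thread to another through a queue. SafePublicationDemo and Box are hypothetical names. A single run cannot prove a memory-model guarantee; the code only shows the pattern that the guarantee makes safe.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class showing safe hand-off of a mutable object.
class SafePublicationDemo {

    // Plain holder: no volatile field, no synchronization of its own
    static class Box {
        int value;
    }

    static int transfer(int payload) {
        BlockingQueue<Box> queue = new ArrayBlockingQueue<>(1);

        Thread producer = new Thread(() -> {
            Box box = new Box();
            box.value = payload;       // write happens before the put() below
            try {
                queue.put(box);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // take() happens after put(), so the write to box.value is visible here
            return queue.take().value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1; // illustrative fallback only
        }
    }

    public static void main(String[] args) {
        System.out.println(transfer(42)); // prints 42
    }
}
```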

Does a BlockingQueue accept null elements?

A BlockingQueue does not accept null elements. Implementations throw NullPointerException on attempts to add, put or offer a null. A null is used as a sentinel value to indicate failure of poll operations.
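
A quick way to see both halves of this rule is the following sketch. NullHandlingDemo and its methods are illustrative names, not part of the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for null handling.
class NullHandlingDemo {

    // Returns true if the given queue refuses a null element with a NullPointerException.
    static boolean rejectsNull(BlockingQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // Checks two common implementations.
    static boolean commonImplementationsRejectNull() {
        return rejectsNull(new ArrayBlockingQueue<String>(1))
                && rejectsNull(new LinkedBlockingQueue<String>());
    }

    // poll() on an empty queue returns null to signal "nothing available".
    static String pollEmpty() {
        return new LinkedBlockingQueue<String>().poll();
    }

    public static void main(String[] args) {
        System.out.println(commonImplementationsRejectNull()); // prints true
        System.out.println(pollEmpty());                       // prints null
    }
}
```

Because null means "no element" in the return value of poll(), allowing null as an element would make the two cases indistinguishable.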

Are BlockingQueue implementations thread-safe?

BlockingQueue implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control. However, the bulk Collection operations addAll, containsAll, retainAll and removeAll are not necessarily performed atomically unless specified otherwise in an implementation. So it is possible, for example, for addAll(c) to fail (throwing an exception) after adding only some of the elements in c.
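
The partial addAll failure is easy to reproduce with a small bounded queue. The sketch below uses an ArrayBlockingQueue of capacity 2; AddAllDemo is a hypothetical class name.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class showing that addAll is not atomic.
class AddAllDemo {

    // Tries to add three elements to a queue with room for two and reports the resulting size.
    static int sizeAfterFailedAddAll() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.addAll(Arrays.asList("a", "b", "c"));
        } catch (IllegalStateException queueFull) {
            // "c" did not fit, but "a" and "b" were already added and stay in the queue
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterFailedAddAll()); // prints 2
    }
}
```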

Why use a BlockingQueue?

BlockingQueue implementations are designed to be used primarily for producer-consumer queues, but additionally support the Collection interface. So, for example, it is possible to remove an arbitrary element from a queue using remove(x). However, such operations are in general not performed very efficiently, and are intended for only occasional use, such as when a queued message is cancelled.
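
For instance, cancelling a queued message with remove(x) could look like the sketch below. RemoveDemo, the cancel method and the message IDs are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class for removing an arbitrary element.
class RemoveDemo {

    // Removes the message with the given id and returns what is left, in queue order.
    static List<String> cancel(String id) {
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(Arrays.asList("msg-1", "msg-2", "msg-3"));
        // remove(Object) comes from the Collection interface; it has to search the queue
        queue.remove(id);
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) {
        System.out.println(cancel("msg-2")); // prints [msg-1, msg-3]
    }
}
```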

The main advantage is that a BlockingQueue provides a correct, thread-safe implementation. The “blocking” nature of the queue has a couple of advantages. First, on adding elements, if the queue capacity is limited, memory consumption is limited as well. Also, if the queue consumers get too far behind producers, the producers are naturally throttled since they have to wait to add elements. When taking elements from the queue, the main advantage is simplicity; waiting forever is trivial, and correctly waiting for a specified time-out is only a little more complicated.
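
Waiting with a time-out uses the timed variants offer(e, timeout, unit) and poll(timeout, unit). The following is a minimal sketch with hypothetical names (TimedOperationsDemo and its methods), assuming a queue of capacity 1.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for the timed queue operations.
class TimedOperationsDemo {

    // Tries to add to an already full queue; returns false when the time-out elapses.
    static boolean offerToFullQueue(long timeoutMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put("first"); // fills the only slot
            return queue.offer("second", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Tries to take from an empty queue; returns null when the time-out elapses.
    static String pollFromEmptyQueue(long timeoutMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerToFullQueue(100));   // prints false
        System.out.println(pollFromEmptyQueue(100)); // prints null
    }
}
```

The untimed put() and take() cover the "wait forever" case, while these timed variants let the caller give up and do something else.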

More information can be found at https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

The following usage example is based on a typical producer-consumer scenario. Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.

Create the Producer class below, which produces an item and puts it into the queue every 1000 milliseconds.

package com.roytuts.concurrency;

import java.util.UUID;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private final BlockingQueue<Item> queue;

    public Producer(BlockingQueue<Item> queue) {
        this.queue = queue;
    }

    @Override
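    public void run() {
        try {
            // Keep producing until the thread is interrupted
            while (!Thread.currentThread().isInterrupted()) {
                String uuid = UUID.randomUUID().toString();
                System.out.println("Produced Item Id : " + uuid);
                Item item = new Item(uuid);
                // put() blocks while the queue is full
                queue.put(item);
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the thread finish
            Thread.currentThread().interrupt();
        }
    }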

}

Create the Consumer class below, which consumes items from the queue.

package com.roytuts.concurrency;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private final BlockingQueue<Item> queue;

    public Consumer(BlockingQueue<Item> queue) {
        this.queue = queue;
    }

    @Override
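    public void run() {
        try {
            // Keep consuming until the thread is interrupted
            while (!Thread.currentThread().isInterrupted()) {
                // take() blocks while the queue is empty
                Item item = queue.take();
                System.out.println("Consumed Item Id : " + item.getId());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and let the thread finish
            Thread.currentThread().interrupt();
        }
    }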

}

The Item class is given below

package com.roytuts.concurrency;

public class Item {

    private String id;

    public Item(String id) {
        this.id = id;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

}

Create the class below, which starts one producer and two consumers on a shared LinkedBlockingQueue to test them.

package com.roytuts.concurrency;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LinkedBlockingQueueTest {

    public static void main(String[] args) {
        BlockingQueue<Item> queue = new LinkedBlockingQueue<Item>();
        Producer producer = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer1).start();
        new Thread(consumer2).start();
    }

}

Sample output in the console (the IDs are random and will differ on each run)

Produced Item Id : c35c4c8f-f533-4b30-b216-acfc4be7c4a1
Consumed Item Id : c35c4c8f-f533-4b30-b216-acfc4be7c4a1
Produced Item Id : 7d62f77f-fac0-493c-8a6e-06319b13933f
Consumed Item Id : 7d62f77f-fac0-493c-8a6e-06319b13933f
Produced Item Id : 9727fef4-aee6-4d10-9cfd-0826545f0c32
Consumed Item Id : 9727fef4-aee6-4d10-9cfd-0826545f0c32
Produced Item Id : 119edd81-abbb-40b5-a148-4a13d22c6acc
Consumed Item Id : 119edd81-abbb-40b5-a148-4a13d22c6acc
Produced Item Id : e725a909-080f-453e-81db-b1ad7d618e5c
Consumed Item Id : e725a909-080f-453e-81db-b1ad7d618e5c
Produced Item Id : edf027f1-af71-4a52-8e58-7e197371369b
Consumed Item Id : edf027f1-af71-4a52-8e58-7e197371369b
Produced Item Id : 98c02377-05ca-4fa1-934e-fe8b2ff225fa
Consumed Item Id : 98c02377-05ca-4fa1-934e-fe8b2ff225fa

...more

Thanks for reading.

Soumitra Roy Sarkar
