Introduction

In this tutorial I am going to show how to create custom thread pool in Java.

Multi-threaded and multi-process programmings are great ways to optimize CPU usage and get things done quickly.

What is thread pool?

Thread pool is a collection of already created worker threads ready to perform certain tasks. Instead of creating and discarding thread once the task is done, thread pool reuses the thread in the form of worker thread. The worker thread is differently exists from Runnable or Callable tasks it executes and often used to execute multiple tasks.

Why do we need thread pool?

Creation of Threads in Java is a costly IO operation because thread objects use a significant amount of memory. In a large scale applications, allocating and deallocating many thread objects create a memory management overhead, because creation of a thread object takes a time. So as soon as client requests come, the actual jobs do not start and clients will see a slight delay. And a time comes when application receives more requests than it can handle immediately and as a result application will stop responding to the requests.

Therefore it is not advisable to create & destroy thread(s) every often. It is recommended to use pool of threads as per the needs.

Example

Here custom thread pool will be implemented using BlockingQueue that is used for storing tasks.

For more information on BlockingQueue please read https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

The responsibility of the BlockingQueue is to hold Runnables, and to have a way to poll them and check if the BlockingQueue is empty or not in order to help the threads in the pool utilize their resources better.

Creating Custom Thread Pool

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class CustomThreadPool {
	// holds tasks
	private BlockingQueue<Runnable> runnableQueue;
	// holds the pool of worker threads
	private List<WorkerThread> threads;
	// check if shutdown is initiated
	private AtomicBoolean isThreadPoolShutDownInitiated;

	public CustomThreadPool(final int noOfThreads) {
		this.runnableQueue = new LinkedBlockingQueue<>();
		this.threads = new ArrayList<>(noOfThreads);
		this.isThreadPoolShutDownInitiated = new AtomicBoolean(false);
		// create worker threads
		for (int i = 1; i <= noOfThreads; i++) {
			WorkerThread thread = new WorkerThread(runnableQueue, this);
			thread.setName("Worker Thread - " + i);
			thread.start();
			threads.add(thread);
		}
	}

	public void execute(Runnable r) throws InterruptedException {
		if (!isThreadPoolShutDownInitiated.get()) {
			runnableQueue.put(r);
		} else {
			throw new InterruptedException("Thread Pool shutdown is initiated, unable to execute task");
		}
	}

	public void shutdown() {
		isThreadPoolShutDownInitiated = new AtomicBoolean(true);
	}

	private class WorkerThread extends Thread {
		// holds tasks
		private BlockingQueue<Runnable> taskQueue;
		// check if shutdown is initiated
		private CustomThreadPool threadPool;

		public WorkerThread(BlockingQueue<Runnable> taskQueue, CustomThreadPool threadPool) {
			this.taskQueue = taskQueue;
			this.threadPool = threadPool;
		}

		@Override
		public void run() {
			try {
				// continue until all tasks finished processing
				while (!threadPool.isThreadPoolShutDownInitiated.get() || !taskQueue.isEmpty()) {
					Runnable r;
					// Poll a runnable from the queue and execute it
					while ((r = taskQueue.poll()) != null) {
						r.run();
					}
					Thread.sleep(1);
				}
			} catch (RuntimeException | InterruptedException e) {
				throw new CustomThreadPoolException(e);
			}
		}
	}

	private class CustomThreadPoolException extends RuntimeException {
		private static final long serialVersionUID = 1L;

		public CustomThreadPoolException(Throwable t) {
			super(t);
		}
	}
}

Here in the above CustomThreadPool we have inner private class that basically polls the BlockingQueue and executes the runnables.

isThreadPoolShutDownInitiated is declared as AtomicBoolean that provides a boolean value, which can be read and written atomically.

Testing Custom Thread Pool

Create below class to test the custom ThreadPool. Here custom ThreadPool is created with two threads and runnables are submitted to this ThreadPool.

public class CustomThreadPoolTest {

	public static void main(String[] args) throws InterruptedException {
		Runnable r = () -> {
			try {
				Thread.sleep(1000);
				System.out.println(Thread.currentThread().getName() + " is executing task.");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		};

		CustomThreadPool threadPool = new CustomThreadPool(2);

		threadPool.execute(r);
		threadPool.execute(r);

		threadPool.shutdown();

		// threadPool.execute(r);
	}
}

Output

Worker Thread - 2 is executing task.
Worker Thread - 1 is executing task.

Thanks for reading.

Tags:

Leave a Reply

Your email address will not be published. Required fields are marked *