A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.
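
The barrier action described above can be sketched as follows. This is a minimal illustration, not part of the example later in this post; the class name BarrierActionSketch and the method runOnce are hypothetical names chosen for the sketch. Each worker adds a partial value and then waits, and the barrier action captures the total before any worker is released.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this sketch
class BarrierActionSketch {

    // Starts `parties` workers; worker i contributes (i + 1) and then waits at the barrier.
    // Returns the total captured by the barrier action.
    static int runOnce(int parties) {
        AtomicInteger partialSum = new AtomicInteger();
        int[] total = new int[1];

        // The second constructor argument is the barrier action: it runs once per
        // barrier point, after the last party arrives and before any party is released
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> total[0] = partialSum.get());

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            int contribution = i + 1;
            workers[i] = new Thread(() -> {
                partialSum.addAndGet(contribution);
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party left the barrier abnormally; nothing more to do here
                }
            });
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return total[0];
    }

    public static void main(String[] args) {
        // With 3 parties the contributions are 1 + 2 + 3
        System.out.println("Total seen by barrier action: " + runOnce(3)); // prints 6
    }
}
```

Because the action runs only after every party has arrived, it sees all contributions without any extra locking.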

More information can be found in Oracle's CyclicBarrier documentation.

The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).
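
To see the all-or-none model in action, consider the following sketch. It assumes a barrier of three parties where the third party never arrives; the class BrokenBarrierSketch and the helper waitAndReport are hypothetical names introduced only for illustration. One thread waits with a timeout and the other waits indefinitely.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical class name, used only for this sketch
class BrokenBarrierSketch {

    // Waits at the barrier (with a timeout if timeoutMillis > 0) and reports how the wait ended
    static String waitAndReport(CyclicBarrier barrier, long timeoutMillis) {
        try {
            if (timeoutMillis > 0) {
                barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                barrier.await();
            }
            return "released";
        } catch (TimeoutException | BrokenBarrierException | InterruptedException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Returns { outcome of the patient thread, outcome of the impatient thread }
    static String[] run() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        String[] outcomes = new String[2];

        Thread patient = new Thread(() -> outcomes[0] = waitAndReport(barrier, 0));
        Thread impatient = new Thread(() -> outcomes[1] = waitAndReport(barrier, 200));
        patient.start();
        impatient.start();

        try {
            patient.join();
            impatient.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcomes;
    }

    public static void main(String[] args) {
        String[] outcomes = run();
        System.out.println("patient thread:   " + outcomes[0]); // BrokenBarrierException
        System.out.println("impatient thread: " + outcomes[1]); // TimeoutException
    }
}
```

The timed wait breaks the barrier when it expires, so the thread that was prepared to wait forever is also released, with a BrokenBarrierException.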

CyclicBarrier fits naturally into concurrent programs because it can be used to perform the final part of a task once the individual subtasks are completed. The threads that wait for each other to reach the barrier are called parties. A CyclicBarrier is initialized with the number of parties to wait for, and threads wait for each other by calling CyclicBarrier.await(), a blocking method that returns only when all parties have called await(). In other words, calling await() announces that the thread is waiting at the barrier. Although await() blocks, the wait can time out or be interrupted by another thread.
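
As a small illustration of parties and await(), the sketch below collects the value returned by await() in each thread. According to the Javadoc, this value is the arrival index of the calling thread: getParties() - 1 for the first thread to arrive and zero for the last. The class ArrivalIndexSketch and the method arrivalIndexes are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical class name, used only for this sketch
class ArrivalIndexSketch {

    // Starts one thread per party and returns the sorted arrival indexes reported by await()
    static List<Integer> arrivalIndexes(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        List<Integer> indexes = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < parties; i++) {
            Thread thread = new Thread(() -> {
                try {
                    // Blocks until all parties have called await()
                    indexes.add(barrier.await());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party left the barrier abnormally
                }
            });
            threads.add(thread);
            thread.start();
        }

        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        List<Integer> sorted = new ArrayList<>(indexes);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(arrivalIndexes(3)); // prints [0, 1, 2]
    }
}
```

Which thread receives which index depends on scheduling, but every index from 0 to parties - 1 is handed out exactly once per barrier point.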

The difference between CountDownLatch and CyclicBarrier: we have seen how CountDownLatch can be used to make threads wait until a set of operations completes. CyclicBarrier serves a similar purpose, but a CountDownLatch cannot be reused once its count reaches zero, whereas a CyclicBarrier resets automatically after all parties have been released and can be used again for the next round. It can also be reset explicitly by calling reset(), which returns the barrier to its initial state; any threads waiting at that moment receive a BrokenBarrierException. Therefore CountDownLatch is a good fit for one-time events such as application start-up, while CyclicBarrier suits recurring events, e.g. repeatedly combining partial results while concurrently computing the solution to a large problem.
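
The reuse difference can be sketched as follows. This is an illustrative example under stated assumptions; the class ReuseSketch and its two methods are hypothetical names. The same barrier is crossed several times by the same threads, with the barrier action counting completed rounds, while a latch that has reached zero stays at zero.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this sketch
class ReuseSketch {

    // Each of `parties` threads waits at the same barrier `rounds` times.
    // Returns how many times the barrier was tripped.
    static int roundsCompleted(int parties, int rounds) {
        AtomicInteger completed = new AtomicInteger();
        // The barrier action runs once per round, so it counts the rounds
        CyclicBarrier barrier = new CyclicBarrier(parties, completed::incrementAndGet);

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // no reset() needed between rounds
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another party left the barrier abnormally
                }
            });
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return completed.get();
    }

    // A latch that has reached zero stays at zero; further countDown() calls have no effect
    static long latchCountAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown();
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("Barrier rounds completed: " + roundsCompleted(2, 3)); // prints 3
        System.out.println("Latch count: " + latchCountAfterExtraCountDown());    // prints 0
    }
}
```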

Example

Create a model class

package com.roytuts.java.thread.model;

public class Person {

	private String id;
	private String name;
	private String email;
	private String phone;
	private String city;
	private String state;
	private String country;

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getEmail() {
		return email;
	}

	public void setEmail(String email) {
		this.email = email;
	}

	public String getPhone() {
		return phone;
	}

	public void setPhone(String phone) {
		this.phone = phone;
	}

	public String getCity() {
		return city;
	}

	public void setCity(String city) {
		this.city = city;
	}

	public String getState() {
		return state;
	}

	public void setState(String state) {
		this.state = state;
	}

	public String getCountry() {
		return country;
	}

	public void setCountry(String country) {
		this.country = country;
	}

}

Create a class which holds a list of Person objects

package com.roytuts.java.thread.model;

import java.util.List;

public class Job {

	private List<Person> persons;

	public List<Person> getPersons() {
		return persons;
	}

	public void setPersons(List<Person> persons) {
		this.persons = persons;
	}

}

Create the following three service classes to fetch Person data from different sources.

Fetch data from file

package com.roytuts.java.thread.service.cyclicbarrier;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;

import com.roytuts.java.thread.model.Job;
import com.roytuts.java.thread.model.Person;

public class PersonFileService implements Callable<Job> {

	private CyclicBarrier barrier;

	public PersonFileService(CyclicBarrier barrier) {
		this.barrier = barrier;
	}

	@Override
	public Job call() throws Exception {
		// Dummy set of persons
		// the actual data should come from File
		Person p1 = new Person();
		p1.setId("1000");
		p1.setName("Debabrata");
		p1.setEmail("debabrata@gmail.com");
		p1.setPhone("1234567890");
		// set other fields for p1

		Person p2 = new Person();
		p2.setId("1000");
		p2.setName("Debina");
		p2.setEmail("debina@gmail.com");
		p2.setPhone("1234567890");
		// set other fields for p2

		Person p3 = new Person();
		p3.setId("1000");
		p3.setName("Baishali");
		p3.setEmail("baishali@gmail.com");
		p3.setPhone("1234567890");
		// set other fields for p3

		Person p4 = new Person();
		p4.setId("1000");
		p4.setName("Liton");
		p4.setEmail("liton@gmail.com");
		p4.setPhone("1234567890");
		// set other fields for p4

		List<Person> persons = new ArrayList<>();
		persons.add(p1);
		persons.add(p2);
		persons.add(p3);
		persons.add(p4);

		Job job = new Job();
		job.setPersons(persons);

		System.out.println(this.getClass().getName() + " is waiting on the barrier");

		// await
		barrier.await();

		System.out.println(this.getClass().getName() + " has crossed the barrier");

		return job;
	}

}

Fetch data from database

package com.roytuts.java.thread.service.cyclicbarrier;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;

import com.roytuts.java.thread.model.Job;
import com.roytuts.java.thread.model.Person;

public class PersonDBService implements Callable<Job> {

	private CyclicBarrier barrier;

	public PersonDBService(CyclicBarrier barrier) {
		this.barrier = barrier;
	}

	@Override
	public Job call() throws Exception {
		// Dummy set of persons
		// the actual data should come from Database
		Person p1 = new Person();
		p1.setId("1000");
		p1.setName("Debabrata");
		p1.setEmail("debabrata@gmail.com");
		p1.setPhone("1234567890");
		// set other fields for p1

		Person p2 = new Person();
		p2.setId("1000");
		p2.setName("Debina");
		p2.setEmail("debina@gmail.com");
		p2.setPhone("1234567890");
		// set other fields for p2

		Person p3 = new Person();
		p3.setId("1000");
		p3.setName("Baishali");
		p3.setEmail("baishali@gmail.com");
		p3.setPhone("1234567890");
		// set other fields for p3

		Person p4 = new Person();
		p4.setId("1000");
		p4.setName("Liton");
		p4.setEmail("liton@gmail.com");
		p4.setPhone("1234567890");
		// set other fields for p4

		List<Person> persons = new ArrayList<>();
		persons.add(p1);
		persons.add(p2);
		persons.add(p3);
		persons.add(p4);

		Job job = new Job();
		job.setPersons(persons);

		System.out.println(this.getClass().getName() + " is waiting on the barrier");

		// await
		barrier.await();

		System.out.println(this.getClass().getName() + " has crossed the barrier");

		return job;
	}

}

Fetch data from REST service

package com.roytuts.java.thread.service.cyclicbarrier;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;

import com.roytuts.java.thread.model.Job;
import com.roytuts.java.thread.model.Person;

public class PersonRestService implements Callable<Job> {

	private CyclicBarrier barrier;

	public PersonRestService(CyclicBarrier barrier) {
		this.barrier = barrier;
	}

	@Override
	public Job call() throws Exception {

		// Dummy set of persons
		// the actual data should come from REST webservice
		Person p1 = new Person();
		p1.setId("1000");
		p1.setName("Debabrata");
		p1.setEmail("debabrata@gmail.com");
		p1.setPhone("1234567890");
		// set other fields for p1

		Person p2 = new Person();
		p2.setId("1000");
		p2.setName("Debina");
		p2.setEmail("debina@gmail.com");
		p2.setPhone("1234567890");
		// set other fields for p2

		Person p3 = new Person();
		p3.setId("1000");
		p3.setName("Baishali");
		p3.setEmail("baishali@gmail.com");
		p3.setPhone("1234567890");
		// set other fields for p3

		Person p4 = new Person();
		p4.setId("1000");
		p4.setName("Liton");
		p4.setEmail("liton@gmail.com");
		p4.setPhone("1234567890");
		// set other fields for p4

		List<Person> persons = new ArrayList<>();
		persons.add(p1);
		persons.add(p2);
		persons.add(p3);
		persons.add(p4);

		Job job = new Job();
		job.setPersons(persons);

		System.out.println(this.getClass().getName() + " is waiting on the barrier");

		// await
		barrier.await();

		System.out.println(this.getClass().getName() + " has crossed the barrier");

		return job;
	}

}

Create the class below to start the services on separate threads to fetch data. Once all the data is available, the main thread continues with the aggregated result.

package com.roytuts.java.thread.cyclicbarrier.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.roytuts.java.thread.model.Job;
import com.roytuts.java.thread.model.Person;
import com.roytuts.java.thread.service.cyclicbarrier.PersonDBService;
import com.roytuts.java.thread.service.cyclicbarrier.PersonFileService;
import com.roytuts.java.thread.service.cyclicbarrier.PersonRestService;

public class CyclicBarrierMain {

	public static void main(String[] args) {
		int noOfThreads = 3;

		CyclicBarrier barrier = new CyclicBarrier(noOfThreads);

		ExecutorService executorService = Executors.newFixedThreadPool(noOfThreads);

		PersonFileService fileService = new PersonFileService(barrier);
		PersonDBService personDBService = new PersonDBService(barrier);
		PersonRestService personRestService = new PersonRestService(barrier);

		List<Future<Job>> futures = new ArrayList<>();
		futures.add(executorService.submit(fileService));
		futures.add(executorService.submit(personDBService));
		futures.add(executorService.submit(personRestService));
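		try {

			List<Person> aggregatedList = new ArrayList<>();
			for (Future<Job> future : futures) {
				// get() blocks until the corresponding service has crossed the barrier
				List<Person> persons = future.get().getPersons();
				aggregatedList.addAll(persons);
			}

			System.out.println("Aggregated List size : " + aggregatedList.size());

			// do something with aggregatedList
			// main tasks start here

		} catch (InterruptedException e) {
			// restore the interrupt flag so that callers can see the interruption
			Thread.currentThread().interrupt();
		} catch (ExecutionException e) {
			e.printStackTrace();
		} finally {
			// always shut down the pool, even if a task failed
			executorService.shutdown();
		}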

	}

}

Run the above class and you will get output similar to the following. The order of the lines from the three services may vary between runs.

com.roytuts.java.thread.service.cyclicbarrier.PersonRestService is waiting on the barrier
com.roytuts.java.thread.service.cyclicbarrier.PersonFileService is waiting on the barrier
com.roytuts.java.thread.service.cyclicbarrier.PersonDBService is waiting on the barrier
com.roytuts.java.thread.service.cyclicbarrier.PersonDBService has crossed the barrier
com.roytuts.java.thread.service.cyclicbarrier.PersonFileService has crossed the barrier
com.roytuts.java.thread.service.cyclicbarrier.PersonRestService has crossed the barrier
Aggregated List size : 12

Thanks for reading.
