This tutorial will show you an example on Spring Boot Functional Reactive Programming (FRP).

What are Reactive and Functional Reactive Programming?

Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.

In other words, Reactive Programming is a style of micro-architecture involving intelligent routing and consumption of events, all combining to change behaviour.

Simply to say, it is about non-blocking applications that are asynchronous and event-driven and require a small number of threads to scale vertically (i.e. within the JVM) rather than horizontally (i.e. through clustering).

Functional Reactive Programming (FRP) is a programming paradigm for reactive programming (asynchronous dataflow programming) using the building blocks of functional programming.

Use Cases Of Reactive Programming?

1. External service calls backend services and most of the backend services are RESTful APIs and they operate on http protocol. So the underlying protocol is blocking and synchronous.

Let’s say implementations of such services often involve calling of other services and then more services depending on the result of the subsequent service calls.

In such situations with so much IO going on, you need to wait for one call to complete before you send next request and your client may get “disconnected/timed out” for some reasons before you managed to assemble the reply for your client.

FRP offers the promise of “composability” of the logic to optimize the complex orchestrations of dependencies between service calls. So it becomes easier to write for the developer of the calling service.

2. Highly concurrent message consumers in message processing system are common enterprise use cases.

When you have to process millions of messages per second and performance matters, then you should pay attention to use Reactive patterns. Reactive patterns fit naturally with faster message processing since an event gets easily translated into a message.

3. As long as developers accept extra layer of abstraction, they can easily forget about the code that they are calling is synchronous or asynchronous. Since it costs precious brain cells to deal with asynchronous programming. Reactive Programming is not the only the solution but FRP tools are also useful to this issue.

Reactive Programming in Java

Java itself does not support Reactive Programming until version 9. However, Java being the powerhouse of enterprise application development, there are a lot of Reactive libraries that provide the Reactive programming on top of JDK. Such libraries are Reactive Streams, RxJava, Reactor, Spring Framework 5.0, RatPack  or may be more.

What is driving to the rise of Reactive programming?

Well, it’s not all just about the technology. The driver is efficient resource utilization, or in other words, spending less money on servers and data centres.

The promise of Reactive is that you can do more with less, specifically you can process higher loads with fewer threads. This is where the intersection of Reactive and non-blocking, asynchronous I/O comes to the foreground.

For the right problem, the effects are elegant. For the wrong problem, the effects might go into reverse (you actually make things worse).

Reactive Programming using Spring Framework

The Spring framework uses Reactor internally for its own reactive support. Reactor is a Reactive Streams implementation that further extends the basic Reactive Streams Publisher contract with the Flux and Mono composable API types to provide declarative operations on data sequences of 0..N and 0..1.

Spring Framework 5 includes a new spring-webflux module. The module contains support for reactive HTTP and WebSocket clients as well as for reactive server web applications including REST, HTML browser, and WebSocket style interactions.

On the server-side WebFlux supports 2 distinct programming models:

  • Annotation-based with @Controller and the other annotations supported also with Spring MVC
  • Functional, Java 8 lambda style routing and handling

Both programming models are executed on the same reactive foundation that adapts non-blocking HTTP runtimes to the Reactive Streams API. The diagram below shows the server-side stack including traditional, Servlet-based Spring MVC on the left from the spring-webmvc module and also the reactive stack on the right from the spring-webflux module.

spring boot reactive

We will see here Functional, Java 8 lambda style routing and handling Reactive Programming example. If you want to implement annotation based programming model then you can check here at WebFlux Framework.

We will also use here Spring Boot Framework to create the example. The example will perform CRUD(Create, Read, Update and Delete) operations using REST APIs.

Enough talking about Reactive Programming… Now let’s move on to the Spring Boot Functional Reactive programming example.

Prerequisites

Knowledge of Spring Boot

Knowledge Java 8

Have JDK installed and Configured

Have Gradle installed and Configured

Softwares

Eclipse Photon for JDK 10 or Eclipse Oxygen 2 for JDK 8

Gradle 4.4.1, Spring Boot starter webflux dependency

Setting up Spring Boot project

Create a gradle based Spring Boot project in Eclipse. Here in this example I am using JDK 10 and Eclipse Photon. You may also use JDK 8 in Eclipse Oxygen 2.

Make sure to update the your build script as shown below:

buildscript {
    ext {
        springBootVersion = '2.0.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

bootJar {
    baseName = 'SpringBootReactive'
    version =  '0.1.0'
}

sourceCompatibility = 10
targetCompatibility = 10

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-webflux')
}

In the above build script I have used JDK version 10 and Spring Boot 2.0.3.

Once you update the build script, try to build the blank project. If you face any build issue related to main class then create a class with main method you should be fine to build the blank project.

Creating a model class

Create below model class that simply has few attributes.

public class WebSite {

	private Integer id;
	private String url;
	private String title;
        
        //getters and setters

}

Creating a repository class

Create below repository class that is a source of data on which you want to do apply some business or processing logic.

For this example, we will use this class as a source of data. Ideally your source of data should be any database or any external service.

Here I have initially provided my two websites in the repository that will help you to test the reactive application.

@Component
public class WebSiteRepository {

	private static final Map<Integer, WebSite> WEBSITES = new HashMap<>();
	static {
		WebSite webSite1 = new WebSite();
		webSite1.setId(1);
		webSite1.setTitle("Roy Tutorials");
		webSite1.setUrl("https://www.roytuts.com");
		WEBSITES.put(1, webSite1);
		WebSite webSite2 = new WebSite();
		webSite2.setId(2);
		webSite2.setTitle("JEE Tutorials");
		webSite2.setUrl("https://www.jeejava.com");
		WEBSITES.put(2, webSite2);
	}
	private static int ID_COUNTER = 3;

	public Flux<WebSite> findAll() {
		return Flux.fromIterable(WEBSITES.values());
	}

	public Mono<WebSite> findById(Integer id) {
		return Mono.just(WEBSITES.get(id));
	}

	public Mono<Void> delete(Integer id) {
		WEBSITES.remove(id);
		return Mono.empty();
	}

	public Mono<Void> add(WebSite webSite) {
		Integer id = ID_COUNTER++;
		webSite.setId(id);
		WEBSITES.put(id, webSite);
		return Mono.empty();
	}

	public Mono<Void> update(WebSite website) {
		WEBSITES.put(website.getId(), website);
		return Mono.empty();
	}

}

In the above example you can find details here on Flux and Mono.

Now we will see two important functions of Reactive Streams – Handler Function and Router Function.

Creating Handler Functions

Writing handler functions as lambdas’ is convenient, but perhaps lacks in readability and becomes less maintainable when dealing with multiple functions. Therefore, it is recommended to group related handler functions into a handler or controller class.

For example, here is a class that exposes a reactive WebSite repository:

@Component
public class WebSiteHandler {

	@Autowired
	private WebSiteRepository webSiteRepository;

	public Mono<ServerResponse> getAllWebSites(ServerRequest serverRequest) {
		Flux<WebSite> webSites = webSiteRepository.findAll();
		return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(webSites, WebSite.class);
	}

	public Mono<ServerResponse> addWebSite(ServerRequest serverRequest) {
		Mono<WebSite> webSite = serverRequest.bodyToMono(WebSite.class);
		webSite.subscribe(wb -> webSiteRepository.add(wb));
		return ServerResponse.ok().build(Mono.empty());
	}

	public Mono<ServerResponse> updateWebSite(ServerRequest serverRequest) {
		Mono<WebSite> webSite = serverRequest.bodyToMono(WebSite.class);
		webSite.subscribe(wb -> webSiteRepository.update(wb));
		return ServerResponse.ok().build(Mono.empty());
	}

	public Mono<ServerResponse> deleteWebSite(ServerRequest serverRequest) {
		Integer webSiteId = Integer.valueOf(serverRequest.pathVariable("id"));
		webSiteRepository.delete(webSiteId);
		return ServerResponse.ok().build(Mono.empty());
	}

	public Mono<ServerResponse> getWebSite(ServerRequest serverRequest) {
		Integer webSiteId = Integer.valueOf(serverRequest.pathVariable("id"));
		Mono<ServerResponse> notFound = ServerResponse.notFound().build();
		Mono<WebSite> webSiteMono = webSiteRepository.findById(webSiteId);
		return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(webSiteMono, WebSite.class)
				.switchIfEmpty(notFound);
	}

}

In the above class, incoming HTTP requests are handled by a HandlerFunction, which is essentially a function that takes a ServerRequest and returns a Mono<ServerResponse>.

ServerRequest and ServerResponse are immutable interfaces and both are fully reactive by building on top of Reactor: the request expose the body as Flux or Mono; the response accepts any Reactive Streams Publisher as body.

ServerRequest gives access to various HTTP request elements: the method, URI, query parameters, and — through the separate ServerRequest.Headers interface — the headers.

Similarly, ServerResponse provides access to the HTTP response. Since it is immutable, you create a ServerResponse with a builder. The builder allows you to set the response status, add response headers, and provide a body.

getAllWebSites is a handler function that returns all WebSite objects found in the repository as JSON.

getWebSite is a handler function that returns single WebSite object for a given id found in the repository as JSON.

addWebSite and updateWebSite are handler functions that store a new WebSite and update existing WebSite contained in the request body, respectively.

Note that WebSiteRepository.add(WebSite) returns Mono<Void>: an empty Mono that emits a completion signal when the website has been read from the request and stored. So we use the build(Publisher<Void>) method to send a response when that completion signal is received, i.e. when the WebSite has been added.

getWebSite is a handler function that returns a single website, identified via the path variable id. We retrieve that WebSite via the repository, and create a JSON response if it is found. If it is not found, we use switchIfEmpty(Mono<T>) to return a 404 Not Found response.

Creating Router Functions

Incoming requests are routed to handler functions with a RouterFunction, which is a function that takes a ServerRequest, and returns a Mono<HandlerFunction>. If a request matches a particular route, a handler function is returned; otherwise it returns an empty Mono.

Given the WebSiteHandler we showed above, we can now define a router function that routes to the respective handler functions.

We use method-references to refer to the handler functions:

@Configuration
public class WebSiteRouter {

	@Bean
	public RouterFunction<ServerResponse> route(WebSiteHandler webSiteHandler) {
		RouterFunction<ServerResponse> webSiteRoute = RouterFunctions
				.route(RequestPredicates.GET("/website").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
						webSiteHandler::getAllWebSites)
				.andRoute(RequestPredicates.GET("/website/{id}")
						.and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), webSiteHandler::getWebSite)
				.andRoute(
						RequestPredicates.POST("/website").and(RequestPredicates.accept(MediaType.APPLICATION_JSON))
								.and(RequestPredicates.contentType(MediaType.APPLICATION_JSON)),
						webSiteHandler::addWebSite)
				.andRoute(
						RequestPredicates.PUT("/website").and(RequestPredicates.accept(MediaType.APPLICATION_JSON))
								.and(RequestPredicates.contentType(MediaType.APPLICATION_JSON)),
						webSiteHandler::updateWebSite)
				.andRoute(
						RequestPredicates.DELETE("/website/{id}")
								.and(RequestPredicates.contentType(MediaType.APPLICATION_JSON)),
						webSiteHandler::deleteWebSite);
		return webSiteRoute;
	}

}

Besides router functions, you can also compose request predicates, by calling RequestPredicate.and(RequestPredicate) or RequestPredicate.or(RequestPredicate).

These work as expected: for and the resulting predicate matches if both given predicates match; or matches if either predicate does. Most of the predicates found in RequestPredicates are compositions.

Notice how we have used lambda expression to functional interfaces in the above Router Functions, that’s why the title name is given as Spring Boot Functional Reactive Programming example.

Creating main class

We need to deploy the application into a server. So creating below main class will simply treated as a standalone application. Along the way, you use Reactive Spring’s support for embedding the Netty server as the HTTP runtime, instead of deploying to an external instance.

@SpringBootApplication(scanBasePackages = "com.jeejava")
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

}

Reactive programming is just about finding ways to stream data and events through a pipeline, rather than dealing discreetly with each event, change, or response. You can develop your application not in a set of interconnected callback functions and event listeners, but a logical flow from events to actions, which themselves flow into other things.

Creating Cors filter to allow access to server from your client(s)

This is required in order to gain access to server REST services. You are easily allowed to get access to http GET method without any issue but for other methods you need below filter class otherwise you will be stuck in preflight request.

@Component
public class CorsFilter implements WebFilter {

	@Override
	public Mono<Void> filter(final ServerWebExchange serverWebExchange, final WebFilterChain webFilterChain) {
		// Adapted from
		// https://sandstorm.de/de/blog/post/cors-headers-for-spring-boot-kotlin-webflux-reactor-project.html
		serverWebExchange.getResponse().getHeaders().add("Access-Control-Allow-Origin", "*");
		serverWebExchange.getResponse().getHeaders().add("Access-Control-Allow-Methods",
				"GET, PUT, POST, DELETE, OPTIONS");
		serverWebExchange.getResponse().getHeaders().add("Access-Control-Allow-Headers",
				"DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Content-Range,Range");
		if (serverWebExchange.getRequest().getMethod() == HttpMethod.OPTIONS) {
			serverWebExchange.getResponse().getHeaders().add("Access-Control-Max-Age", "1728000");
			serverWebExchange.getResponse().setStatusCode(HttpStatus.NO_CONTENT);
			return Mono.empty();
		} else {
			serverWebExchange.getResponse().getHeaders().add("Access-Control-Expose-Headers",
					"DNT,X-CustomHeader,Keep-Alive,User-Agent,X-Requested-With,If-Modified-Since,Cache-Control,Content-Type,Content-Range,Range");
			return webFilterChain.filter(serverWebExchange);
		}
	}

}

Testing the application

Now you can test the application using any client as per your choice. You can use curl command to access the services. You could create Java we client to access the services or even you can create your own client using ReactJS, AngularJS or any other technologies to consume the services.

Congratulations! You have developed Spring Boot Functional Reactive Programming example.

You may also like to read Spring Boot + Angular Functional Reactive Programming

Thanks for reading.

Tags:

I am a professional Web developer, Enterprise Application developer, Software Engineer and Blogger. Connect me on Roy Tutorials | TwitterFacebook Google PlusLinkedin | Reddit

Leave a Reply

Your email address will not be published. Required fields are marked *