Spring WebFlux

Reactive Programming

Reactive programming is a paradigm that promotes an asynchronous, non-blocking, event-driven approach to data processing. To see why this matters, let us first compare blocking and non-blocking request processing.

Blocking vs Non-blocking (Async) Request Processing

Blocking Request Processing

- In a traditional Spring MVC application, each incoming request is assigned a servlet thread. That thread hands work to worker threads for I/O operations such as database calls, and while they are busy it stays in a waiting state, that is, it is blocked. This is called synchronous request processing.

- Because the number of threads is limited, blocked threads hamper performance and cap the number of requests the server can process at peak load.

Non-Blocking Request Processing

- In synchronous request processing, a thread waits for its current request to finish before it can serve another one.

- In non-blocking (asynchronous) request processing, no thread sits in a waiting state. There is generally only one request thread receiving the requests.

- The flow is: incoming requests (each carrying an event handler and callback information) -> thread pool (generally a small number of threads) -> the request is passed to its event handler function -> the handler function starts processing immediately -> when the result is ready, a thread from the pool collects the response and passes it to the callback function.
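The difference between the two styles can be sketched with plain JDK classes. This is a minimal illustration, not how WebFlux is implemented: slowLookup is a hypothetical stand-in for a database call, and the sleep still occupies a pool thread, whereas a truly non-blocking server avoids parking threads on I/O altogether.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BlockingVsNonBlocking {

    // Hypothetical stand-in for a slow I/O call such as a database query.
    static String slowLookup(String id) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "tutorial-" + id;
    }

    // Blocking style: the calling thread waits until the result is available.
    static String blockingCall(String id) {
        return slowLookup(id);
    }

    // Asynchronous style: the call returns at once and the callback
    // (thenApply) runs when the result arrives.
    static CompletableFuture<String> nonBlockingCall(String id, ExecutorService ioPool) {
        return CompletableFuture.supplyAsync(() -> slowLookup(id), ioPool)
                .thenApply(String::toUpperCase);
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(blockingCall("1"));
            CompletableFuture<String> pending = nonBlockingCall("2", ioPool);
            System.out.println("caller is free while the lookup runs");
            System.out.println(pending.join()); // join() only so the demo can print the result
        } finally {
            ioPool.shutdown();
        }
    }
}
```

In the asynchronous variant the request thread could go on to accept other requests instead of waiting for the lookup.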

Reactive refers to programming models that are built around reacting to changes and that follow the publisher-subscriber pattern. According to the Reactive Manifesto [1], reactive systems are:

- Responsive: the system responds in a timely manner.
- Resilient: the system stays responsive in the face of failure.
- Elastic: the system stays responsive under varying workload.
- Message Driven: the system relies on asynchronous message passing.

Backpressure

Backpressure controls the rate of events so that a fast producer does not overwhelm its consumer. Reactive web programming is well suited to applications that involve streaming data and real-time interactions.
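Backpressure can be sketched with the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract. The class and method names below are illustrative assumptions; the point is only that the subscriber states its demand with request(n) and the publisher never sends more than was asked for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class BackpressureSketch {

    // Subscriber that asks for one item at a time, so the publisher cannot outrun it.
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: one item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ready for the next item
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1..count and returns what the subscriber received.
    static List<Integer> publish(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks while the subscriber's buffer is full
            }
        } // close() completes the stream once pending items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publish(5)); // prints [1, 2, 3, 4, 5]
    }
}
```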

Spring WebFlux is the reactive counterpart of Spring MVC and supports fully non-blocking reactive streams [2]. It supports backpressure and uses Netty as the default embedded server to run reactive applications.

Spring WebFlux uses Project Reactor [3] as its reactive library. Reactor is a Reactive Streams library, so all of its operators support non-blocking backpressure. Spring WebFlux relies heavily on two publishers:

- Mono: emits 0 or 1 element.
```java
Mono<String> mono = Mono.just("Alex");
Mono<String> emptyMono = Mono.empty();
```
- Flux: emits 0 to N elements.
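The second publisher type, Flux, works the same way for sequences of 0 to N elements. The following is a minimal sketch that assumes reactor-core is on the classpath; the class and method names are illustrative only.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PublisherBasics {

    // Flux emits 0..N elements; map transforms each one as it flows through.
    static Flux<String> upperCaseNames() {
        return Flux.just("Alex", "Ben", "Chloe").map(String::toUpperCase);
    }

    // Mono emits at most one element; defaultIfEmpty covers the empty case.
    static Mono<String> nameOrDefault(Mono<String> source) {
        return source.defaultIfEmpty("unknown");
    }

    public static void main(String[] args) {
        // Nothing is emitted until subscribe() is called.
        upperCaseNames().subscribe(System.out::println);
        nameOrDefault(Mono.empty()).subscribe(System.out::println);

        // block() is used only to obtain a plain value in this demo;
        // it should be avoided inside WebFlux request handling.
        List<String> names = upperCaseNames().collectList().block();
        System.out.println(names);
    }
}
```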

Spring Boot with WebFlux

WebFlux provides a non-blocking web stack that handles concurrency with a small number of threads and scales with fewer hardware resources. The drawback of Spring Web MVC is that the servlet model blocks threads. The term reactive refers to programming models built around reacting to change, for example network components reacting to I/O events. This non-blocking model is asynchronous, in contrast to the synchronous servlet model.

Reactor is the reactive library of choice for Spring WebFlux. It provides the Mono and Flux API types to work on data sequences of 0..1 (Mono) and 0..N (Flux) through a rich set of operators aligned with the ReactiveX vocabulary of operators.

Let us create a sample Tutorial project with basic REST APIs.

Dependencies: pom.xml

Apart from the usual dependencies, the following dependencies are required to implement Spring Boot with WebFlux.

```xml
<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-jpa</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-r2dbc</artifactId>
	</dependency>
	<dependency>
		<groupId>com.mysql</groupId>
		<artifactId>mysql-connector-j</artifactId>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>com.github.jasync-sql</groupId>
		<artifactId>jasync-r2dbc-mysql</artifactId>
		<version>2.1.16</version>
	</dependency>
</dependencies>
```

These are the dependencies for working with a relational database. Java uses JDBC as its primary technology for connecting to databases. JDBC is blocking by nature: every query holds a thread until the database responds, so it needs a thread pool. Reactive runtimes typically run on a small number of threads in order to serve many concurrent API requests efficiently. The extra threads that JDBC ties up eat into those resources and cause requests to pile up in a queue.

What is R2DBC?

A major issue with reactive development is that database access in the Java/JVM world remains synchronous. Two efforts set out to address this with asynchronous database calls: ADBA (Asynchronous Database Access API) by Oracle and R2DBC (Reactive Relational Database Connectivity).

In the dependency list above, spring-boot-starter-data-r2dbc adds Spring Data R2DBC, and jasync-r2dbc-mysql adds reactive support for MySQL. The latter is an asynchronous, Netty-based JVM database driver.

Now create a Spring starter project with the root configuration shown below.

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.r2dbc.connection.init.ConnectionFactoryInitializer;
import org.springframework.r2dbc.connection.init.ResourceDatabasePopulator;
import org.springframework.web.reactive.config.EnableWebFlux;

import io.r2dbc.spi.ConnectionFactory;

@EnableR2dbcRepositories
@SpringBootApplication
@EnableWebFlux
public class TutorialWebfluxApplication {

	public static void main(String[] args) {
		SpringApplication.run(TutorialWebfluxApplication.class, args);
	}

	// Runs schema.sql against the database when the application starts.
	@Bean
	ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {
		ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
		initializer.setConnectionFactory(connectionFactory);
		initializer.setDatabasePopulator(new ResourceDatabasePopulator(new ClassPathResource("schema.sql")));
		return initializer;
	}

}
```

- @EnableR2dbcRepositories: activates reactive relational repositories using R2DBC.
- @EnableWebFlux: enables the standard Spring WebFlux configuration for the application.
- ConnectionFactoryInitializer: provides a convenient way to configure and initialize a connection factory for reactive database connections in a Spring application. Here it loads schema.sql from the classpath and executes the SQL script to initialize the database on startup.

schema.sql should be placed in src/main/resources with the following contents:

```sql
CREATE TABLE IF NOT EXISTS tutorial(
	tutorial_id INT NOT NULL AUTO_INCREMENT,
	title VARCHAR(255),
	description VARCHAR(255),
	published BOOLEAN,
	PRIMARY KEY (tutorial_id)
);
```

Configuring MySQL and R2DBC

Define the following properties in application.properties, where tutorial_webflux is the database name. Setting spring.data.r2dbc.repositories.enabled=true plays the same role as @EnableR2dbcRepositories.

```properties
spring.application.name=tutorial-webflux
spring.r2dbc.url=r2dbc:mysql://localhost:3306/tutorial_webflux
spring.r2dbc.username=root
spring.r2dbc.password=root
spring.data.r2dbc.repositories.enabled=true
```

Data models

Define a data model named Tutorial with the following attributes.

```java
@Entity
@Getter
@Setter
@AllArgsConstructor
public class Tutorial {

	@Id
	private Long tutorialId;

	private String title;

	private String description;

	private boolean published;

}
```

Repository

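Create an interface TutorialRepository that extends R2dbcRepository (which in turn extends Spring Data's ReactiveCrudRepository).

```java
@Repository
public interface TutorialRepository extends R2dbcRepository<Tutorial, Long> {

	// Emits the matching tutorial, or completes empty if none exists.
	Mono<Tutorial> findByTutorialId(Long id);

	// Derived query used by the title search endpoint later in this tutorial.
	Flux<Tutorial> findByTitleContaining(String title);

}
```

Service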

Create a TutorialService to find a Tutorial entity and to save a Tutorial entity.

TutorialService.java
```java
@Service
public class TutorialService {

	@Autowired
	TutorialRepository tutorialRepository;

	// Looks up a single tutorial by its id.
	public Mono<Tutorial> findTutorialById(Long id) {
		return tutorialRepository.findByTutorialId(id);
	}

	// Persists the tutorial and emits the saved instance.
	public Mono<Tutorial> saveTutorial(Tutorial tutorial) {
		return tutorialRepository.save(tutorial);
	}
}
```

A Mono represents a single value or no value. Here, findTutorialById() returns a Mono that emits one Tutorial if the given id exists and completes empty otherwise.

Controller Layer

TutorialController.java
```java
@RestController
@AllArgsConstructor
public class TutorialController {

	private TutorialService tutorialService;

	// GET /tutorials/{id}: fetch one tutorial by id.
	@GetMapping("/tutorials/{id}")
	@ResponseStatus(HttpStatus.OK)
	public Mono<Tutorial> getTutorialById(@PathVariable("id") Long id) {
		return tutorialService.findTutorialById(id);
	}

	// POST /tutorials: save a new tutorial, which always starts unpublished.
	@PostMapping("/tutorials")
	@ResponseStatus(HttpStatus.CREATED)
	public Mono<Tutorial> createTutorial(@RequestBody Tutorial tutorial) {
		return tutorialService.saveTutorial(
				new Tutorial(tutorial.getTutorialId(), tutorial.getTitle(), tutorial.getDescription(), false));
	}

}
```

TutorialController exposes two endpoints, one GET and one POST. Note that the return type is Mono<Tutorial>: the publisher emits either one Tutorial instance or nothing.

The GET operation fetches a Tutorial instance from the repository based on the id path variable, while the POST operation saves data to the database.
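Because an unknown id produces an empty Mono, the GET endpoint as written typically answers with a success status and an empty body. One common way to make the missing case explicit is to turn the empty signal into an error with switchIfEmpty. The sketch below assumes only reactor-core; TutorialNotFoundException and orNotFound are hypothetical names introduced for illustration. In a WebFlux controller the error would usually be Spring's ResponseStatusException with HttpStatus.NOT_FOUND instead.

```java
import reactor.core.publisher.Mono;

class EmptyMonoHandling {

    // Hypothetical exception type, introduced only for this sketch.
    static class TutorialNotFoundException extends RuntimeException {
        TutorialNotFoundException(long id) {
            super("No tutorial with id " + id);
        }
    }

    // Turns "completed without a value" into an explicit error signal.
    static <T> Mono<T> orNotFound(Mono<T> source, long id) {
        return source.switchIfEmpty(Mono.error(() -> new TutorialNotFoundException(id)));
    }

    public static void main(String[] args) {
        System.out.println(orNotFound(Mono.just("found"), 5L).block());

        // The empty case now surfaces as an error that can be handled downstream.
        orNotFound(Mono.<String>empty(), 6L)
                .onErrorResume(TutorialNotFoundException.class, e -> Mono.just(e.getMessage()))
                .subscribe(System.out::println);
    }
}
```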

Testing the APIs

Test the POST operation with curl to save data to the database. The -d flag supplies the request body and the -H flag sets the Content-Type header to application/json.

```shell
$ curl -d '{
    "tutorialId": 5,
    "title": "breathe_become_air",
    "description": "kalanithi"
}' -H 'Content-Type: application/json' http://localhost:8080/tutorials

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   179  100    89  100    90   3118   3153 --:--:-- --:--:-- --:--:--  6392
{"tutorialId":5,"title":"breathe_become_air","description":"kalanithi","published":false}
```

Test the GET operation by fetching the tutorial record with id 5:

```shell
$ curl -v http://localhost:8080/tutorials/5
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /tutorials/5 HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.87.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Content-Type: application/json
< Content-Length: 89
<
{ [89 bytes data]
100    89  100    89    0     0   6098      0 --:--:-- --:--:-- --:--:--  6357
{"tutorialId":5,"title":"breathe_become_air","description":"kalanithi","published":false}
* Connection #0 to host localhost left intact
```

Mono & Flux

So far we have used Mono, which emits at most one object. To fetch more than one object, for example a list of objects, we use Flux.

Mono is a type that represents a stream of data that emits zero or one item.

Features of Mono:

- Laziness: it starts emitting data only when a subscriber subscribes.
- Error Handling: it provides operators for handling errors.
- Combining Operators: multiple Monos can be combined into a single Mono.
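The three features can be illustrated with a short sketch, assuming reactor-core is on the classpath. The class, method and counter names are illustrative only; fromSupplier, onErrorReturn and zip are just one possible choice of operators for each feature.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class MonoFeatures {

    // Counts how often the supplier below has actually run.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Laziness: the supplier runs only when something subscribes.
    static Mono<String> lazyTitle() {
        return Mono.fromSupplier(() -> {
            CALLS.incrementAndGet();
            return "wingsoffire";
        });
    }

    // Error handling: replace a failure with a fallback value.
    static Mono<String> withFallback(Mono<String> source) {
        return source.onErrorReturn("fallback");
    }

    // Combining: zip two Monos into one once both have emitted.
    static Mono<String> combine(Mono<String> title, Mono<String> author) {
        return Mono.zip(title, author, (t, a) -> t + " by " + a);
    }

    public static void main(String[] args) {
        Mono<String> title = lazyTitle();
        System.out.println("calls before subscribe: " + CALLS.get());
        System.out.println(title.block()); // block() subscribes and waits, demo only
        System.out.println("calls after subscribe: " + CALLS.get());
        System.out.println(withFallback(Mono.error(new IllegalStateException("boom"))).block());
        System.out.println(combine(Mono.just("wingsoffire"), Mono.just("apjabdulkalam")).block());
    }
}
```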

Flux is a type in reactive programming that emits zero or more items. It is a publisher of a sequence of elements and can emit data either synchronously or asynchronously.

Features of Flux:

- Backpressure: the ability to control the rate of publishing when data is produced faster than the subscriber can consume it, which would otherwise overload the subscriber and cause performance issues. Flux can handle this situation.

- Hot and Cold publishers: a cold publisher generates the sequence anew for every subscriber, so each subscriber receives the same data from the start. A hot publisher emits regardless of who is subscribed, so a subscriber only receives the items emitted after it subscribed.

- Error Handling: it provides operators for handling errors within the reactive pipeline.
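The hot and cold distinction is easiest to see side by side. The sketch below assumes reactor-core 3.4 or later for the Sinks API; the class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class HotVsCold {

    // Cold: every subscription replays the full sequence from the start.
    static List<List<Integer>> coldDemo() {
        Flux<Integer> cold = Flux.range(1, 3);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        cold.subscribe(first::add);
        cold.subscribe(second::add);
        return List.of(first, second);
    }

    // Hot: items are pushed as they happen; a late subscriber misses earlier ones.
    static List<List<Integer>> hotDemo() {
        Sinks.Many<Integer> sink = Sinks.many().multicast().directBestEffort();
        Flux<Integer> hot = sink.asFlux();
        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();
        hot.subscribe(early::add);
        sink.tryEmitNext(1);
        sink.tryEmitNext(2);
        hot.subscribe(late::add); // joins after 1 and 2 were emitted
        sink.tryEmitNext(3);
        sink.tryEmitComplete();
        return List.of(early, late);
    }

    public static void main(String[] args) {
        System.out.println(coldDemo()); // prints [[1, 2, 3], [1, 2, 3]]
        System.out.println(hotDemo());  // prints [[1, 2, 3], [3]]
    }
}
```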

Flux Implementation

TutorialService.java
```java
	// Emits every tutorial stored in the database.
	public Flux<Tutorial> findAll() {
		return tutorialRepository.findAll();
	}
```

TutorialController.java
```java
	// GET /tutorials: stream all tutorials.
	@GetMapping("/tutorials")
	@ResponseStatus(HttpStatus.OK)
	public Flux<Tutorial> fetchAllTutorials() {
		return tutorialService.findAll();
	}
```

Test the Flux-based API using curl:

```shell
$ curl -v http://localhost:8080/tutorials
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
*   Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /tutorials HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.87.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 201 Created
< transfer-encoding: chunked
< Content-Type: application/json
<
{ [386 bytes data]
100   357    0   357    0     0  15870      0 --:--:-- --:--:-- --:--:-- 16227
[{"tutorialId":1,"title":"wingsoffire","description":"apjabdulkalam","published":true},
{"tutorialId":2,"title":"breathe_become_air","description":"kalanithi","published":false},
{"tutorialId":3,"title":"breathe_become_air","description":"kalanithi","published":false},
{"tutorialId":5,"title":"breathe_become_air","description":"kalanithi","published":false}]
* Connection #0 to host localhost left intact
```

This is a GET operation that fetches all the records present in the database. The last few lines of the output are the records fetched from the database.

Fetching tutorials whose title contains a particular string.

TutorialController.java
```java
	// GET /tutorials/title?title=...: search tutorials by title fragment.
	@GetMapping("/tutorials/title")
	@ResponseStatus(HttpStatus.OK)
	public Flux<Tutorial> getTutorials(@RequestParam String title) {
		return tutorialService.findByTitleContaining(title);
	}
```

TutorialService.java
```java
	// Delegates to the repository query that matches the fragment anywhere in the title.
	public Flux<Tutorial> findByTitleContaining(String title) {
		return tutorialRepository.findByTitleContaining(title);
	}
```

Testing: this GET operation passes a request parameter. The -G flag makes curl send the data given with -d as a URL query string in a GET request.

```shell
$ curl -G -d "title=wings" http://localhost:8080/tutorials/title
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    87    0    87    0     0   5393      0 --:--:-- --:--:-- --:--:--  5437
[{"tutorialId":1,"title":"wingsoffire","description":"apjabdulkalam","published":true}]
```

References

[1] https://www.reactivemanifesto.org/

[2] https://www.reactive-streams.org/

[3] http://projectreactor.io/

[4] https://www.bezkoder.com/spring-r2dbc-mysql/