Spring Webflux
Reactive Programming
Reactive programming is a programming paradigm that promotes an asynchronous, non-blocking, event driven approach to data processing. Let us understand the difference between blocking and non blocking request processing.
Blocking vs Non-blocking (Async) Request Processing
Blocking Request Processing
- In traditional MVC applications,a new servelet thread is created when request comes to the server, this request is passed on to the worker threads for IO operations such as database calls etc.During the time worker threads are busy, remains in waiting status and thus it is blocked.It is called synchronous request processing.
- This limits the number of threads for processing,hampering the performance and limits the number of request processing at maximum server load.
Non-Blocking Request Processing
- In synchronous request processing thread is in waiting state for another request to be processed.
- In non-blocking or asynchronous request processing, no thread is in waiting state. There is generally only one request thread receiving the request.
- Incoming Requests (comes with event handler and callback information) -> Thread Pool(generally small number of threads) -> the request is passed to event handler function -> Handler function immediately starts processing requests -> One thread from the pool collects the response and passes it to the thread pool.
Reactive refers to programming models that are built around reacting to changes, built around publisher, subscriber pattern. According to the Reactive manifesto [1] reactive systems are:
- Responsive: Systems responds in timely manner.
- Resilient: Systems stay responsive in the face of failuer.
- Elastic: Systems stays responsive in the face of failure.
- Message Driven: Asynchronous message passing.
Backpressure
Back Pressure controls the rate of events so that a fast producer does not overhelm its destination.Reactive web programming is well suited for applications, that involve streaming data and real time interactions
Spring Webflux is a parallel version of Spring MVC and supports fully non-blocking reactive streams. It supports back pressure concept and uses netty as the inbuilt server to run reactive applications.
Spring webflux uses project reactor [3] as the reactive library.Reactor is a reactive stream library therefore all of its operators support non blocking back pressure.Spring WebFlux heavily uses two publishers:
- Mono: Returns 0 or 1 element.
```java
Mono<String> mono = Mono.just("Alex");
Mono<String> mono = Mono.empty();
```
Spring Boot with WebFlux
Non-blocking web stack to handle concurrency with small number of threads and to scale with few hardware resources.The disadvantage of spring web/mvc is the nature of blocking of threads of the servlet nature.The term Reactive
refers to programming models built around reacting to constact changes - network components be it I/O etc.This non blocking feature is asynchronous in nature compared to the synchronous servlet nature.
Reactor is the reactive library of choice for Spring WebFlux. It provides the Mono and Flux API types to work on data sequences of 0..1 (Mono) and 0..N (Flux) through a rich set of operators aligned with the ReactiveX vocabulary of operators.
Let us create a sample project for Tutorial Project
with basic REST API’s.
Dependencies: pom.xml
Apart from the usual dependencies, these are few dependencies which are required to implement spring boot with webflux.
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
The above are the dependencies for using relational database.Java uses JDBC
as the primary technology to connect to database.JDBC by default has the blocking nature,thread pool requires threads to run.Reactive runtimes typically use limited resources to run in an effective way for querying multiple API request calls.This additional burden of threads on JDBC reduces the resources,making the request to stack in queue.
What is R2DBC?
Major issue with reactive development is database access in the Java/JVM world remains synchronous.To address this asynchronous calls to the database in the Java world, ADBC (Asynchronous Database Access Connectivity) by Oracle and R2DBC ( Reactive Relational DataBase Connectivity).
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>com.github.jasync-sql</groupId>
<artifactId>jasync-r2dbc-mysql</artifactId>
<version>2.1.16</version>
</dependency>
</dependencies>
jasync-r2dbc-mysql
is a dependency for reactive support for MySQL, is an async, netty based JVM database driver.
Now create a spring starter project with the root configuration as below
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.r2dbc.repository.config.EnableR2dbcRepositories;
import org.springframework.r2dbc.connection.init.ConnectionFactoryInitializer;
import org.springframework.r2dbc.connection.init.ResourceDatabasePopulator;
import org.springframework.web.reactive.config.EnableWebFlux;
import io.r2dbc.spi.ConnectionFactory;
@EnableR2dbcRepositories
@SpringBootApplication
@EnableWebFlux
public class TutorialWebfluxApplication {
public static void main(String[] args) {
SpringApplication.run(TutorialWebfluxApplication.class, args);
}
@Bean
ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {
ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
initializer.setConnectionFactory(connectionFactory);
initializer.setDatabasePopulator(new ResourceDatabasePopulator(new ClassPathResource("schema.sql")));
return initializer;
}
}
@EnableR2dbcRepositories
: Annotation to activate reactive relational repositories usingr2dbc
.@EnableWebFlux
: enables standard spring web reactive configuration for the application.ConnectionFactoryInitializer
: provides a convinient way to configure and initialize a connection factory for reactive database connection in spring application. It will scan for theschema.sql
in the classpath,execute the SQL script to initialize the database when it is created.
schema.sql
should be in the location, /src/main/resources
having the contents
CREATE TABLE IF NOT EXISTS tutorial(tutorial_id INT NOT NULL AUTO_INCREMENT, title VARCHAR(255), description VARCHAR(255), published BOOLEAN, PRIMARY KEY (tutorial_id));
Configuring MySQL and R2DBC
Define the following properties in application.properties
where tutorial_webflux
is the database and spring.data.r2dbc.repositories.enabled=true
is same as @EnableR2dbcRepositories
.
spring.application.name=tutorial-webflux
spring.r2dbc.url=r2dbc:mysql://localhost:3306/tutorial_webflux
spring.r2dbc.username=root
spring.r2dbc.password=root
spring.data.r2dbc.repositories.enabled=true
Data models
Defining a data model named Tutorial
with the following attributes.
@Entity
@Getter
@Setter
@AllArgsConstructor
public class Tutorial {
@Id
private Long tutorialId;
private String title;
private String description;
private boolean published;
}
Repository
Creating an interface TutorialRepository
which extends R2dbcRepository
(which extends Spring Data Reactive ReactiveCrudRepository).
@Repository
public interface TutorialRepository extends R2dbcRepository<Tutorial, Long>{
Mono<Tutorial> findByTutorialId(Long id);
}
Service
Creating a TutorialService
to find a Tutorial entity and save a tutorial entity.
@Service
public class TutorialService {
@Autowired
TutorialRepository tutorialRepository;
public Mono<Tutorial> findTutorialById(Long id){
return tutorialRepository.findByTutorialId(id);
}
public Mono<Tutorial> saveTutorial(Tutorial tutorial){
return tutorialRepository.save(tutorial);
}
}
Mono object represents a single or empty value,here in the findByTutorialById()
method it returns single or empty object on finding by a particular tutorial id.
Controller Layer
@RestController
@AllArgsConstructor
public class TutorialController {
private TutorialService tutorialService;
@GetMapping("/tutorials/{id}")
@ResponseStatus(HttpStatus.OK)
public Mono<Tutorial> getTutorialById(@PathVariable("id") Long id) {
return tutorialService.findTutorialById(id);
}
@PostMapping("/tutorials")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Tutorial> createTutorial(@RequestBody Tutorial tutorial) {
return tutorialService.saveTutorial(
new Tutorial(tutorial.getTutorialId(), tutorial.getTitle(), tutorial.getDescription(), false));
}
}
In TutorialController
there are two endpoints, one GET
& POST
operations, observe the return type is Mono<Tutorial>
, where the object willl return either one instance of Tutorial
or empty.
GET
operation is to fetch a Tutorial
instance based on the id
from PathVariable from the repository, while the POST
operation saves data onto the database.
Test API’s
Testing the POST operation using curl to save data onto the database, setting the Content-Type
using the -H
flag as application/json
along with the URL.
$ curl -d '{
"tutorialId": 5,
"title": "breathe_become_air",
"description": "kalanithi"
}' -H 'Content-Type: application/json' http://localhost:8080/tutorials
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 179 100 89 100 90 3118 3153 --:--:-- --:--:-- --:--:-- 6392{"tutorialId":5,"title":"breathe_become_air","description":"kalanithi","published":false}
Testing the GET
operation of the application and fetching the tutorial record with id 5
$ curl -v http://localhost:8080/tutorials/5
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0* Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /tutorials/5 HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.87.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< Content-Type: application/json
< Content-Length: 89
<
{ [89 bytes data]
100 89 100 89 0 0 6098 0 --:--:-- --:--:-- --:--:-- 6357{"tutorialId":5,"title":"breathe_become_air","description":"kalanithi","published":false}
* Connection #0 to host localhost left intact
Mono & Flux
Till now we were using Mono
which fetches at most zero or one object, what if in a situation we want to fetch more than one object, maybe a list of objects,here comes FLUX
.
Mono is a type that represents a stream of data that emits zero or one item.
Features of Mono:
- Laziness: starts to emit data when there is one subscriber.
- Error Handling: provides operators for handling errors.
- Combining Operators: combine multiple monos into single mono.
Flux is a type in reactive programming that emits zero or more items.It is publisher of a sequence of elements and can emit data either synchronously or asynchronously.
Features of Flux
-
Backpressure: the ability to control publishing data where the data being produced is faster than the rate of date being consumed by the subscriber, which in turn overheads and causes performance issues. Flux can handle this situation.
-
Hot and Cold publisher: cold publisher is the one that emits same sequence of data to all subscribers, while hot publisher is the one that emits independent data to each subscriber.
-
Error Handling: provides operators to handle error within the reactive programming.
Flux Implementation
public Flux<Tutorial> findAll(){
return tutorialRepository.findAll();
}
@GetMapping("/tutorials")
@ResponseStatus(HttpStatus.OK)
public Flux<Tutorial> fetchAllTutorials(){
return tutorialService.findAll();
}
Testing the Flux implemented API using curl
$ curl -v http://localhost:8080/tutorials
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0* Trying 127.0.0.1:8080...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /tutorials HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.87.0
> Accept: */*
>
* Mark bundle as not supporting multiuse
< HTTP/1.1 201 Created
< transfer-encoding: chunked
< Content-Type: application/json
<
{ [386 bytes data]
100 357 0 357 0 0 15870 0 --:--:-- --:--:-- --:--:-- 16227
[{"tutorialId":1,"title":"wingsoffire","description":"apjabdulkalam","published":true},
{"tutorialId":2,"title":"breathe_become_air","description":"kalanithi","published":false},
{"tutorialId":3,"title":"breathe_become_air","description":"kalanithi","published":false},
{"tutorialId":5,"title":"breathe_become_air","description":"kalanithi","published":false}]
* Connection #0 to host localhost left intact
this is a GET
operation to fetch all the records present in the database, last few lines are the records fetched from the database.
Fetching tutorials starting with a particular string.
@GetMapping("/tutorials/title")
@ResponseStatus(HttpStatus.OK)
public Flux<Tutorial> getTutorials(@RequestParam String title) {
return tutorialService.findByTitleContaining(title);
}
public Flux<Tutorial> findByTitleContaining(String title){
return tutorialRepository.findByTitleContaining(title);
}
Testing: GET
operation based on request param using the flag -G
stands for GET and also using -d
for data.
$ curl -G -d "title=wings" http://localhost:8080/tutorials/title
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 87 0 87 0 0 5393 0 --:--:-- --:--:-- --:--:-- 5437[{"tutorialId":1,"title":"wingsoffire","description":"apjabdulkalam","published":true}]
References
[1] https://www.reactivemanifesto.org/