Saturday, May 27, 2023

Creating a Reactive Microservice with Kotlin: Live Code Example with MongoDB

 Creating A Reactive Microservice with Kotlin

Reactive microservices are the next step in the evolution of microservices. Based on the reactive paradigm, they aim to deliver more responsive, resilient, and elastic message-driven services that can outperform more traditional non-reactive architectures. In this post we will see how to create them with Spring WebFlux, introduced in Spring Framework 5.0, and how reactive programming shapes their design. The reactive microservice we build is adapted from a conventional RESTful API example, which shows how easily an existing service can move to this new model.

In this blog, you will learn about: 

Spring WebFlux 

Router functions

Mono 

Flux 

Introduction to functional programming

Reactive error handling

Understanding Spring WebFlux 

Spring WebFlux is a new component introduced in Spring Framework 5.0 that allows the creation of reactive microservices using Netty as the new web/application server. WebFlux extensively uses the Reactor Framework to implement the reactive streams pattern. In this section, we will understand how to create Spring WebFlux applications, and how we can use them to migrate our non-reactive microservices into this new technological stack.

Using Netty 

Netty was originally developed by JBoss as a client-server framework for performing non-blocking IO operations. To achieve this capability, it uses a message-driven implementation of the reactor pattern. Nowadays, it is supported by an extensive open source community. Netty includes support for major protocols such as HTTP, SSL/TLS, and DNS, and also supports more modern ones such as HTTP/2, WebSocket, and Google Protocol Buffers. This is just a summary of some of Netty's capabilities; you can find more information on the Netty website at https://netty.io/. Spring Boot 1.x uses Apache Tomcat as the default application/web server, and a Spring MVC application on Tomcat handles each request with blocking operations. To support reactive services, Spring Boot 2.0 chose Netty for WebFlux applications because of its non-blocking IO capabilities.

When using Spring Boot 2.0 or later, if we include the Spring WebFlux component, our application will create a new Netty server when it starts; if we use the standard Spring Web component, a Tomcat server will be started instead. One of the desired qualities of any reactive system, as described in the Reactive Manifesto, is to be responsive; event-driven software such as Netty helps fulfill this requirement along with the rest of the manifesto. With this technology, our microservices can handle a higher workload and use their resources more effectively. There are other non-blocking IO systems for different technologies, such as Node.js, Nginx, Apache MINA, Vert.x, and Akka. It is probably a good idea to learn some of them, since reactive systems are likely to be widely used in the coming years.

Blocking is not reactive 

We can easily convert a more traditional non-reactive microservice into a reactive one, but a reactive controller alone does not make the service fully reactive. If the operation the controller performs is blocking, the service will block just as any non-reactive system would. Consider a client that invokes our URL: we call our service to either get a customer or search for customers, and only when that operation completes do we serialize the results as JSON and return them to the consumer. Because the operation itself is blocking, we need to modify our service to become a reactive service as well.
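As a minimal sketch of the problem, assuming Reactor is on the classpath: the function `loadCustomerNameBlocking` below is a hypothetical stand-in for any blocking call. One common stopgap is to defer the call and move it to a scheduler meant for blocking work. This is not the approach used by the service in this post, which relies on a reactive MongoDB template instead.

```kotlin
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// Hypothetical blocking call, standing in for a legacy client or a blocking driver.
fun loadCustomerNameBlocking(id: Int): String {
    Thread.sleep(100) // simulates blocking IO
    return "customer-$id"
}

// Defers the blocking call until someone subscribes, and runs it on Reactor's
// boundedElastic scheduler so that it does not occupy an event-loop thread.
fun loadCustomerName(id: Int): Mono<String> =
    Mono.fromCallable { loadCustomerNameBlocking(id) }
        .subscribeOn(Schedulers.boundedElastic())

fun main() {
    // block() is used only so this standalone program can print the result.
    println(loadCustomerName(1).block()) // customer-1
}
```

The wrapper only isolates the blocking work; a fully reactive service still needs a non-blocking data source underneath.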

Understanding subscribers and publishers

First, we need to understand a core component in reactive programming: the subscribe and publish mechanism. Reactive programming is based on an event model, in which a set of events is triggered and dispatched to whoever needs them. This abstract concept is easy to understand by looking at how we handle user actions in almost all UI frameworks. Suppose that we want to react to a UI action, for example, pressing a button, by performing an operation such as closing a window. In a typical UI framework, user actions such as pressing buttons, selecting menus, or scrolling the contents of a window can be considered events.

When we choose to listen to those events, what we are defining is a subscriber, an object whose responsibility is to receive an event when it is produced. On the other hand, we may need to generate events; for example, when we click a button with our mouse, a new event about the button being clicked needs to be triggered. By doing so we define a publisher, an object whose responsibility is to emit events. Our UI framework will probably connect things together: when we click the button, the publisher generates the event and the subscriber is notified about it; it can then react by performing the desired functionality, closing the window.

But events can be chained and combined. In a UI framework, we may already have a publisher that sends every mouse click event on the screen, and we may also have a subscriber for those events. Then, when we click on the screen, our publisher emits that generic message, and the more generic subscriber receives it and finds out that the mouse position is actually on a certain button. This generic subscriber then triggers a new event about that button being clicked, and whoever is subscribed to that event can process it as an action and perform the desired logic.

Let's visualize this with a bit of pseudocode:

Framework Starts

Registering MouseClick as a subscriber for mouse.click events

Registering MouseHandler as a publisher of mouse.click events

Application Starts

This is probably not the most optimal implementation of a UI event system, but it serves to illustrate the concept. In each of these subscriber and publisher objects, we are not blocking the operation waiting for a result; they are in a way listeners that will be invoked when required. Meanwhile, other actions and events in our UI can happen, even simultaneously, such as pressing a button at the same time that we press a key. The publish and subscribe mechanism is not a new concept, and neither is the event-driven system; they have been used for a very long time, from managing our hardware to the more complicated message queue systems. However, with the reactive programming approach, they are more relevant than ever.
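The chained events described above can be sketched in plain Kotlin. This is only an illustration under stated assumptions: the `EventBus` class, the topic names, the click coordinates, and `CloseWindowButton` are all hypothetical, and a real UI framework would dispatch events asynchronously rather than in a simple loop.

```kotlin
// Hypothetical, minimal event bus: topics map to lists of subscriber callbacks.
class EventBus {
    private val subscribers = mutableMapOf<String, MutableList<(String) -> Unit>>()

    // Register a subscriber: a callback invoked whenever the topic receives an event.
    fun subscribe(topic: String, handler: (String) -> Unit) {
        subscribers.getOrPut(topic) { mutableListOf() }.add(handler)
    }

    // Publish an event: every subscriber of the topic is notified in registration order.
    fun publish(topic: String, payload: String) {
        subscribers[topic].orEmpty().toList().forEach { it(payload) }
    }
}

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    val bus = EventBus()

    // Generic subscriber: turns a raw mouse click into a more specific button event.
    bus.subscribe("mouse.click") { position ->
        log += "mouse.click at $position"
        if (position == "10,20") bus.publish("button.click", "CloseWindowButton")
    }
    // Specific subscriber: reacts to the button event by performing the action.
    bus.subscribe("button.click") { button ->
        log += "closing window because $button was clicked"
    }

    bus.publish("mouse.click", "10,20") // lands on the button
    bus.publish("mouse.click", "99,99") // lands elsewhere, so no button event follows
    return log
}

fun main() {
    runDemo().forEach(::println)
}
```

Neither subscriber waits for a result; each one is simply invoked when an event it cares about is published.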

Spring uses the Reactor Framework to create reactive microservices. Our controllers can become publishers of results, and Spring will subscribe to produce more events that will send the data back to whoever is using our services. This data will be sent in a reactive stream, as was defined in the reactive streams specification, providing non-blocking back-pressure.
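To make back-pressure more concrete, here is a small sketch that assumes Reactor is on the classpath. The subscriber asks the publisher for one element at a time instead of receiving everything at once. The function name `backPressureDemo` is made up for this example, and Spring's own subscribers are more elaborate than this.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

fun backPressureDemo(): List<String> {
    val log = mutableListOf<String>()
    Flux.range(1, 3).subscribe(object : BaseSubscriber<Int>() {
        // Called once when the subscription is established: ask for the first element only.
        override fun hookOnSubscribe(subscription: Subscription) {
            request(1)
        }

        // Called for each element: record it, then signal that we are ready for one more.
        override fun hookOnNext(value: Int) {
            log += "received $value"
            request(1)
        }
    })
    return log
}

fun main() {
    println(backPressureDemo()) // [received 1, received 2, received 3]
}
```

The publisher never emits more than the subscriber has requested, which is how a slow consumer avoids being overwhelmed.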

MONO

Publishing a single object

Reactor provides a way to define a reactive publisher through a class named Mono, but that publisher can only send one result. What we should understand is that a Mono is not actually the value of the Customer instance that we create; it is a promise of what we are going to get. When we declare something as Mono<Customer>, we are just indicating that this is a publisher that will publish a customer in the future. When someone subscribes to that publisher, the data will be consumed.
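The following sketch, which assumes Reactor is on the classpath, shows that declaring a Mono does not produce the value; the value is produced only when someone subscribes. `SimpleCustomer` and `monoDemo` are hypothetical names used to keep the example self-contained.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical stand-in for the Customer class used later in this post.
data class SimpleCustomer(val id: Int, val name: String)

fun monoDemo(): List<String> {
    val events = mutableListOf<String>()

    // Nothing runs yet: this only describes how the value will be produced.
    val customer: Mono<SimpleCustomer> = Mono.fromSupplier {
        events += "producing"
        SimpleCustomer(1, "Kotlin")
    }
    events += "declared"

    // Subscribing triggers the supplier and delivers the single value.
    customer.subscribe { events += "received ${it.name}" }
    return events
}

fun main() {
    println(monoDemo()) // [declared, producing, received Kotlin]
}
```

Note that "declared" is logged before "producing", even though the supplier appears first in the code.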

FLUX

Publishing multiple objects

Reactor provides a class to create a publisher that will return from 0 to an undetermined number of elements; that class is named Flux. Mono can be used when we only need to return one result from our publisher, whereas Flux can be used for 0 to N results, including 1. If we are going to return just one result, however, it is preferable to use Mono instead of Flux, since it is better optimized for that usage.
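A short sketch of a Flux, again assuming Reactor is on the classpath. The functions `customerNames` and `upperCaseNames` are hypothetical; the names they emit are borrowed from the sample customers used later in this post.

```kotlin
import reactor.core.publisher.Flux

// A publisher that emits three elements and then completes.
fun customerNames(): Flux<String> = Flux.just("Kotlin", "Spring", "Microservice")

// Transforms each element as it flows through, then gathers the stream into a list.
// block() is used only so this standalone example can return a plain value.
fun upperCaseNames(): List<String> =
    customerNames().map { it.uppercase() }.collectList().block().orEmpty()

fun main() {
    println(upperCaseNames()) // [KOTLIN, SPRING, MICROSERVICE]

    // A Flux may also emit nothing at all.
    println(Flux.empty<String>().collectList().block()) // []
}
```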

Now let's have a look at the sample code. This is the code of a live reactive microservice.

package com.example.demo

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

// Application entry point.
@SpringBootApplication
class ReactiveMicroserviceApplication

fun main(args: Array<String>) {
    runApplication<ReactiveMicroserviceApplication>(*args)
}
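
package com.example.demo

import org.springframework.data.mongodb.core.mapping.Document

// Customer document stored in the "Customers" MongoDB collection.
// All properties have defaults, so Kotlin also generates a no-argument constructor.
@Document(collection = "Customers")
data class Customer(
    var id: Int = 0,
    var name: String = "",
    var telephone: Telephone? = null
) {
    data class Telephone(
        var countryCode: String = "",
        var telephoneNumber: String = ""
    )
}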

package com.example.demo

import org.springframework.http.HttpStatus
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.BodyInserters.fromValue
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse.*
import org.springframework.web.reactive.function.server.bodyToMono
import java.net.URI

// Handler functions used by the router; each one returns a Mono<ServerResponse>.
@Component
class CustomerHandler(val customerService: CustomerService) {
    // 200 with the customer as the body, or 404 when the Mono completes empty.
    fun get(serverRequest: ServerRequest) =
        customerService.getCustomer(serverRequest.pathVariable("id").toInt())
            // fromValue replaces fromObject, which was deprecated in Spring Framework 5.2.
            .flatMap { ok().body(fromValue(it)) }
            .switchIfEmpty(status(HttpStatus.NOT_FOUND).build())

    // 201 with a Location header that matches the GET route declared in the router.
    fun create(serverRequest: ServerRequest) =
        customerService.createCustomer(serverRequest.bodyToMono()).flatMap {
            created(URI.create("/customer/${it.id}")).build()
        }

    // 200 when a document was deleted, 404 otherwise.
    fun delete(serverRequest: ServerRequest) =
        customerService.deleteCustomer(serverRequest.pathVariable("id").toInt())
            .flatMap {
                if (it) ok().build()
                else status(HttpStatus.NOT_FOUND).build()
            }

    // Streams every customer whose name matches the optional nameFilter query parameter.
    fun search(serverRequest: ServerRequest) =
        ok().body(
            customerService.searchCustomers(
                serverRequest.queryParam("nameFilter").orElse("")
            ),
            Customer::class.java
        )
}

package com.example.demo

import jakarta.annotation.PostConstruct
import org.springframework.data.mongodb.core.ReactiveMongoTemplate
import org.springframework.data.mongodb.core.find
import org.springframework.data.mongodb.core.findById
import org.springframework.data.mongodb.core.query.Criteria.where
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.isEqualTo
import org.springframework.data.mongodb.core.remove
import org.springframework.stereotype.Repository
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toMono

@Repository
class CustomerRepository(private val template: ReactiveMongoTemplate) {
    companion object {
        // Sample customers saved when the application starts.
        val initialCustomers = listOf(
            Customer(1, "Kotlin"),
            Customer(2, "Spring"),
            Customer(3, "Microservice", Customer.Telephone("+91", "7123456789"))
        )
    }

    // Nothing is written until each Mono is subscribed, hence the final subscribe step.
    @PostConstruct
    fun initializeRepository() =
        initialCustomers.map(Customer::toMono).map(this::create).map(Mono<Customer>::subscribe)

    fun create(customer: Mono<Customer>) = template.save(customer)
    fun findById(id: Int) = template.findById<Customer>(id)
    fun deleteById(id: Int) = template.remove<Customer>(Query(where("_id").isEqualTo(id)))

    // Case-insensitive search for customers whose name contains the filter.
    fun findCustomer(nameFilter: String) = template.find<Customer>(
        Query(where("name").regex(".*$nameFilter.*", "i"))
    )
}

package com.example.demo

import org.springframework.context.annotation.Bean
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.server.router

// Maps HTTP routes to the handler functions using the WebFlux router DSL.
@Component
class CustomerRouter(private val customerHandler: CustomerHandler) {
    @Bean
    fun customerRoutes() = router {
        "/customer".nest {
            GET("/{id}", customerHandler::get)
            POST("/", customerHandler::create)
            DELETE("/{id}", customerHandler::delete)
        }
        "/customers".nest {
            GET("/", customerHandler::search)
        }
    }
}

package com.example.demo

import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Single results are exposed as Mono, collections of results as Flux.
interface CustomerService {
    fun getCustomer(id: Int): Mono<Customer>
    fun createCustomer(customer: Mono<Customer>): Mono<Customer>
    fun deleteCustomer(id: Int): Mono<Boolean>
    fun searchCustomers(nameFilter: String): Flux<Customer>
}

package com.example.demo

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono

@Service
class CustomerServiceImpl : CustomerService {
    @Autowired
    lateinit var customerRepository: CustomerRepository

    override fun getCustomer(id: Int) = customerRepository.findById(id)

    override fun createCustomer(customer: Mono<Customer>) =
        customerRepository.create(customer)

    // Maps the delete result to true when at least one document was removed.
    override fun deleteCustomer(id: Int) =
        customerRepository.deleteById(id).map { it.deletedCount > 0 }

    override fun searchCustomers(nameFilter: String) =
        customerRepository.findCustomer(nameFilter)
}

spring:
  data:
    mongodb:
      uri: "mongodb://localhost:27700"
      database: "microservices"
spring.jackson.default-property-inclusion: NON_NULL

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ReactiveMicroservice</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <kotlin.compiler.incremental>true</kotlin.compiler.incremental>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>17</java.version>
        <kotlin.version>1.8.21</kotlin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.kotlin</groupId>
            <artifactId>reactor-kotlin-extensions</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-reflect</artifactId>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>${project.basedir}/src/main/kotlin</sourceDirectory>
        <testSourceDirectory>${project.basedir}/src/test/kotlin</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-maven-plugin</artifactId>
                <configuration>
                    <args>
                        <arg>-Xjsr305=strict</arg>
                    </args>
                    <compilerPlugins>
                        <plugin>spring</plugin>
                    </compilerPlugins>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.jetbrains.kotlin</groupId>
                        <artifactId>kotlin-maven-allopen</artifactId>
                        <version>${kotlin.version}</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

Microservice responses:

[
  {
    "id": 1,
    "name": "Kotlin"
  },
  {
    "id": 2,
    "name": "Spring"
  },
  {
    "id": 18,
    "name": "New Customer",
    "telephone": {
      "countryCode": "+44",
      "telephoneNumber": "7123456789"
    }
  },
  {
    "id": 3,
    "name": "Microservice",
    "telephone": {
      "countryCode": "+44",
      "telephoneNumber": "7123456789"
    }
  }
]

Bad request example:

{
  "error": "error creating customer",
  "message": "JSON decoding error: Unexpected character ('b' (code 98)): expected a valid value (number, String, array, object, 'true', 'false' or 'null'); nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('b' (code 98)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')\n at [Source: UNKNOWN; line: 9, column: 2]"
}

I have shared the complete code. You can copy and paste it to start your reactive microservice journey with Kotlin.

