Building Highly Scalable Microservices with CQRS and Event Sourcing Using Spring, Kafka and Cassandra
1. Overview
This article aims to give readers an in-depth understanding of Command Query Responsibility Segregation (CQRS) and Event Sourcing, along with a practical implementation using Spring Data (Apache Cassandra), Kafka and WebFlux. By the end of this tutorial, readers will have the knowledge and tools needed to apply these concepts in their own microservices architecture. The source code is available in our repository.
2. CQRS Pattern: Command Query Responsibility Segregation
In modern software development, building complex systems that efficiently handle both reads and writes can be a challenge. One pattern that has gained significant attention in recent years is Command Query Responsibility Segregation (CQRS). CQRS separates the responsibility for handling commands (write operations) from the responsibility for handling queries (read operations) in an application, which can improve scalability, performance, and flexibility. This section introduces the pattern; its benefits in combination with Event Sourcing are listed further below.
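As a rough illustration of the separation described above, the following plain-Java sketch keeps command handling and query handling in separate classes. All names here (`CqrsSketch`, `UserCommandHandler`, `UserQueryService`, `UserView`) are hypothetical and not taken from the sample project, and a single in-memory map stands in for what would normally be separate data stores.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

class CqrsSketch {

    // Read model, shaped for queries only (hypothetical name).
    record UserView(String id, String fullName, String email) {}

    // Command side: accepts intents to change state and returns no query data.
    static class UserCommandHandler {
        private final Map<String, UserView> store;

        UserCommandHandler(Map<String, UserView> store) {
            this.store = store;
        }

        // Returns only the generated identifier, not the stored data.
        String createUser(String fullName, String email) {
            String id = UUID.randomUUID().toString();
            store.put(id, new UserView(id, fullName, email));
            return id;
        }

        void deleteUser(String id) {
            store.remove(id);
        }
    }

    // Query side: reads state and never modifies it.
    static class UserQueryService {
        private final Map<String, UserView> store;

        UserQueryService(Map<String, UserView> store) {
            this.store = store;
        }

        Optional<UserView> findById(String id) {
            return Optional.ofNullable(store.get(id));
        }
    }

    public static void main(String[] args) {
        // Assumption: one shared map for brevity; a real system would use separate stores.
        Map<String, UserView> store = new ConcurrentHashMap<>();
        UserCommandHandler commands = new UserCommandHandler(store);
        UserQueryService queries = new UserQueryService(store);

        String id = commands.createUser("Ada Lovelace", "ada@example.com");
        System.out.println(queries.findById(id).isPresent());
        commands.deleteUser(id);
        System.out.println(queries.findById(id).isPresent());
    }
}
```

Because the two classes share nothing except the store, each side can later be given its own storage and deployed or scaled separately without changing the other.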
3. Event Sourcing
Event sourcing is a software architectural pattern that captures and persists all changes made to an application's state as a sequence of immutable events. Instead of storing the current state of an entity, event sourcing stores the history of events that have occurred over time and derives the current state from that history.
In event sourcing, every action or operation that modifies the state of the system is represented as an event. These events are appended to an event log or event store, which serves as the single source of truth for the application's state. The event log contains a sequential series of events that can be used to reconstruct the state of the system at any given point in time.
To derive the current state from the event log, the application replays the events in sequence, applying them one by one to create the current state. By replaying events, it becomes possible to rebuild the state of the system at any past point in time or perform complex queries by applying different combinations of events.
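The replay step described above can be sketched in plain Java as a fold over the event log. This is a minimal illustration under stated assumptions: the event types (`Created`, `Renamed`, `Deleted`) and the `UserState` record are hypothetical and are not the classes used later in this article.

```java
import java.util.List;
import java.util.Optional;

class EventReplaySketch {

    // Hypothetical event types for a single user entity.
    interface UserEvent {}
    record Created(String id, String fullName) implements UserEvent {}
    record Renamed(String id, String fullName) implements UserEvent {}
    record Deleted(String id) implements UserEvent {}

    // State derived from the events; it is never stored directly.
    record UserState(String id, String fullName) {}

    // Applies one event to the previous state and returns the next state.
    static Optional<UserState> apply(Optional<UserState> state, UserEvent event) {
        if (event instanceof Created c) {
            return Optional.of(new UserState(c.id(), c.fullName()));
        } else if (event instanceof Renamed r) {
            return state.map(s -> new UserState(s.id(), r.fullName()));
        } else {
            return Optional.empty(); // Deleted: the entity no longer exists
        }
    }

    // Replays the first `count` events of the log, in order.
    static Optional<UserState> replay(List<UserEvent> log, int count) {
        Optional<UserState> state = Optional.empty();
        for (UserEvent event : log.subList(0, count)) {
            state = apply(state, event);
        }
        return state;
    }

    public static void main(String[] args) {
        List<UserEvent> log = List.of(
                new Created("u1", "Ada"),
                new Renamed("u1", "Ada Lovelace"),
                new Deleted("u1"));

        System.out.println(replay(log, log.size())); // current state: empty, the user was deleted
        System.out.println(replay(log, 2));          // past state: as it was before the deletion
    }
}
```

Replaying only a prefix of the log is what makes it possible to rebuild the state as of an earlier point in time.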
4. CQRS/ES
CQRS/ES (Command Query Responsibility Segregation / Event Sourcing) is the combination of the two architectural patterns described above: CQRS and Event Sourcing.
When combined, CQRS and Event Sourcing provide several benefits:
- Scalability: CQRS allows read and write models to scale independently, enabling better performance for both read-heavy and write-heavy workloads.
- Flexibility: CQRS allows for different storage models and optimization strategies for the read and write sides, providing flexibility in choosing the most suitable data storage and retrieval mechanisms for each task.
- Auditability and traceability: Event sourcing captures a complete history of events, facilitating audit trails and providing detailed insights into the system's behaviour.
- Event-driven architecture: Event sourcing inherently aligns with event-driven architectures, as events serve as the integration points between different components or microservices.
- Time travel and temporal querying: Event sourcing enables the ability to rebuild past states and perform temporal queries, which can be valuable for analysis and debugging.
5. Concepts
6. Designing the Architecture
To implement separate read and write sides, two data stores are used: one optimized for read operations, and an append-only store optimized for write operations. Kafka topics provide the integration between the two sides by carrying the commands and the events.
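The flow described above can be sketched without any infrastructure. In this hedged, in-memory illustration, two queues stand in for the Kafka topics, a list stands in for the append-only event store, and a map stands in for the read projection. The class and field names are hypothetical, and only a single command type is modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

class ArchitectureSketch {

    record CreateUser(String id, String fullName) {}   // command: an intent
    record UserCreated(String id, String fullName) {}  // event: a fact that happened

    final Queue<CreateUser> commandTopic = new ArrayDeque<>();   // stands in for a Kafka topic
    final Queue<UserCreated> eventTopic = new ArrayDeque<>();    // stands in for a Kafka topic
    final List<UserCreated> eventStore = new ArrayList<>();      // append-only write store
    final Map<String, String> readProjection = new HashMap<>();  // read store: id -> fullName

    // Entry point of the write path: only publishes the intent.
    void send(CreateUser command) {
        commandTopic.add(command);
    }

    // Write side: turns each command into an event, appends it, then publishes it.
    void processCommands() {
        CreateUser command;
        while ((command = commandTopic.poll()) != null) {
            UserCreated event = new UserCreated(command.id(), command.fullName());
            eventStore.add(event);
            eventTopic.add(event);
        }
    }

    // Read side: applies published events to the projection.
    void processEvents() {
        UserCreated event;
        while ((event = eventTopic.poll()) != null) {
            readProjection.put(event.id(), event.fullName());
        }
    }

    public static void main(String[] args) {
        ArchitectureSketch app = new ArchitectureSketch();
        app.send(new CreateUser("u1", "Ada"));
        app.processCommands();
        app.processEvents();
        System.out.println(app.readProjection.get("u1")); // prints "Ada"
    }
}
```

The read projection only changes once the event has been consumed, which is why the read side is eventually consistent with the write side rather than updated in the same step.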
7. Implementation of the service:
-
Read Side:
This is a simple Spring Data Reactive Cassandra repository implementation that queries data from a Cassandra database. This database is the read side, also referred to as the read projection.
-
Write Side:
To decouple the components and let them scale independently, a message-oriented approach is used, with Apache Kafka as the message hub. The system generates command messages, which in turn produce events that are stored in the event store. In this implementation, another Cassandra database serves as the event store. These events drive the changes on the read side, so the read projection reflects them in near real time.
-
Entities:
User: the read-side definition of the User
import lombok.*;
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;

@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Table
public class User {
    @PrimaryKey
    private String id;
    private String fullName;
    private String email;
}
EventData: Event-related information
import lombok.*;
import org.springframework.data.cassandra.core.mapping.PrimaryKey;
import org.springframework.data.cassandra.core.mapping.Table;

import java.util.Date;

@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Table
public class EventData {
    @PrimaryKey
    private String id;
    private String data; // serialized event payload
    private Date time;
}
-
Repositories:
Spring Data is used to simplify Cassandra-related data operations.
EventDataRepository
import org.springframework.data.cassandra.repository.ReactiveCassandraRepository;
import org.springframework.stereotype.Repository;
import site.thesampleapp.domain.entity.EventData;

// Type parameters: the entity and the type of its @PrimaryKey field.
@Repository
public interface EventDataRepository extends ReactiveCassandraRepository<EventData, String> {
}
UserRepository
import org.springframework.data.cassandra.repository.ReactiveCassandraRepository;
import org.springframework.stereotype.Repository;
import site.thesampleapp.domain.entity.User;

@Repository
public interface UserRepository extends ReactiveCassandraRepository<User, String> {
}
-
Services:
The service layer takes care of the business logic.
UserEventService
import site.thesampleapp.domain.event.Event;

public interface UserEventService {
    void onEvent(Event event);
}
UserService
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import site.thesampleapp.domain.entity.User;

public interface UserService {
    Flux<User> findAll();
    Mono<User> findById(String id);
    void save(User user);
    void deleteById(String id);
}
-
Commands:
As explained above, commands are data manipulation intents. Each command carries all the information required to perform that intent.
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

// The "type" property in the JSON payload selects the concrete command class.
@JsonTypeInfo(
        use = JsonTypeInfo.Id.NAME,
        property = "type")
@JsonSubTypes({
        @JsonSubTypes.Type(value = CreateUser.class, name = "create"),
        @JsonSubTypes.Type(value = UpdateUser.class, name = "update"),
        @JsonSubTypes.Type(value = DeleteUser.class, name = "delete")
})
public interface Command {
}
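The concrete command classes are not listed in this article. Judging from how they are constructed and read elsewhere in the code (`new CreateUser(fullName, email)`, `updateUser.id()`, and so on), they could plausibly be Java records along the following lines. This is a sketch under that assumption, not the project's actual source; the Jackson annotations are left out so that it compiles on its own, and `CommandSketch` and `describe` are hypothetical names added for the demonstration.

```java
class CommandSketch {

    // Stand-in for the Command interface; the article's version carries Jackson annotations.
    interface Command {}

    // Component order follows the constructor calls in the article's controller.
    record CreateUser(String fullName, String email) implements Command {}
    record UpdateUser(String id, String fullName) implements Command {}
    record DeleteUser(String id) implements Command {}

    // Hypothetical helper: dispatches on the command type, as the consumers do.
    static String describe(Command command) {
        if (command instanceof CreateUser c) {
            return "create " + c.fullName() + " <" + c.email() + ">";
        } else if (command instanceof UpdateUser u) {
            return "update " + u.id() + " to " + u.fullName();
        } else if (command instanceof DeleteUser d) {
            return "delete " + d.id();
        }
        return "unknown";
    }

    public static void main(String[] args) {
        System.out.println(describe(new CreateUser("Ada Lovelace", "ada@example.com")));
        System.out.println(describe(new UpdateUser("u1", "Ada King")));
        System.out.println(describe(new DeleteUser("u1")));
    }
}
```

Note that `CreateUser` carries no identifier; in the article's code the identifier is generated by the producer and travels as the Kafka message key.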
-
Events:
Likewise, events are notification messages about things that happened in the past, and they are the input used to modify the read side.
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

import java.util.Date;

// The "type" property in the JSON payload selects the concrete event class.
@JsonTypeInfo(
        use = JsonTypeInfo.Id.NAME,
        property = "type")
@JsonSubTypes({
        @JsonSubTypes.Type(value = UserCreated.class, name = "created"),
        @JsonSubTypes.Type(value = UserUpdated.class, name = "updated"),
        @JsonSubTypes.Type(value = UserDeleted.class, name = "deleted")
})
public interface Event {
    String getId();
    Date getTime();
}
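The concrete event classes are also not shown in the article. Based on the constructor calls in the command consumer (for example `new UserCreated(key, fullName, email, currentDate)`) and the accessors used in the event consumer (`userCreated.id()`), one plausible shape is a set of records that implement the two `Event` getters explicitly. The following is a sketch under that assumption, with the Jackson annotations omitted and `EventSketch` as a hypothetical wrapper class.

```java
import java.util.Date;

class EventSketch {

    // Stand-in for the Event interface; the article's version carries Jackson annotations.
    interface Event {
        String getId();
        Date getTime();
    }

    // Records generate id() and time(); the interface additionally requires getId() and getTime().
    record UserCreated(String id, String fullName, String email, Date time) implements Event {
        public String getId() { return id; }
        public Date getTime() { return time; }
    }

    record UserUpdated(String id, String fullName, Date time) implements Event {
        public String getId() { return id; }
        public Date getTime() { return time; }
    }

    record UserDeleted(String id, Date time) implements Event {
        public String getId() { return id; }
        public Date getTime() { return time; }
    }

    public static void main(String[] args) {
        Date now = new Date();
        Event created = new UserCreated("u1", "Ada Lovelace", "ada@example.com", now);
        Event deleted = new UserDeleted("u1", now);
        System.out.println(created.getId() + " created at " + created.getTime());
        System.out.println(deleted.getId() + " deleted at " + deleted.getTime());
    }
}
```

Every event carries the entity identifier and a timestamp, which is the minimum needed to order events per entity and to store them in the event store.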
-
Consumers:
Consumers are responsible for ingesting messages from Kafka. This application has two: one consumes commands and the other consumes events.
CommandConsumer
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import site.thesampleapp.domain.command.Command;
import site.thesampleapp.domain.command.CreateUser;
import site.thesampleapp.domain.command.DeleteUser;
import site.thesampleapp.domain.command.UpdateUser;
import site.thesampleapp.domain.event.UserCreated;
import site.thesampleapp.domain.event.UserDeleted;
import site.thesampleapp.domain.event.UserUpdated;
import site.thesampleapp.domain.service.UserEventService;
import site.thesampleapp.util.JsonSerializer;

import java.util.Date;

@Component
@Slf4j
@RequiredArgsConstructor
public class CommandConsumer {
    private final JsonSerializer jsonSerializer;
    private final UserEventService userEventService;

    @KafkaListener(topics = "${command.topic_name}",
            groupId = "${command.consumer_group}",
            containerFactory = "commandKafkaListenerContainerFactory")
    public void onCommand(ConsumerRecord<String, String> record) {
        Date currentDate = new Date();
        String key = record.key(); // for CreateUser, the key is the generated user id
        String commandMessage = record.value();
        Command command = jsonSerializer.deSerialize(commandMessage, Command.class);
        // Translate each command into the event that records its outcome.
        switch (command) {
            case CreateUser createUser ->
                    userEventService.onEvent(new UserCreated(key, createUser.fullName(), createUser.email(), currentDate));
            case UpdateUser updateUser ->
                    userEventService.onEvent(new UserUpdated(updateUser.id(), updateUser.fullName(), currentDate));
            case DeleteUser deleteUser ->
                    userEventService.onEvent(new UserDeleted(deleteUser.id(), currentDate));
            default -> log.error("Invalid Command: {}", command);
        }
    }
}
EventConsumer
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import site.thesampleapp.domain.entity.User;
import site.thesampleapp.domain.event.Event;
import site.thesampleapp.domain.event.UserCreated;
import site.thesampleapp.domain.event.UserDeleted;
import site.thesampleapp.domain.event.UserUpdated;
import site.thesampleapp.domain.service.UserService;
import site.thesampleapp.util.JsonSerializer;

@Component
@Slf4j
@RequiredArgsConstructor
public class EventConsumer {
    private final JsonSerializer jsonSerializer;
    private final UserService userService;

    @KafkaListener(topics = "${event.topic_name}",
            groupId = "${event.consumer_group}",
            containerFactory = "eventKafkaListenerContainerFactory")
    public void onEvent(ConsumerRecord<String, String> record) {
        String eventMessage = record.value();
        Event event = jsonSerializer.deSerialize(eventMessage, Event.class);
        // Apply each event to the read projection.
        switch (event) {
            case UserCreated userCreated -> userService.save(User.builder()
                    .id(userCreated.id())
                    .email(userCreated.email())
                    .fullName(userCreated.fullName())
                    .build());
            // A Mono does nothing until it is subscribed to, so subscribe explicitly.
            case UserUpdated userUpdated -> userService.findById(userUpdated.id())
                    .subscribe(user -> {
                        user.setFullName(userUpdated.fullName());
                        userService.save(user);
                    });
            case UserDeleted userDeleted -> userService.deleteById(userDeleted.id());
            default -> log.error("Invalid Event: {}", event);
        }
    }
}
-
Producers:
Since there are two consumers, two producers are required as well: one for commands and another for events.
CommandProducerKafkaImpl
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import site.thesampleapp.domain.command.Command;
import site.thesampleapp.domain.command.CreateUser;
import site.thesampleapp.domain.command.DeleteUser;
import site.thesampleapp.domain.command.UpdateUser;
import site.thesampleapp.domain.messaging.CommandProducer;
import site.thesampleapp.util.JsonSerializer;

import java.util.UUID;

@Component
@RequiredArgsConstructor
public class CommandProducerKafkaImpl implements CommandProducer {
    // Note: Lombok copies @Value onto the generated constructor parameter only if
    // lombok.config lists it under lombok.copyableAnnotations.
    @Value("${command.topic_name}")
    private final String topicName;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final JsonSerializer jsonSerializer;

    @Override
    public void produce(Command command) {
        // The user id is used as the message key.
        switch (command) {
            case CreateUser createUser -> {
                String userId = UUID.randomUUID().toString();
                kafkaTemplate.send(topicName, userId, jsonSerializer.serialize(createUser));
            }
            case UpdateUser updateUser ->
                    kafkaTemplate.send(topicName, updateUser.id(), jsonSerializer.serialize(updateUser));
            case DeleteUser deleteUser ->
                    kafkaTemplate.send(topicName, deleteUser.id(), jsonSerializer.serialize(deleteUser));
            default -> throw new IllegalStateException("Unexpected value: " + command);
        }
    }
}
EventProducerKafkaImpl
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import site.thesampleapp.domain.event.Event;
import site.thesampleapp.domain.event.UserCreated;
import site.thesampleapp.domain.event.UserDeleted;
import site.thesampleapp.domain.event.UserUpdated;
import site.thesampleapp.domain.messaging.EventProducer;
import site.thesampleapp.util.JsonSerializer;

@Component
@RequiredArgsConstructor
public class EventProducerKafkaImpl implements EventProducer {
    // Note: Lombok copies @Value onto the generated constructor parameter only if
    // lombok.config lists it under lombok.copyableAnnotations.
    @Value("${event.topic_name}")
    private final String topicName;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final JsonSerializer jsonSerializer;

    @Override
    public void produce(Event event) {
        // The user id is used as the message key.
        switch (event) {
            case UserCreated userCreated ->
                    kafkaTemplate.send(topicName, userCreated.id(), jsonSerializer.serialize(userCreated));
            case UserUpdated userUpdated ->
                    kafkaTemplate.send(topicName, userUpdated.id(), jsonSerializer.serialize(userUpdated));
            case UserDeleted userDeleted ->
                    kafkaTemplate.send(topicName, userDeleted.id(), jsonSerializer.serialize(userDeleted));
            default -> throw new IllegalStateException("Unexpected value: " + event);
        }
    }
}
-
Rest Controller:
This component will be the main entry point of the microservice and creates separation between the read side for query usage, and the write side for data manipulation operations.
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import site.thesampleapp.domain.command.CreateUser;
import site.thesampleapp.domain.command.DeleteUser;
import site.thesampleapp.domain.command.UpdateUser;
import site.thesampleapp.domain.entity.User;
import site.thesampleapp.domain.messaging.CommandProducer;
import site.thesampleapp.domain.service.UserService;

@RestController
@RequiredArgsConstructor
public class UserRestController {
    private final UserService userService;
    private final CommandProducer commandProducer;

    // Queries: served directly from the read side.
    @GetMapping("/")
    public Flux<User> findAll() {
        return userService.findAll();
    }

    @GetMapping("/{id}")
    public Mono<User> findUserById(@PathVariable("id") String id) {
        return userService.findById(id);
    }

    // Commands: published to Kafka and handled asynchronously by the write side.
    @PostMapping("/")
    public void saveUser(@RequestParam(name = "fullName") String fullName,
                         @RequestParam(name = "email") String email) {
        CreateUser createUser = new CreateUser(fullName, email);
        commandProducer.produce(createUser);
    }

    @PutMapping("/{id}")
    public void updateUser(@PathVariable("id") String id,
                           @RequestParam(name = "fullName") String fullName) {
        UpdateUser updateUser = new UpdateUser(id, fullName);
        commandProducer.produce(updateUser);
    }

    @DeleteMapping("/{id}")
    public void deleteUser(@PathVariable("id") String id) {
        commandProducer.produce(new DeleteUser(id));
    }
}
8. Configuration
To keep this sample simple, a single Cassandra keyspace is used; in production, two separate databases should be used. Database connection parameters are defined in src/main/resources/application.properties of the service module:
- spring.data.cassandra.keyspace-name - defines the Cassandra keyspace where your tables are located
- spring.data.cassandra.contact-points - defines the Cassandra hosts the service connects to
- spring.data.cassandra.port - defines the Cassandra port
- command.topic_name - defines Kafka Topic for Commands
- event.topic_name - defines Kafka Topic for Events
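For orientation, an application.properties file covering the parameters above might look like the following. All values are placeholders chosen for illustration, not the project's actual settings. The two consumer group entries are included because the `@KafkaListener` annotations in the consumers reference `${command.consumer_group}` and `${event.consumer_group}`. Depending on the Spring Boot version in use, the Cassandra properties may use the `spring.cassandra.` prefix instead, so it is worth checking the documentation for your version.

```properties
# Cassandra connection (placeholder values)
spring.data.cassandra.keyspace-name=sample_keyspace
spring.data.cassandra.contact-points=localhost
spring.data.cassandra.port=9042

# Kafka topics and consumer groups (placeholder names)
command.topic_name=user-commands
command.consumer_group=user-command-consumers
event.topic_name=user-events
event.consumer_group=user-event-consumers
```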