Spring CQRS/ES, Apache Kafka, Cassandra

Building highly scalable microservices with CQRS and Event Sourcing using Spring, Kafka and Cassandra

1. Overview

This article aims to provide readers with an in-depth understanding of Command Query Responsibility Segregation (CQRS) and Event Sourcing, along with a practical implementation using Spring Data (Apache Cassandra), Kafka and WebFlux. By the end of this tutorial, readers will have the knowledge and tools necessary to leverage the benefits of these concepts in their microservices architecture. You can check out the sources from our repository.

2. CQRS Pattern: Command Query Responsibility Segregation

In modern software development, building complex systems that efficiently handle both reads and writes can be a challenge. One pattern that has gained significant attention in recent years is Command Query Responsibility Segregation (CQRS). CQRS provides a way to separate the responsibilities of handling commands (write operations) and queries (read operations) in an application, leading to improved scalability, performance, and flexibility. In this article, we will delve into the CQRS pattern, its key principles, and the benefits it offers.

3. Event sourcing

Event sourcing is a software architectural pattern that captures and persists all changes (events) made to an application's state as a sequence of immutable events. Instead of storing the current state of an entity, event sourcing stores the history of events that have occurred over time and derives the current state from it.

In event sourcing, every action or operation that modifies the state of the system is represented as an event. These events are appended to an event log or event store, which serves as the single source of truth for the application's state. The event log contains a sequential series of events that can be used to reconstruct the state of the system at any given point in time.

To derive the current state from the event log, the application replays the events in sequence, applying them one by one to create the current state. By replaying events, it becomes possible to rebuild the state of the system at any past point in time or perform complex queries by applying different combinations of events.
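
To make the replay idea concrete, here is a minimal, self-contained sketch (not part of the sample project; the event and state types are illustrative only) that rebuilds a user's state by applying a list of events in order:

import java.util.List;

public class ReplayExample {

    // Illustrative event hierarchy: each event describes one state change.
    sealed interface UserEvent permits Created, Renamed {}
    record Created(String id, String fullName) implements UserEvent {}
    record Renamed(String id, String fullName) implements UserEvent {}

    // Illustrative state derived purely from the events.
    record UserState(String id, String fullName) {}

    // Replays the event history in order to derive the current state
    // (assumes the first event in the history creates the user).
    static UserState replay(List<UserEvent> history) {
        UserState state = null;
        for (UserEvent event : history) {
            state = switch (event) {
                case Created created -> new UserState(created.id(), created.fullName());
                case Renamed renamed -> new UserState(state.id(), renamed.fullName());
            };
        }
        return state;
    }

    public static void main(String[] args) {
        List<UserEvent> history = List.of(
                new Created("42", "Jane Doe"),
                new Renamed("42", "Jane Smith"));
        // Prints the state derived from the event log: UserState[id=42, fullName=Jane Smith]
        System.out.println(replay(history));
    }
}

Replaying a truncated prefix of the same history would yield the state as it was at that earlier point in time.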

4. CQRS/ES:

CQRS/ES (Command Query Responsibility Segregation / Event Sourcing) is the combination of the two architectural patterns described above: CQRS and Event Sourcing.

When combined, CQRS and Event Sourcing provide several benefits:

  • Scalability: CQRS allows read and write models to scale independently, enabling better performance for both read-heavy and write-heavy workloads.
  • Flexibility: CQRS allows for different storage models and optimization strategies for the read and write sides, providing flexibility in choosing the most suitable data storage and retrieval mechanisms for each task.
  • Auditability and traceability: Event sourcing captures a complete history of events, facilitating audit trails and providing detailed insights into the system's behaviour.
  • Event-driven architecture: Event sourcing inherently aligns with event-driven architectures, as events serve as the integration points between different components or microservices.
  • Time travel and temporal querying: Event sourcing enables the ability to rebuild past states and perform temporal queries, which can be valuable for analysis and debugging.

5. Concepts

  • Command: an intent to make a write operation.
  • Event: a message that notifies that a write operation has occurred.
  • Query: a read operation that will not modify data.

    6. Designing the Architecture

    In order to implement a read side and a write side, two data stores are used: one optimized for read operations, and another optimized for write operations, which is an append-only data store. To integrate commands and events, Kafka topics are used.
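
    The topic names come from the application properties described in section 8. Their declaration is not shown in the snippets below; as an assumption, a Spring Kafka configuration similar to the following sketch could be used to create them (partition and replica counts are illustrative only):

    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.TopicBuilder;

    // Hypothetical configuration declaring the two integration topics used below.
    @Configuration
    public class TopicConfig {
        @Bean
        public NewTopic commandTopic(@Value("${command.topic_name}") String name) {
            return TopicBuilder.name(name).partitions(3).replicas(1).build();
        }

        @Bean
        public NewTopic eventTopic(@Value("${event.topic_name}") String name) {
            return TopicBuilder.name(name).partitions(3).replicas(1).build();
        }
    }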

    7. Implementation of the service:

    • Read Side:

      This is a simple Spring Data Reactive Cassandra repository implementation where the data is queried from a Cassandra database. This database is the read side and is also referred to as the read projection.

    • Write Side:

      To achieve a complete decoupling and scalability of the overall system, a message-oriented approach is employed. Apache Kafka serves as the message hub in this implementation, enabling seamless communication and scalability. The system generates command messages, which subsequently produce events stored in the event store. In this specific implementation, another Cassandra Database is used as the event store. These events drive the changes on the read side of the system, ensuring near real-time updates. This architectural design facilitates efficient event processing and empowers the read side to reflect the generated events in a timely manner.

    • Entities:

      User: read-side definition of the User entity

                              
      import lombok.*;
      import org.springframework.data.cassandra.core.mapping.PrimaryKey;
      import org.springframework.data.cassandra.core.mapping.Table;
      
      @Getter
      @Setter
      @Builder
      @NoArgsConstructor
      @AllArgsConstructor
      @Table
      public class User {
          @PrimaryKey
          private String id;
          private String fullName;
          private String email;
      }                 
                              
                          

      EventData: Event-related information

                              
      import lombok.*;
      import org.springframework.data.cassandra.core.mapping.PrimaryKey;
      import org.springframework.data.cassandra.core.mapping.Table;
      
      import java.util.Date;
      
      @Getter
      @Setter
      @Builder
      @NoArgsConstructor
      @AllArgsConstructor
      @Table
      public class EventData {
          @PrimaryKey
          private String id;
          private String data;
          private Date time;
      }               
                              
                          
    • Repositories:

      Spring Data will be used to simplify Cassandra-related data operations.

      EventDataRepository

                              
      import org.springframework.data.cassandra.repository.ReactiveCassandraRepository;
      import org.springframework.stereotype.Repository;
      import site.thesampleapp.domain.entity.EventData;
      
      @Repository
      public interface EventDataRepository extends ReactiveCassandraRepository<EventData, String> {
      }              
                              
                          

      UserRepository

                              
      import org.springframework.data.cassandra.repository.ReactiveCassandraRepository;
      import org.springframework.stereotype.Repository;
      import site.thesampleapp.domain.entity.User;
      @Repository
      public interface UserRepository extends ReactiveCassandraRepository<User, String> {
      }
                  
                              
                          
    • Services:

      The service layer will take care of the business logic.

      UserEventService

                              
      import site.thesampleapp.domain.event.Event;
      public interface UserEventService {
          void onEvent(Event event);
      }             
                              
                          

      UserService

                              
      import reactor.core.publisher.Flux;
      import reactor.core.publisher.Mono;
      import site.thesampleapp.domain.entity.User;
      
      public interface UserService {
          Flux<User> findAll();
          Mono<User> findById(String id);
          void save(User user);
          void deleteById(String id);
      }
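
      The service implementations are not listed in this article. As a rough sketch of what the write-side implementation could look like, the following hypothetical UserEventServiceImpl appends each event to the event store (EventData) and then publishes it through the EventProducer shown later; package names and the exact implementation in the repository are assumptions and may differ.

      import lombok.RequiredArgsConstructor;
      import org.springframework.stereotype.Service;
      import site.thesampleapp.domain.entity.EventData;
      import site.thesampleapp.domain.event.Event;
      import site.thesampleapp.domain.messaging.EventProducer;
      import site.thesampleapp.repository.EventDataRepository;
      import site.thesampleapp.util.JsonSerializer;

      import java.util.UUID;

      @Service
      @RequiredArgsConstructor
      public class UserEventServiceImpl implements UserEventService {
          private final EventDataRepository eventDataRepository;
          private final EventProducer eventProducer;
          private final JsonSerializer jsonSerializer;

          @Override
          public void onEvent(Event event) {
              // Append the event to the event store; a unique id per event keeps the
              // store append-only, while the payload carries the aggregate id.
              EventData eventData = EventData.builder()
                      .id(UUID.randomUUID().toString())
                      .data(jsonSerializer.serialize(event))
                      .time(event.getTime())
                      .build();
              eventDataRepository.save(eventData).subscribe();
              // Publish the event so the read side can update its projection.
              eventProducer.produce(event);
          }
      }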
                  
                              
                          
    • Commands:

      As explained above, these are the data manipulation intents; each command carries all the information required to perform the intended operation.

                              
      import com.fasterxml.jackson.annotation.JsonSubTypes;
      import com.fasterxml.jackson.annotation.JsonTypeInfo;

      @JsonTypeInfo(
              use = JsonTypeInfo.Id.NAME,
              property = "type")
      @JsonSubTypes({
              @JsonSubTypes.Type(value = CreateUser.class, name = "create"),
              @JsonSubTypes.Type(value = UpdateUser.class, name = "update"),
              @JsonSubTypes.Type(value = DeleteUser.class, name = "delete")
      })
      public interface Command {
      }            
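
      The concrete commands (CreateUser, UpdateUser, DeleteUser) are only referenced in the snippets above and below. Judging from how they are used (createUser.fullName(), updateUser.id(), ...), they can be modelled as simple Java records; the following is a sketch of how they might be defined (each record would live in its own file under site.thesampleapp.domain.command):

      // Create a new user; the user id is generated by the command producer.
      public record CreateUser(String fullName, String email) implements Command {
      }

      // Update the full name of an existing user.
      public record UpdateUser(String id, String fullName) implements Command {
      }

      // Delete an existing user.
      public record DeleteUser(String id) implements Command {
      }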
                              
                          
    • Events:

      Similarly, these are notification messages about events that occurred in the past, and they are the input used to modify the read side.

                              
      import com.fasterxml.jackson.annotation.JsonSubTypes;
      import com.fasterxml.jackson.annotation.JsonTypeInfo;

      import java.util.Date;

      @JsonTypeInfo(
              use = JsonTypeInfo.Id.NAME,
              property = "type")
      @JsonSubTypes({
              @JsonSubTypes.Type(value = UserCreated.class, name = "created"),
              @JsonSubTypes.Type(value = UserUpdated.class, name = "updated"),
              @JsonSubTypes.Type(value = UserDeleted.class, name = "deleted")
      })
      public interface Event {
          String getId();
          Date getTime();
      }        
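
      As with the commands, the concrete events (UserCreated, UserUpdated, UserDeleted) are only referenced elsewhere. A sketch consistent with the Event interface above and with how the consumers use them (record-style accessors such as userCreated.id()) could look like this; the actual definitions in the repository may differ:

      // A user was created; carries the generated user id as the aggregate key.
      public record UserCreated(String id, String fullName, String email, Date time) implements Event {
          @Override
          public String getId() { return id; }
          @Override
          public Date getTime() { return time; }
      }

      // A user's full name was changed.
      public record UserUpdated(String id, String fullName, Date time) implements Event {
          @Override
          public String getId() { return id; }
          @Override
          public Date getTime() { return time; }
      }

      // A user was deleted.
      public record UserDeleted(String id, Date time) implements Event {
          @Override
          public String getId() { return id; }
          @Override
          public Date getTime() { return time; }
      }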
                              
                          
    • Consumers:

      The consumers are responsible for ingesting messages from Kafka. For this application, two will be built: one for consuming commands and the other for events.

      CommandConsumer

                              
      import lombok.RequiredArgsConstructor;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.stereotype.Component;
      import site.thesampleapp.domain.command.Command;
      import site.thesampleapp.domain.command.CreateUser;
      import site.thesampleapp.domain.command.DeleteUser;
      import site.thesampleapp.domain.command.UpdateUser;
      import site.thesampleapp.domain.event.UserCreated;
      import site.thesampleapp.domain.event.UserDeleted;
      import site.thesampleapp.domain.event.UserUpdated;
      import site.thesampleapp.domain.service.UserEventService;
      import site.thesampleapp.util.JsonSerializer;
      
      import java.util.Date;
      
      @Component
      @Slf4j
      @RequiredArgsConstructor
      public class CommandConsumer {
          private final JsonSerializer jsonSerializer;
          private final UserEventService userEventService;
          @KafkaListener(topics = "${command.topic_name}",
                  groupId = "${command.consumer_group}",
                  containerFactory = "commandKafkaListenerContainerFactory")
          public void onCommand(ConsumerRecord<String, String> record) {
              Date currentDate = new Date();
              String key = record.key();
              String commandMessage = record.value();
              Command command = jsonSerializer.deSerialize(commandMessage, Command.class);
              switch (command){
                  case CreateUser createUser ->
                          userEventService.onEvent(new UserCreated(key, createUser.fullName(), createUser.email(), currentDate));
                  case UpdateUser updateUser ->
                          userEventService.onEvent(new UserUpdated(updateUser.id(), updateUser.fullName(),currentDate));
                  case DeleteUser deleteUser ->
                          userEventService.onEvent(new UserDeleted(deleteUser.id(), currentDate));
                  default -> log.error("Invalid Command : ",command);
              }
          }
      }           
                              
                          

      EventConsumer

                              
      
      import lombok.RequiredArgsConstructor;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.springframework.kafka.annotation.KafkaListener;
      import org.springframework.stereotype.Component;
      import site.thesampleapp.domain.entity.User;
      import site.thesampleapp.domain.event.Event;
      import site.thesampleapp.domain.event.UserCreated;
      import site.thesampleapp.domain.event.UserDeleted;
      import site.thesampleapp.domain.event.UserUpdated;
      import site.thesampleapp.domain.service.UserService;
      import site.thesampleapp.util.JsonSerializer;
      
      @Component
      @Slf4j
      @RequiredArgsConstructor
      public class EventConsumer {
          private final JsonSerializer jsonSerializer;
          private final UserService userService;
          @KafkaListener(topics = "${event.topic_name}",
                  groupId = "${event.consumer_group}",
                  containerFactory = "eventKafkaListenerContainerFactory")
          public void onEvent(ConsumerRecord<String, String> record) {
              String eventMessage = record.value();
              Event event = jsonSerializer.deSerialize(eventMessage,Event.class);
              switch (event){
                  case UserCreated userCreated ->
                          userService.save(User.builder().
                                  id(userCreated.id()).
                                  email(userCreated.email()).
                                  fullName(userCreated.fullName()).build());
                  case UserUpdated userUpdated -> userService.findById(userUpdated.id()).map(user -> {
                      user.setFullName(userUpdated.fullName());
                      userService.save(user);
                      return user;
                  }).subscribe(); // subscribe, otherwise the reactive update pipeline is never executed
                  case UserDeleted userDeleted -> userService.deleteById(userDeleted.id());
                  default -> log.error("Invalid Event:",event);
              }
          }
      }
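
      The JsonSerializer used by the consumers and producers is a small utility that is not shown in this article. A minimal sketch wrapping a Jackson ObjectMapper, consistent with the serialize/deSerialize calls above, could look like this (error handling kept deliberately simple):

      import com.fasterxml.jackson.core.JsonProcessingException;
      import com.fasterxml.jackson.databind.ObjectMapper;
      import org.springframework.stereotype.Component;

      @Component
      public class JsonSerializer {
          private final ObjectMapper objectMapper = new ObjectMapper();

          // Serializes a command or event (including the @JsonTypeInfo "type" property) to JSON.
          public String serialize(Object value) {
              try {
                  return objectMapper.writeValueAsString(value);
              } catch (JsonProcessingException e) {
                  throw new IllegalArgumentException("Unable to serialize " + value, e);
              }
          }

          // Deserializes a JSON payload back into the polymorphic Command or Event hierarchy.
          public <T> T deSerialize(String json, Class<T> type) {
              try {
                  return objectMapper.readValue(json, type);
              } catch (JsonProcessingException e) {
                  throw new IllegalArgumentException("Unable to deserialize " + json, e);
              }
          }
      }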
                  
                              
                          
    • Producers:

      Since two consumers are built, two producers will also be required: one for commands and another for events.

      CommandProducerKafkaImpl

                              
      
      import lombok.RequiredArgsConstructor;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;
      import site.thesampleapp.domain.command.Command;
      import site.thesampleapp.domain.command.CreateUser;
      import site.thesampleapp.domain.command.DeleteUser;
      import site.thesampleapp.domain.command.UpdateUser;
      import site.thesampleapp.domain.messaging.CommandProducer;
      import site.thesampleapp.util.JsonSerializer;
      
      import java.util.UUID;
      
      @Component
      @RequiredArgsConstructor
      public class CommandProducerKafkaImpl implements CommandProducer {
          @Value("${command.topic_name}")
          private final String topicName;
          private final KafkaTemplate<String, String> kafkaTemplate;
          private final JsonSerializer jsonSerializer;
          @Override
          public void produce(Command command) {
              switch (command){
                  case CreateUser createUser -> {
                      String userId = UUID.randomUUID().toString();
                      kafkaTemplate.send(topicName,userId,jsonSerializer.serialize(createUser));
                  }
                  case UpdateUser updateUser ->
                          kafkaTemplate.send(topicName,updateUser.id(),jsonSerializer.serialize(updateUser));
                  case DeleteUser deleteUser ->
                          kafkaTemplate.send(topicName,deleteUser.id(),jsonSerializer.serialize(deleteUser));
                  default -> throw new IllegalStateException("Unexpected value: " + command);
              }
          }
      }         
                              
                          

      EventProducerKafkaImpl

                              
      
      import lombok.RequiredArgsConstructor;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;
      import site.thesampleapp.domain.event.Event;
      import site.thesampleapp.domain.event.UserCreated;
      import site.thesampleapp.domain.event.UserDeleted;
      import site.thesampleapp.domain.event.UserUpdated;
      import site.thesampleapp.domain.messaging.EventProducer;
      import site.thesampleapp.util.JsonSerializer;
      
      @Component
      @RequiredArgsConstructor
      public class EventProducerKafkaImpl implements EventProducer {
          @Value("${event.topic_name}")
          private final String topicName;
          private final KafkaTemplate<String, String> kafkaTemplate;
          private final JsonSerializer jsonSerializer;
          @Override
          public void produce(Event event) {
              switch (event){
                  case UserCreated userCreated ->
                          kafkaTemplate.send(topicName,userCreated.id(),jsonSerializer.serialize(userCreated));
                  case UserUpdated userUpdated ->
                          kafkaTemplate.send(topicName,userUpdated.id(),jsonSerializer.serialize(userUpdated));
                  case UserDeleted userDeleted ->
                          kafkaTemplate.send(topicName,userDeleted.id(),jsonSerializer.serialize(userDeleted));
                  default -> throw new IllegalStateException("Unexpected value: " + event);
              }
          }
      }
                  
                              
                          
    • Rest Controller:

      This component is the main entry point of the microservice and separates the read side, used for queries, from the write side, used for data manipulation operations.

                              
      
      import lombok.RequiredArgsConstructor;
      import org.springframework.web.bind.annotation.*;
      import reactor.core.publisher.Flux;
      import reactor.core.publisher.Mono;
      import site.thesampleapp.domain.command.CreateUser;
      import site.thesampleapp.domain.command.DeleteUser;
      import site.thesampleapp.domain.command.UpdateUser;
      import site.thesampleapp.domain.entity.User;
      import site.thesampleapp.domain.messaging.CommandProducer;
      import site.thesampleapp.domain.service.UserService;
      
      @RestController
      @RequiredArgsConstructor
      public class UserRestController {
          private final UserService userService;
          private final CommandProducer commandProducer;
          @GetMapping("/")
          public Flux<User> findAll() {
              return userService.findAll();
          }
      
          @GetMapping("/{id}")
          public Mono<User> findUserById(@PathVariable("id") String id) {
              return userService.findById(id);
          }
      
          @PostMapping("/")
          public void saveUser(@RequestParam(name = "fullName") String fullName,
                               @RequestParam(name = "email") String email) {
              CreateUser createUser = new CreateUser(fullName,email);
              commandProducer.produce(createUser);
          }
      
          @PutMapping("/{id}")
          public void updateUser(@RequestParam(name = "id")String id,
                                      @RequestParam(name = "fullName")String fullName) {
              UpdateUser updateUser = new UpdateUser(id,fullName);
              commandProducer.produce(updateUser);
          }
      
          @DeleteMapping("/{id}")
          public void deleteUser(@PathVariable("id") String id) {
              commandProducer.produce(new DeleteUser(id));
          }
      
      }     
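
      To see the flow end to end, the endpoints can be exercised with a small ad-hoc client. The sketch below assumes the service is running locally on port 8080 and that the controller is mapped at the context root as shown above; because the write path is asynchronous (command topic, event store, event topic, read projection), the read side is only eventually consistent.

      import org.springframework.web.reactive.function.client.WebClient;

      public class UserClientDemo {
          public static void main(String[] args) throws InterruptedException {
              WebClient client = WebClient.create("http://localhost:8080");

              // Write side: publish a CreateUser command through the REST entry point.
              client.post()
                      .uri(uriBuilder -> uriBuilder.path("/")
                              .queryParam("fullName", "Jane Doe")
                              .queryParam("email", "jane@example.com")
                              .build())
                      .retrieve()
                      .toBodilessEntity()
                      .block();

              // Give the command -> event -> projection pipeline a moment to catch up.
              Thread.sleep(1000);

              // Read side: query the projection.
              client.get()
                      .uri("/")
                      .retrieve()
                      .bodyToFlux(String.class)
                      .doOnNext(System.out::println)
                      .blockLast();
          }
      }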
                              
                          

    8. Configuration

    To keep this sample simple, a single Cassandra keyspace is used; in production, however, two separate databases should be used. The database connection parameters are defined in src/main/resources/application.properties of the service module:

    • spring.data.cassandra.keyspace-name - defines the Cassandra keyspace where your tables are located
    • spring.data.cassandra.contact-points - defines the Cassandra hosts to connect to
    • spring.data.cassandra.port - defines the Cassandra port
    • command.topic_name - defines Kafka Topic for Commands
    • event.topic_name - defines Kafka Topic for Events
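
    Putting it together, an example application.properties could look like the following (all values are examples; the consumer group properties are the ones referenced by the @KafkaListener annotations above):

    # Cassandra (read projection and event store share one keyspace in this sample)
    spring.data.cassandra.keyspace-name=sampleapp
    spring.data.cassandra.contact-points=127.0.0.1
    spring.data.cassandra.port=9042

    # Kafka topics and consumer groups
    command.topic_name=user-commands
    command.consumer_group=user-command-consumers
    event.topic_name=user-events
    event.consumer_group=user-event-consumers

    The @KafkaListener annotations also reference commandKafkaListenerContainerFactory and eventKafkaListenerContainerFactory beans, which are not shown in this article. A minimal sketch of such a configuration, assuming String keys and values and a standard spring.kafka.bootstrap-servers property, might be:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    import java.util.HashMap;
    import java.util.Map;

    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
        @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
        private String bootstrapServers;

        // Common consumer settings: String keys/values; the JSON payloads are handled by JsonSerializer.
        private ConsumerFactory<String, String> consumerFactory(String groupId) {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        // Factory referenced by CommandConsumer.
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> commandKafkaListenerContainerFactory(
                @Value("${command.consumer_group}") String groupId) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory(groupId));
            return factory;
        }

        // Factory referenced by EventConsumer.
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> eventKafkaListenerContainerFactory(
                @Value("${event.consumer_group}") String groupId) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory(groupId));
            return factory;
        }
    }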