Spring Kafka Producer/Consumer

How to build a Spring Kafka Producer/Consumer

1. Overview

This tutorial explains how to implement the most important parts of a message-oriented architecture using Kafka, one of the most widely used event streaming platforms, as the message bus. We will use Spring to make the integration with your application straightforward. The source code is available in Java, Kotlin, and Scala and can be found in our repository.

2. Kafka

Apache Kafka is a distributed event store and stream-processing platform. It is an open-source system developed by the Apache Software Foundation, written in Java and Scala. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Kafka can connect to external systems (for data import/export) via Kafka Connect, and provides the Kafka Streams library for stream processing applications. Kafka uses a binary TCP-based protocol that is optimized for efficiency and relies on a "message set" abstraction that naturally groups messages together to reduce the overhead of the network roundtrip. This "leads to larger network packets, larger sequential disk operations, contiguous memory blocks [...], which allows Kafka to turn a bursty stream of random message writes into linear writes."

3. Topics and partitions

Since Kafka is an event store, you can picture it as a database: the topic plays the role of the database, the partitions are its tables, and the events are the records. Most importantly, the key of an event determines which partition it goes to, so all events with the same key land in the same partition. Each topic has its own set of partitions, and the more partitions a topic has, the more parallelism your application can benefit from.

[Figure: Kafka topic and partitions]
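To make the key-to-partition relationship concrete, here is a rough sketch of what Kafka's default partitioner does for records with a non-null key: it hashes the key bytes with murmur2 and takes the result modulo the number of partitions (the class and key names below are illustrative):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.utils.Utils;

public class PartitionSketch {

    // Same key, same hash, same partition: this is what keeps related events together.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // Every event keyed "user-42" lands in the same partition of a 3-partition topic.
        System.out.println(partitionFor("user-42", 3));
    }
}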

Topic creation using the Kafka command-line utilities:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
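The command above creates the topic with the broker defaults (typically a single partition). Since parallelism scales with the partition count, you will usually pass an explicit count, for example:

$ bin/kafka-topics.sh --create --topic quickstart-events --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092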

4. Consumer Groups

To achieve high parallelism with Kafka, one of the most common approaches is to use consumer groups. Within one consumer group, each partition is processed by exactly one consumer. These are the possible scenarios (a Spring Kafka sketch follows the list):

  • If there are fewer consumers than topic partitions, multiple partitions are assigned to some of the consumers in the group.
    [Figure: consumer group with fewer consumers than partitions]
  • If the number of consumers equals the number of topic partitions, each consumer is assigned exactly one partition.
    [Figure: consumer group with the same number of consumers and partitions]
  • If there are more consumers than topic partitions, the surplus consumers stay idle until a rebalance gives them a partition.
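As a sketch of how the first two scenarios play out in Spring Kafka: the concurrency attribute of @KafkaListener (available in recent versions) starts that many consumers in the same group inside a single application. The topic and group names below are illustrative:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class GroupedEventListener {

    // Three listener threads join the same consumer group; the broker assigns each
    // thread a share of the partitions, with at most one consumer per partition.
    @KafkaListener(topics = "quickstart-events", groupId = "quickstart-group", concurrency = "3")
    public void onEvent(String eventMessage) {
        // process the event
    }
}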
5. Producer

The producer publishes events to the configured topic through Spring's KafkaTemplate; a minimal implementation behind an EventProducer abstraction follows.

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

interface EventProducer {
    void produce(String eventKey, String eventValue);
}

@Component
public class EventProducerKafkaImpl implements EventProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topicName;

    // @Value cannot populate a final field directly, so it is injected through the constructor
    public EventProducerKafkaImpl(KafkaTemplate<String, String> kafkaTemplate,
                                  @Value("${kafka.topic.name}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    @Override
    public void produce(String eventKey, String eventValue) {
        // the key determines the target partition; the send itself is asynchronous
        kafkaTemplate.send(topicName, eventKey, eventValue);
    }
}

EventProducerKafkaImpl.java

import org.springframework.beans.factory.annotation.Value
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

interface EventProducer {
    fun produce(eventKey: String, eventValue: String)
}

@Component
class EventProducerKafkaImpl(
    private val kafkaTemplate: KafkaTemplate<String, String>,
    // the placeholder must be escaped with \$ so Kotlin does not treat it as a string template
    @Value("\${kafka.topic.name}") private val topicName: String
) : EventProducer {

    override fun produce(eventKey: String, eventValue: String) {
        kafkaTemplate.send(topicName, eventKey, eventValue)
    }
}

EventProducer.kt

import org.springframework.beans.factory.annotation.Value
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

trait EventProducer {
  def produce(eventKey: String, eventValue: String): Unit
}

@Component
class EventProducerKafkaImpl(kafkaTemplate: KafkaTemplate[String, String],
                             @Value("${kafka.topic.name}") val topicName: String) extends EventProducer {

  override def produce(eventKey: String, eventValue: String): Unit =
    kafkaTemplate.send(topicName, eventKey, eventValue)
}

EventProducer.scala
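Note that KafkaTemplate.send is asynchronous: it returns a future immediately rather than blocking until the broker acknowledges the write. Depending on the Spring Kafka version this is a ListenableFuture (2.x) or a CompletableFuture (3.x); a 3.x-style sketch for reacting to the outcome:

kafkaTemplate.send(topicName, eventKey, eventValue)
        .whenComplete((result, ex) -> {
            if (ex != null) {
                // the send failed: log, retry, or route to a dead-letter mechanism
            }
        });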

6. Producer Configuration

Using annotation-based configuration, we can build the KafkaTemplate instance:

// a plain helper rather than a @Bean: its String parameter could not be resolved
// from the container, and the consumer configuration below follows the same pattern
private ProducerFactory<String, String> producerFactory(String bootstrapAddress) {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(@Value("${kafka.bootstrap.address}") String bootstrapAddress) {
    return new KafkaTemplate<>(producerFactory(bootstrapAddress));
}

KafkaConfiguration.java

private fun producerFactory(bootstrapAddress: String): ProducerFactory<String, String> {
    val configProps: MutableMap<String, Any> = HashMap()
    configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
    configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    return DefaultKafkaProducerFactory(configProps)
}

@Bean
fun kafkaTemplate(@Value("\${kafka.bootstrap.address}") bootstrapAddress: String): KafkaTemplate<String, String> {
    return KafkaTemplate(producerFactory(bootstrapAddress))
}

KafkaConfiguration.kt

private def producerFactory(bootstrapAddress: String): ProducerFactory[String, String] = {
  val configProps = new util.HashMap[String, AnyRef]
  configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
  configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
  configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
  new DefaultKafkaProducerFactory[String, String](configProps)
}

@Bean
def kafkaTemplate(@Value("${kafka.bootstrap.address}") bootstrapAddress: String): KafkaTemplate[String, String] =
  new KafkaTemplate[String, String](producerFactory(bootstrapAddress))

KafkaConfiguration.scala
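The same map accepts any other key from ProducerConfig. As an illustrative (not exhaustive) example of common reliability settings, added inside producerFactory:

configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas to acknowledge each write
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);  // retry transient send failures a few times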

7. Consumer

The event consumer is the component responsible for reading events. It relies on the @KafkaListener annotation, which specifies the topic to listen to and the consumer group to join:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventListener {

    @KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.consumer.group}")
    public void onEvent(String eventMessage) {
        // process the event
    }
}

EventListener.java

import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class EventListener {

    // the $ placeholders are escaped so Kotlin does not treat them as string templates
    @KafkaListener(topics = ["\${kafka.topic.name}"], groupId = "\${kafka.consumer.group}")
    fun onEvent(eventMessage: String?) {
        // process the event
    }
}

EventListener.kt

import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class EventListener {

  @KafkaListener(topics = Array("${kafka.topic.name}"), groupId = "${kafka.consumer.group}")
  def onEvent(eventMessage: String): Unit = {
    // process the event
  }
}

EventListener.scala
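When processing needs more than the payload, the listener method can instead receive the full ConsumerRecord, which carries the key, partition, and offset; a Java sketch:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

@KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.consumer.group}")
public void onEvent(ConsumerRecord<String, String> record) {
    String key = record.key();     // the key that determined the partition
    String value = record.value(); // the event payload
    // process the event
}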

8. Consumer Configuration

Using annotation-based configuration, we can build the listener container factory that backs @KafkaListener:

private ConsumerFactory<String, String> consumerFactory(String bootstrapAddress, String consumerGroup) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        @Value("${kafka.bootstrap.address}") String bootstrapAddress,
        @Value("${kafka.consumer.group}") String consumerGroup) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory(bootstrapAddress, consumerGroup));
    return factory;
}

KafkaConfiguration.java

private fun consumerFactory(
    bootstrapAddress: String,
    consumerGroup: String
): ConsumerFactory<String, String> {
    val props: MutableMap<String, Any> = HashMap()
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
    props[ConsumerConfig.GROUP_ID_CONFIG] = consumerGroup
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    return DefaultKafkaConsumerFactory(props)
}

@Bean
fun kafkaListenerContainerFactory(
    @Value("\${kafka.bootstrap.address}") bootstrapAddress: String,
    @Value("\${kafka.consumer.group}") consumerGroup: String
): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.setConsumerFactory(consumerFactory(bootstrapAddress, consumerGroup))
    return factory
}

KafkaConfiguration.kt

private def consumerFactory(bootstrapAddress: String, consumerGroup: String): ConsumerFactory[String, String] = {
  val props = new util.HashMap[String, AnyRef]
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
  props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
  new DefaultKafkaConsumerFactory[String, String](props)
}

@Bean
def kafkaListenerContainerFactory(
    @Value("${kafka.bootstrap.address}") bootstrapAddress: String,
    @Value("${kafka.consumer.group}") consumerGroup: String): ConcurrentKafkaListenerContainerFactory[String, String] = {
  val factory = new ConcurrentKafkaListenerContainerFactory[String, String]
  factory.setConsumerFactory(consumerFactory(bootstrapAddress, consumerGroup))
  factory
}

KafkaConfiguration.scala
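Both the producer and consumer configuration resolve their placeholders from the application properties. A minimal application.properties might look like this (the property names match the placeholders above; the values are illustrative):

kafka.bootstrap.address=localhost:9092
kafka.topic.name=quickstart-events
kafka.consumer.group=quickstart-group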

9. Source Code

Check out the full source code for the following programming languages: