How to build a Spring Kafka Producer/Consumer
1. Overview
This tutorial explains how to implement the most important parts of a message-oriented architecture using Kafka, one of the most widely used event streaming platforms, as the message bus. We will use Spring to make integration into your application easy. The source code is written in Java, Kotlin, and Scala and can be found in our repository.
2. Kafka
Apache Kafka is a distributed event store and stream-processing platform. It is an open-source system developed by the Apache Software Foundation and written in Java and Scala. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Kafka can connect to external systems (for data import/export) via Kafka Connect, and provides the Kafka Streams library for stream processing applications. Kafka uses a binary TCP-based protocol that is optimized for efficiency and relies on a "message set" abstraction that naturally groups messages together to reduce the overhead of the network roundtrip. This leads to larger network packets, larger sequential disk operations, and contiguous memory blocks, which allows Kafka to turn a bursty stream of random message writes into linear writes.
3. Topics and partitions
Since Kafka is an event store, you can picture it as a database: the topic plays the role of the database, the partitions are its tables, and the events are the records. The most important point is that the event key determines which partition an event goes to; therefore, all events with the same key will go to the same partition. Each topic can have multiple partitions, and the more partitions a topic has, the more parallelism your application can benefit from.
Topic creation using the Kafka command-line utilities:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
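By default this creates the topic with a single partition. To benefit from the parallelism described above, you can ask for more partitions at creation time; for example, a variant of the same command (the partition count of three is an arbitrary illustration) would be:
$ bin/kafka-topics.sh --create --topic quickstart-events --partitions 3 --bootstrap-server localhost:9092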
4. Consumer Groups
To achieve high parallelism with Kafka, one of the most common approaches is to use consumer groups. Within a single consumer group, each partition is processed by exactly one consumer. These are the possible scenarios:
- If the number of consumers is less than the number of topic partitions, then some consumers in the group are assigned multiple partitions.
- If the number of consumers equals the number of topic partitions, then each consumer is assigned exactly one partition.
- If the number of consumers is greater than the number of topic partitions, then the extra consumers sit idle and receive no events.
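Kafka ships with a command-line utility that shows exactly this mapping: for a given group it lists which consumer owns which partition, together with the current offsets and lag. For example (assuming our consumer group is named quickstart-group):
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group quickstart-group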
5. Producer
The event producer is the component responsible for writing the events. This task is achieved by using Spring's KafkaTemplate. For this example we will send String messages; however, you can send JSON, Avro, Protobuf, or any other format supported by Kafka, and you specify the types of the key and the value through the serializer/deserializer settings.
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducerKafkaImpl implements EventProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topicName;

    // Constructor injection: @Value on the constructor parameter lets
    // Spring resolve the placeholder without extra Lombok configuration
    public EventProducerKafkaImpl(KafkaTemplate<String, String> kafkaTemplate,
                                  @Value("${kafka.topic.name}") String topicName) {
        this.kafkaTemplate = kafkaTemplate;
        this.topicName = topicName;
    }

    @Override
    public void produce(String eventKey, String eventValue) {
        kafkaTemplate.send(topicName, eventKey, eventValue);
    }
}
EventProducerKafkaImpl.java
import org.springframework.beans.factory.annotation.Value
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

interface EventProducer {
    fun produce(eventKey: String, eventValue: String)
}

// Note the escaped \$ in the placeholder: an unescaped $ would be
// interpreted as a Kotlin string template and fail to compile
@Component
class EventProducerKafkaImpl(
    val kafkaTemplate: KafkaTemplate<String, String>,
    @Value("\${kafka.topic.name}") val topicName: String
) : EventProducer {

    override fun produce(eventKey: String, eventValue: String) {
        kafkaTemplate.send(topicName, eventKey, eventValue)
    }
}
EventProducer.kt
import org.springframework.beans.factory.annotation.Value
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

trait EventProducer {
  def produce(eventKey: String, eventValue: String): Unit
}

@Component
class EventProducerImpl(kafkaTemplate: KafkaTemplate[String, String],
                        @Value("${kafka.topic.name}") val topicName: String) extends EventProducer {

  override def produce(eventKey: String, eventValue: String): Unit =
    kafkaTemplate.send(topicName, eventKey, eventValue)
}
EventProducer.scala
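With any of the three implementations in place, the producer can simply be injected wherever events need to be published. Here is a minimal Java usage sketch; the ProducerDemo class and the sample key and value are our own illustration, not part of the repository, and it assumes the EventProducer interface shown above:
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class ProducerDemo implements CommandLineRunner {

    private final EventProducer eventProducer;

    public ProducerDemo(EventProducer eventProducer) {
        this.eventProducer = eventProducer;
    }

    @Override
    public void run(String... args) {
        // Events sharing the key "user-42" always land on the same partition
        eventProducer.produce("user-42", "user-42 logged in");
    }
}
ProducerDemo.java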
6. Producer Configuration
Using annotation-based configuration, we can build the KafkaTemplate instance. Note that producerFactory below is a plain helper method rather than a bean: if it were annotated with @Bean, Spring would try to resolve its String parameter as a bean and fail at startup.
public ProducerFactory<String, String> producerFactory(String bootstrapAddress) {
    Map<String, Object> configProps = new HashMap<>();
    // Address of the Kafka broker(s) to connect to
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    // Keys and values are sent as plain strings
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(
        @Value("${kafka.bootstrap.address}") String bootstrapAddress) {
    return new KafkaTemplate<>(producerFactory(bootstrapAddress));
}
KafkaConfiguration.java
fun producerFactory(bootstrapAddress: String): ProducerFactory<String, String> {
    val configProps: MutableMap<String, Any> = HashMap()
    configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
    configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    return DefaultKafkaProducerFactory(configProps)
}

@Bean
fun kafkaTemplate(@Value("\${kafka.bootstrap.address}") bootstrapAddress: String): KafkaTemplate<String, String> {
    return KafkaTemplate(producerFactory(bootstrapAddress))
}
KafkaConfiguration.kt
def producerFactory(bootstrapAddress: String): ProducerFactory[String, String] = {
  val configProps = new util.HashMap[String, AnyRef]
  configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
  configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
  configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
  new DefaultKafkaProducerFactory[String, String](configProps)
}

@Bean def kafkaTemplate(@Value("${kafka.bootstrap.address}") bootstrapAddress: String): KafkaTemplate[String, String] =
  new KafkaTemplate[String, String](producerFactory(bootstrapAddress))
KafkaConfiguration.scala
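Note that the snippets above are bean methods, not complete files: they are meant to live inside a @Configuration class. For the Java version, a minimal enclosing class with the imports the fragments rely on could look like this (the layout is our assumption; only the class name is taken from the caption):
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfiguration {
    // producerFactory(...) and kafkaTemplate(...) from above go here
}
The Kotlin and Scala versions need the equivalent imports and the same @Configuration wrapper.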
7. Consumer
The event consumer is the component responsible for reading the events. It relies on the @KafkaListener annotation, which specifies the topic to listen to and the consumer group to join.
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventListener {

    @KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.consumer.group}")
    public void onEvent(String eventMessage) {
        // process the event
    }
}
EventListener.java
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

// As in the producer, the $ of the placeholders must be escaped in Kotlin
@Component
class EventListener {

    @KafkaListener(topics = ["\${kafka.topic.name}"], groupId = "\${kafka.consumer.group}")
    fun onEvent(eventMessage: String?) {
        // process the event
    }
}
EventListener.kt
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class EventListener {

  @KafkaListener(topics = Array("${kafka.topic.name}"), groupId = "${kafka.consumer.group}")
  def onEvent(eventMessage: String): Unit = {
    // process the event
  }
}
EventListener.scala
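If the listener also needs the event key or other metadata (partition, offset, timestamp), the method can receive the whole ConsumerRecord instead of just the value. A short Java sketch, with class and method names of our own choosing:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventRecordListener {

    @KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.consumer.group}")
    public void onEvent(ConsumerRecord<String, String> record) {
        // key() is the partitioning key set by the producer; value() is the payload
        String key = record.key();
        String value = record.value();
        // process the event
    }
}
EventRecordListener.java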
8. Consumer Configuration
Using annotation-based configuration, we can build the listener container factory that powers @KafkaListener; as before, consumerFactory is a plain helper method called from the bean method.
public ConsumerFactory<String, String> consumerFactory(
        String bootstrapAddress,
        String consumerGroup) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        @Value("${kafka.bootstrap.address}") String bootstrapAddress,
        @Value("${kafka.consumer.group}") String consumerGroup) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory(bootstrapAddress, consumerGroup));
    return factory;
}
KafkaConfiguration.java
fun consumerFactory(
    bootstrapAddress: String,
    consumerGroup: String
): ConsumerFactory<String, String> {
    val props: MutableMap<String, Any> = HashMap()
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapAddress
    props[ConsumerConfig.GROUP_ID_CONFIG] = consumerGroup
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    return DefaultKafkaConsumerFactory(props)
}

@Bean
fun kafkaListenerContainerFactory(
    @Value("\${kafka.bootstrap.address}") bootstrapAddress: String,
    @Value("\${kafka.consumer.group}") consumerGroup: String
): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.setConsumerFactory(consumerFactory(bootstrapAddress, consumerGroup))
    return factory
}
KafkaConfiguration.kt
def consumerFactory(bootstrapAddress: String, consumerGroup: String): ConsumerFactory[String, String] = {
val props = new util.HashMap[String, AnyRef]
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
new DefaultKafkaConsumerFactory[String, String](props)
}
@Bean def kafkaListenerContainerFactory(
    @Value("${kafka.bootstrap.address}") bootstrapAddress: String,
    @Value("${kafka.consumer.group}") consumerGroup: String): ConcurrentKafkaListenerContainerFactory[String, String] = {
val factory = new ConcurrentKafkaListenerContainerFactory[String, String]
factory.setConsumerFactory(consumerFactory(bootstrapAddress, consumerGroup))
factory
}
KafkaConfiguration.scala
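To tie this back to section 4: the factory is called Concurrent because it can run several consumer threads inside the same group. Raising the concurrency (in the Java bean above, before returning the factory) lets up to that many partitions be processed in parallel; the value of 3 here is an arbitrary example:
// Run up to 3 listener threads in the consumer group;
// only useful if the topic has at least 3 partitions
factory.setConcurrency(3);
Finally, the placeholders used throughout the examples must be defined in your configuration. A minimal application.properties, with illustrative values of our own (adjust them for your environment):
kafka.bootstrap.address=localhost:9092
kafka.topic.name=quickstart-events
kafka.consumer.group=quickstart-group
application.properties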