Using Spring Boot with Apache Kafka: Building Event-Driven Systems
Event-driven architectures are becoming increasingly popular as organizations move towards more distributed and decoupled systems. Apache Kafka is a popular open-source distributed streaming platform that can be used to build event-driven systems. In this article, we will explore how to use Spring Boot with Apache Kafka to build event-driven systems.
What is Apache Kafka?
Apache Kafka is a distributed streaming platform that is used to build real-time data pipelines and streaming applications. It provides a unified, high-throughput, low-latency platform for handling real-time data feeds. Kafka can be used for a wide range of use cases, including:
Log aggregation
Stream processing
Event sourcing
Messaging
Kafka is a distributed system, which means it is designed to run across multiple servers, known as brokers. Each broker stores a subset of the data, and data is replicated across multiple brokers for fault tolerance.
Setting up a Kafka Cluster
Before we dive into using Kafka with Spring Boot, we need to set up a Kafka cluster. A Kafka cluster typically consists of one or more Kafka brokers, which are responsible for storing and managing data, and one or more ZooKeeper servers, which are used for coordination and configuration.
Setting up a Kafka Cluster with Docker
We can set up a Kafka cluster locally using Docker by running the following commands:
docker-compose up -d zookeeper
docker-compose up -d kafka
This will create a Kafka cluster with one ZooKeeper server and one Kafka broker.
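Note that these commands assume a docker-compose.yml file that defines zookeeper and kafka services. A minimal sketch, using the Confluent images as one example (the image tags, ports, and environment settings are illustrative and should be adapted to your environment):
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1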
Setting up a Kafka Cluster without Docker
To set up a Kafka cluster without Docker, we can follow these steps:
Download and install Kafka from the official website.
Start a ZooKeeper server:
bin/zookeeper-server-start.sh config/zookeeper.properties
Start one or more Kafka brokers:
bin/kafka-server-start.sh config/server.properties
By default, Kafka uses port 9092 for communication between brokers and clients.
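Once the broker is running, we can verify the setup by creating a topic from the command line. On recent Kafka versions the topic tooling talks to the broker directly (older releases used a --zookeeper flag instead of --bootstrap-server):
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1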
Creating a Spring Boot Kafka Application
To create a Spring Boot Kafka application, we need to add the following dependencies to our pom.xml file. When the project inherits from spring-boot-starter-parent, the versions can be omitted so that Spring Boot supplies compatible versions of both libraries:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
We can then create a simple Kafka producer that sends a message to a Kafka topic:
@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topicName = "test-topic";

    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send(topicName, message);
    }
}
We can also create a Kafka consumer that listens to the same Kafka topic:
@Service
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic", groupId = "test-group")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}
In the above code, KafkaProducerController is a REST controller that exposes a /publish endpoint for sending messages to the test-topic topic. The KafkaConsumer class is a simple Kafka consumer that listens to the same topic and prints each received message to the console.
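With the application running, we can exercise the endpoint from the command line. A quick test, assuming the application is listening on Spring Boot's default port 8080:
curl -X POST http://localhost:8080/kafka/publish -H "Content-Type: text/plain" -d "hello kafka"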
Configuring Kafka Properties in Spring Boot
To configure Kafka properties in a Spring Boot application, we can use the application.yml or application.properties file. For example:
spring:
  kafka:
    bootstrap-servers: localhost:9092
In the above code, we are specifying the Kafka bootstrap servers that our application should connect to. We can also specify other Kafka properties, such as the group ID, security protocol, SSL settings, and more.
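For example, a slightly fuller configuration for the producer and consumer used in this article might look like the following (the group ID and the serializer choices are illustrative):
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer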
Sending Messages to Kafka
To send messages to Kafka, we can use the KafkaTemplate class provided by the Spring Kafka library. KafkaTemplate is a high-level abstraction for sending messages to Kafka topics.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String message) {
    kafkaTemplate.send("test-topic", message);
}
In the above code, we are injecting a KafkaTemplate instance into our class using the @Autowired annotation. We can then use the send method to send a message to the test-topic topic.
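Note that send is asynchronous. In spring-kafka 2.x it returns a ListenableFuture (from org.springframework.util.concurrent), so we can attach a callback to log the result or handle failures. A minimal sketch; in spring-kafka 3.x, send returns a CompletableFuture instead:
public void sendMessageWithCallback(String message) {
    // send() returns immediately; the callback fires once the broker responds
    ListenableFuture<SendResult<String, String>> future =
            kafkaTemplate.send("test-topic", message);
    future.addCallback(
            result -> System.out.println(
                    "Sent to offset " + result.getRecordMetadata().offset()),
            ex -> System.err.println("Send failed: " + ex.getMessage()));
}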
Consuming Messages from Kafka
To consume messages from Kafka, we can use the @KafkaListener annotation provided by the Spring Kafka library. The @KafkaListener annotation marks a method to be called whenever a new message arrives on a specific Kafka topic.
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consume(String message) {
    System.out.println("Received message: " + message);
}
In the above code, we are annotating a method with the @KafkaListener annotation. This method will be called whenever a new message is received on the test-topic topic, and we specify the consumer group ID as test-group.
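If the listener needs message metadata as well as the payload, the method can accept the full ConsumerRecord instead of a plain String. A small sketch using the same topic and group:
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consumeRecord(ConsumerRecord<String, String> record) {
    // The record exposes the payload plus partition, offset, key, and timestamp
    System.out.println("Received " + record.value()
            + " from partition " + record.partition()
            + " at offset " + record.offset());
}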
Using Kafka Headers
Kafka headers are a key-value map that can be attached to messages. Headers can be used to add metadata to messages, such as correlation IDs, message IDs, or timestamps.
To add headers to a Kafka message, we can use the ProducerRecord class provided by the Kafka client library.
public void sendMessageWithHeaders(String message) {
    ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", message);
    record.headers().add("correlation-id", "12345".getBytes());
    kafkaTemplate.send(record);
}
In the above code, we are creating a new ProducerRecord instance and adding a header with the key correlation-id and the value 12345. We can then send the record using the kafkaTemplate.
To read headers from a Kafka message, we can use the @Header annotation provided by the Spring Kafka library.
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consumeWithHeaders(@Payload String message,
        @Header("correlation-id") String correlationId) {
    System.out.println("Received message: " + message);
    System.out.println("Correlation ID: " + correlationId);
}
In the above code, we are annotating a method with the @KafkaListener annotation and using the @Header annotation to read the correlation-id header value. Note that Kafka stores header values as byte arrays, so depending on how header mapping is configured, you may need to declare the parameter as byte[] and convert it to a String yourself.
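Spring Kafka also maps standard record metadata to well-known headers, which can be read with the constants defined in KafkaHeaders (the names shown here are from the 2.x API):
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consumeWithMetadata(@Payload String message,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) long offset) {
    // Topic, partition, and offset are populated by Spring Kafka for every record
    System.out.println("Received " + message + " from " + topic
            + "-" + partition + " at offset " + offset);
}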
Using Kafka Transactions
Kafka transactions provide atomicity and consistency guarantees when producing messages to Kafka. A transaction allows a producer to send messages to multiple topics and partitions atomically, so that either all of the messages in the transaction become visible to consumers or none of them do.
To use Kafka transactions in a Spring Boot application, we can use the KafkaTransactionManager provided by the Spring Kafka library.
@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(
        ProducerFactory<String, String> producerFactory) {
    KafkaTransactionManager<String, String> transactionManager =
            new KafkaTransactionManager<>(producerFactory);
    transactionManager.setTransactionSynchronization(
            AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return transactionManager;
}
In the above code, we are creating a new KafkaTransactionManager instance and configuring it to use the SYNCHRONIZATION_ON_ACTUAL_TRANSACTION policy defined on Spring's AbstractPlatformTransactionManager.
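Kafka transactions also require the producer to have a transactional ID. With Spring Boot this can be enabled in configuration; setting the property below should also cause Boot to auto-configure a KafkaTransactionManager for us, making the explicit bean above optional (the tx- prefix is an arbitrary example):
spring:
  kafka:
    producer:
      transaction-id-prefix: tx-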
We can then use the @Transactional annotation provided by the Spring Framework to wrap Kafka message production within a transaction.
@Transactional
public void sendMessageInTransaction(String message) {
    kafkaTemplate.send("test-topic", message);
}
In the above code, we are annotating the sendMessageInTransaction method with the @Transactional annotation, which ensures that the Kafka message is produced within a transaction.
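Alternatively, KafkaTemplate offers executeInTransaction, which runs a block of sends in a local transaction without requiring Spring's @Transactional infrastructure. A minimal sketch sending two messages atomically:
public void sendMessagesAtomically(String first, String second) {
    kafkaTemplate.executeInTransaction(operations -> {
        // Both sends commit together; if the callback throws, both are aborted
        operations.send("test-topic", first);
        operations.send("test-topic", second);
        return null;
    });
}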
Conclusion
In this article, we have seen how to use Spring Boot with Apache Kafka to build event-driven systems. We started by discussing the benefits of event-driven systems and Apache Kafka. We then saw how to configure Apache Kafka in a Spring Boot application using the spring-kafka library, and how to send and consume messages using the KafkaTemplate class and the @KafkaListener annotation. Finally, we discussed how to use Kafka headers and transactions in a Spring Boot application.
With the help of the Spring Kafka library, building event-driven systems with Apache Kafka has become much simpler and more manageable. By leveraging Spring Boot's autoconfiguration and dependency management capabilities, we can quickly develop and deploy robust and scalable event-driven systems.