Using Spring Boot with Apache Kafka: Building Event-Driven Systems

Using Spring Boot with Apache Kafka: Building Event-Driven Systems

Event-driven architectures are becoming increasingly popular as organizations move towards more distributed and decoupled systems. Apache Kafka is a popular open-source distributed streaming platform that can be used to build event-driven systems. In this article, we will explore how to use Spring Boot with Apache Kafka to build event-driven systems.

What is Apache Kafka?

Apache Kafka is a distributed streaming platform that is used to build real-time data pipelines and streaming applications. It provides a unified, high-throughput, low-latency platform for handling real-time data feeds. Kafka can be used for a wide range of use cases, including:

  • Log aggregation

  • Stream processing

  • Event sourcing

  • Messaging

Kafka is a distributed system, which means it is designed to run across multiple servers, known as brokers. Each broker stores a subset of the data, and data is replicated across multiple brokers for fault tolerance.

Setting up a Kafka Cluster

Before we dive into using Kafka with Spring Boot, we need to set up a Kafka cluster. A Kafka cluster typically consists of one or more Kafka brokers, which are responsible for storing and managing data, and one or more ZooKeeper servers, which are used for coordination and configuration.

Setting up a Kafka Cluster with Docker

We can set up a Kafka cluster locally using Docker by running the following commands:

docker-compose up -d zookeeper
docker-compose up -d kafka

This will create a Kafka cluster with one ZooKeeper server and one Kafka broker.

Setting up a Kafka Cluster without Docker

To set up a Kafka cluster without Docker, we need to follow the following steps:

  1. Download and install Kafka from the official website.

  2. Start a ZooKeeper server:

     bin/zookeeper-server-start.sh config/zookeeper.properties
    
  3. Start one or more Kafka brokers:

     bin/kafka-server-start.sh config/server.properties
    

    By default, Kafka uses port 9092 for communication between brokers and clients.

Creating a Spring Boot Kafka Application

To create a Spring Boot Kafka application, we need to add the following dependencies to our pom.xml file:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.7.3</version>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
  <version>2.7.6</version>
</dependency>

We can then create a simple Kafka producer that sends a message to a Kafka topic:

@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topicName = "test-topic";

    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send(topicName, message);
    }
}

We can also create a Kafka consumer that listens to the same Kafka topic:

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "test-topic", groupId = "group_id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

In the above code, KafkaProducerController is a REST controller that exposes a /publish endpoint that can be used to send messages to the test-topic topic. The KafkaConsumer class is a simple Kafka consumer that listens to the test-topic topic and prints the received message to the console.

Configuring Kafka Properties in Spring Boot

To configure Kafka properties in a Spring Boot application, we can use the application.yml or application.properties file. We can configure the Kafka properties as follows:

spring:
  kafka:
    bootstrap-servers: localhost:9092

In the above code, we are specifying the Kafka bootstrap servers that our application should connect to. We can also specify other Kafka properties, such as the group ID, security protocol, SSL settings, and more.

Sending Messages to Kafka

To send messages to Kafka, we can use the KafkaTemplate class provided by the Spring Kafka library. The KafkaTemplate class is a high-level abstraction for sending messages to Kafka topics.

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String message) {
    kafkaTemplate.send("test-topic", message);
}

In the above code, we are injecting a KafkaTemplate instance into our class using the @Autowired annotation. We can then use the send method to send a message to the test-topic topic.

Consuming Messages from Kafka

To consume messages from Kafka, we can use the @KafkaListener annotation provided by the SpringKafka library. The @KafkaListener annotation is used to annotate a method that should be called when a new message is received on a specific Kafka topic.

@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consume(String message) {
    System.out.println("Received message: " + message);
}

In the above code, we are annotating a method with the @KafkaListener annotation. This method will be called when a new message is received on the test-topic topic. We are also specifying the group ID as test-group.

Using Kafka Headers

Kafka headers are a key-value map that can be attached to messages. Headers can be used to add metadata to messages, such as correlation IDs, message IDs, or timestamps.

To add headers to a Kafka message, we can use the ProducerRecord class provided by the Kafka library.

public void sendMessageWithHeaders(String message) {
    ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", message);
    record.headers().add("correlation-id", "12345".getBytes());
    kafkaTemplate.send(record);
}

In the above code, we are creating a new ProducerRecord instance and adding a header with the key correlation-id and the value 12345. We can then send the message using the kafkaTemplate.

To read headers from a Kafka message, we can use the @Header annotation provided by the Spring Kafka library.

@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consumeWithHeaders(@Payload String message,
                               @Header("correlation-id") String correlationId) {
    System.out.println("Received message: " + message);
    System.out.println("Correlation ID: " + correlationId);
}

In the above code, we are annotating a method with the @KafkaListener annotation and specifying the @Header annotation to read the correlation-id header value.

Using Kafka Transactions

Kafka transactions provide atomicity and consistency guarantees when producing messages to Kafka. Transactions allow multiple Kafka producers to coordinate their actions to ensure that either all or none of the messages produced by a transaction are written to Kafka.

To use Kafka transactions in a Spring Boot application, we can use the KafkaTransactionManager provided by the Spring Kafka library.

@Bean
public KafkaTransactionManager<String, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {
    KafkaTransactionManager<String, String> transactionManager = new KafkaTransactionManager<>(producerFactory);
    transactionManager.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return transactionManager;
}

In the above code, we are creating a new KafkaTransactionManager instance and configuring it to use the SYNCHRONIZATION_ON_ACTUAL_TRANSACTION synchronization policy.

We can then use the @Transactional annotation provided by the Spring framework to wrap Kafka message production within a transaction.

@Transactional
public void sendMessageInTransaction(String message) {
    kafkaTemplate.send("test-topic", message);
}

In the above code, we are annotating the sendMessageInTransaction method with the @Transactional annotation. This ensures that the Kafka message is produced within a transaction.

Conclusion

In this article, we have seen how to use Spring Boot with Apache Kafka to build event-driven systems. We started by discussing the benefits of using event-driven systems and Apache Kafka. We then saw how to configure Apache Kafka in a Spring Boot application using the spring-kafka library. We also saw how to send and consume messages from Kafka using the KafkaTemplate and @KafkaListener annotations. Finally, we discussed how to use Kafka headers and transactions in a Spring Boot application.

With the help of the Spring Kafka library, building event-driven systems with Apache Kafka has become much simpler and more manageable. By leveraging Spring Boot's autoconfiguration and dependency management capabilities, we can quickly develop and deploy robust and scalable event-driven systems.