In this tutorial we will learn how to connect to a Kafka cluster from a Spring Boot REST controller. As a proof of concept, we will set up a basic web application that produces and consumes messages streamed to Kafka.

First, some basics: what is Apache Kafka? Apache Kafka is a streaming platform that provides three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Apache Kafka is generally used for two types of applications:

  • Applications that build real-time streaming data pipelines to reliably move data between systems or applications
  • Applications that transform or react to streams of data

To understand how Kafka does these things, let's dive into a real example using Spring Boot.

First, generate a project with the Spring Boot CLI:

spring init -dweb,kafka kafka-demo

The following dependencies will be added to your project:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

Then, let's write a KafkaConsumer. You can receive messages either by configuring a MessageListenerContainer and providing a message listener, or by using the @KafkaListener annotation. We will use the latter option:

package com.masterspringboot;

import org.springframework.stereotype.Service;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.logging.Logger;

@Service
public class KafkaConsumer {

    private static final Logger logger = Logger.getLogger(KafkaConsumer.class.getName());

    @KafkaListener(topics = "mytopic", groupId = "group_id")
    public void consume(String message) {
        logger.info(String.format("Consumed message -> %s", message));
    }
}
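
With Spring Boot's auto-configuration, no extra properties are strictly required for this example: the broker defaults to localhost:9092 and String serializers/deserializers are used for keys and values. If you want to make these defaults explicit, or point to a different broker, a minimal application.properties sketch could look like this (auto-offset-reset is optional and only controls where a brand-new consumer group starts reading):

# Broker address and (de)serializers; these values mirror the Spring Boot defaults
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group_id
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer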

Then, let's write a KafkaProducer. The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics:

package com.masterspringboot;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.util.logging.Logger;

@Service
public class KafkaProducer {

    private static final Logger logger = Logger.getLogger(KafkaProducer.class.getName());

    private static final String TOPIC = "mytopic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        logger.info(String.format("Sending message -> %s", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}
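
Note that KafkaTemplate.send() is asynchronous: it returns a future immediately and the record is delivered in the background. If you want to log the outcome of each send, here is a minimal sketch of an alternative method you could add to the producer above (assuming Spring Kafka 2.x, where send() returns a ListenableFuture):

    public void sendMessageWithCallback(String message) {
        // send() does not block; register callbacks to be notified of the delivery result
        this.kafkaTemplate.send(TOPIC, message).addCallback(
                result -> logger.info(String.format("Delivered to partition %d at offset %d",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset())),
                ex -> logger.severe(String.format("Delivery failed: %s", ex.getMessage())));
    }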

The next component we will add is a simple RestController which publishes messages to the Kafka topic as GET requests (carrying the message as a path variable) arrive at the "/kafka/publish/{message}" URI:

package com.masterspringboot;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final KafkaProducer producer;

    @Autowired
    KafkaController(KafkaProducer producer) {
        this.producer = producer;
    }

    @GetMapping(value = "/publish/{message}")
    public String sendMessageToKafkaTopic(@PathVariable("message") String message) {
        this.producer.sendMessage(message);
        return "Message sent! Check the logs!";
    }
    }
}
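
Once the application is up and running, you can also exercise this endpoint from the command line, for example with curl (assuming the default server port 8080):

curl http://localhost:8080/kafka/publish/hello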

To run the application, a simple class annotated with @SpringBootApplication is included:

package com.masterspringboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SimpleKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(SimpleKafkaApplication.class, args);
    }
}

Starting Kafka

The simplest way to start Kafka is with a Docker Compose YAML file, which starts both the Kafka container image and Zookeeper, which is needed for cluster management. Here is a sample docker-compose.yaml file:

version: '2'

services:

  zookeeper:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
    ports:
      - "2181:2181"
    environment:
      LOG_DIR: /tmp/logs

  kafka:
    image: strimzi/kafka:0.11.3-kafka-2.1.0
    command: [
      "sh", "-c",
      "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
    ]
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      LOG_DIR: "/tmp/logs"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

From the same directory where you saved the docker-compose.yaml file, execute:

docker-compose up

Check from the Console that Kafka started successfully:

kafka_1      | [2019-11-04 07:58:50,051] INFO Kafka version : 2.1.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2019-11-04 07:58:50,051] INFO Kafka commitId : 809be928f1ae004e (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2019-11-04 07:58:50,053] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
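
By default Kafka auto-creates topics on first use, so no extra step is strictly required for "mytopic". If you prefer to create the topic explicitly, here is a sketch using the tooling shipped inside the same container (Kafka 2.1 still administers topics through Zookeeper):

docker-compose exec kafka bin/kafka-topics.sh \
  --zookeeper zookeeper:2181 \
  --create --topic mytopic \
  --partitions 1 --replication-factor 1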

Now you can start your Spring Boot application with:

mvn install spring-boot:run

Then, you can start sending and consuming messages with the following URL (the application listens on port 8080 by default, unless you override server.port): http://localhost:8080/kafka/publish/hello


On the Console, you should see:

2019-11-04 09:01:18.624  INFO 16891 --- [ntainer#0-0-C-1] com.masterspringboot.KafkaConsumer       : Consumed message -> hello
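
If you want to verify, independently of the Spring Boot listener, that messages are actually reaching the broker, you can also attach a console consumer inside the Kafka container (again a sketch based on the service names defined in the docker-compose.yaml above):

docker-compose exec kafka bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic mytopic \
  --from-beginning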

Congratulations! You just managed to connect Apache Kafka with a Spring Boot application!

Source code for this article is available at: https://github.com/fmarchioni/masterspringboot/tree/master/kafka/kafka-demo

Java EE user? If you are interested in the enterprise version of this tutorial, check out: WildFly and Kafka quickstart.
