Sunday, May 16, 2021

Kafka Producer and Consumer Example using Spring Boot

Kafka Producer and Consumer Example using Spring Boot

In this post, we will see how to create a Kafka producer application that publishes messages to a Kafka topic, and a Kafka consumer application that subscribes to that topic and consumes the messages.

Prerequisites for this tutorial:

  • JDK 1.8 or later
  • Maven
  • Git

This example requires the following two applications:
  1. Kafka Producer
  2. Kafka Consumer

1. Create Kafka Producer application

Visit the Spring Initializr to generate a new project with the required dependencies.

The Kafka producer application below publishes a message to the Kafka topic every second.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Maven build descriptor for the Kafka producer application. -->
	<modelVersion>4.0.0</modelVersion>
	<!-- Spring Boot parent POM: the dependencies below declare no <version> of their own
	     and rely on the versions managed by this parent. -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.5</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<!-- Coordinates of this project. -->
	<groupId>info.code2learn</groupId>
	<artifactId>kafka-producer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>kafka-producer</name>
	<description>Kafka Producer Example</description>
	<properties>
		<!-- Java 8 language level, matching the "JDK 1.8 or later" prerequisite. -->
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<!-- Web starter; the application's HTTP port is set in application.yml. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- Spring for Apache Kafka: supplies KafkaTemplate and TopicBuilder used by the application class. -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<!-- Development-time tooling only: runtime scope and optional, so it is not passed on to dependants. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<!-- Optional configuration-metadata annotation processor. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		<!-- Test-scoped dependencies: Spring Boot test starter and Spring Kafka test support. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- Spring Boot Maven plugin (packaging and running the application). -->
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>



KafkaProducerApplication.java

package info.code2learn.kafkaproducer;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

/**
 * Spring Boot entry point for the Kafka producer example.
 *
 * <p>Declares the {@code topic1} topic and, with scheduling enabled, publishes a
 * {@code "Hello <n>"} message to it with a fixed delay of one second between runs.
 */
@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication {

	private static final Logger log = LoggerFactory.getLogger(KafkaProducerApplication.class);

	/** Name of the topic that is declared by {@link #topic()} and written to by {@link #produce()}. */
	private static final String TOPIC = "topic1";

	// Primitive int instead of a boxed Integer: the value is incremented on every tick, and a
	// boxed counter would be unboxed and re-boxed each time for no benefit.
	// NOTE(review): only produce() touches this field; this relies on the scheduler not running
	// fixedDelay invocations of the same method concurrently - confirm if a custom scheduler is added.
	private int counter = 0;

	@Autowired
	KafkaTemplate<String, String> kafkaTemplate;

	/**
	 * Starts the Spring application context.
	 *
	 * @param args command-line arguments passed through to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(KafkaProducerApplication.class, args);
	}

	/**
	 * Describes the topic used by this example: 10 partitions, replication factor 1.
	 *
	 * @return the topic definition exposed as a bean
	 */
	@Bean
	public NewTopic topic() {
		return TopicBuilder.name(TOPIC)
				.partitions(10)
				.replicas(1)
				.build();
	}

	/**
	 * Builds the next {@code "Hello <n>"} message, logs it and sends it to {@link #TOPIC}.
	 *
	 * <p>The record key is the string form of the message's hash code, so different messages
	 * generally carry different keys.
	 */
	@Scheduled(fixedDelay = 1000)
	public void produce() {
		String message = "Hello " + counter++;
		log.info("Sending message to topic : {}, message: {}", TOPIC, message);
		kafkaTemplate.send(TOPIC, String.valueOf(message.hashCode()), message);
	}

}




application.yml

# HTTP port of the producer application (the consumer example uses 8082).
server:
  port: 8081
  
# Kafka producer client settings.
spring:
  kafka:
    producer:
      # Acknowledgement level requested from the broker for each send
      # (see the Kafka producer "acks" setting for the exact guarantees).
      acks: 1
      # Broker address list used by the producer.
      # NOTE(review): this is set for the producer only; the admin client that creates the
      # NewTopic bean may read spring.kafka.bootstrap-servers instead - confirm before
      # pointing this at a non-local broker.
      bootstrap-servers:
      - localhost:9092
      client-id: kafka-producer
      # Keys and values are both plain strings, matching KafkaTemplate<String, String>.
      key-serializer:
        org.apache.kafka.common.serialization.StringSerializer
      value-serializer:
        org.apache.kafka.common.serialization.StringSerializer




2. Create Kafka Consumer application

Visit the Spring Initializr to generate a new project with the required dependencies.
This application consumes the messages from the Kafka topic.


pom.xml
    
    <?xml version="1.0" encoding="UTF-8"?>
<!-- NOTE(review): the XML declaration above is indented in this listing; an XML declaration
     has to be the very first characters of the file, so drop the leading spaces when saving
     this as pom.xml. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Maven build descriptor for the Kafka consumer application. -->
	<modelVersion>4.0.0</modelVersion>
	<!-- Spring Boot parent POM: the dependencies below declare no <version> of their own
	     and rely on the versions managed by this parent. -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.5</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<!-- Coordinates of this project. -->
	<groupId>info.code2learn</groupId>
	<artifactId>kafka-consumer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>kafka-consumer</name>
	<description>Kafka Consumer Example</description>
	<properties>
		<!-- Java 8 language level, matching the "JDK 1.8 or later" prerequisite. -->
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<!-- Web starter; the application's HTTP port is set in application.yml. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- Spring for Apache Kafka: supplies @EnableKafka, @KafkaListener and TopicBuilder used by the application class. -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<!-- Development-time tooling only: runtime scope and optional, so it is not passed on to dependants. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<!-- Optional configuration-metadata annotation processor. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		<!-- Test-scoped dependencies: Spring Boot test starter and Spring Kafka test support. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- Spring Boot Maven plugin (packaging and running the application). -->
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project> 

KafkaConsumerApplication.java

package info.code2learn.kafkaconsumer;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;

/**
 * Spring Boot entry point for the Kafka consumer example.
 *
 * <p>Declares the {@code topic1} topic and registers a listener that prints every
 * message received from it to standard output.
 */
@SpringBootApplication
@EnableKafka
public class KafkaConsumerApplication {

	/** Topic shared by the topic declaration and the listener below. */
	private static final String TOPIC_NAME = "topic1";

	/**
	 * Starts the Spring application context.
	 *
	 * @param args command-line arguments passed through to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(KafkaConsumerApplication.class, args);
	}

	/**
	 * Describes the topic this application listens to: 10 partitions, replication factor 1.
	 *
	 * @return the topic definition exposed as a bean
	 */
	@Bean
	public NewTopic topic() {
		TopicBuilder topicDefinition = TopicBuilder.name(TOPIC_NAME)
				.partitions(10)
				.replicas(1);
		return topicDefinition.build();
	}

	/**
	 * Handles one message from {@code topic1} by printing it to standard output.
	 *
	 * @param message the message payload handed to the listener as a string
	 */
	@KafkaListener(id = "kafkaConsumer", topics = TOPIC_NAME)
	public void listen(String message) {
		System.out.println(message);
	}

}
application.yml
    
# HTTP port of the consumer application (the producer example uses 8081).
server:
  port: 8082

# Kafka consumer client settings.
spring:
  kafka:
    consumer:
      # Consumer group this application joins.
      group-id: kafka-consumer-group
      # Broker address list used by the consumer.
      # NOTE(review): this is set for the consumer only; the admin client that creates the
      # NewTopic bean may read spring.kafka.bootstrap-servers instead - confirm before
      # pointing this at a non-local broker.
      bootstrap-servers:
      - localhost:9092
      client-id: kafka-consumer
      # Keys and values are read as plain strings; the listener method takes a String payload.
      key-deserializer:
        org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer:
        org.apache.kafka.common.serialization.StringDeserializer
    
    
Make sure a Kafka broker is running on localhost:9092, then start both the producer and consumer applications.

Output:



Spring Boot HTTPS Rest Call with Rest Template

We will see how to call an application which is HTTPS enabled using Spring Boot RestTemplate in a client application. Prerequisites:  1) M...