Sunday, May 16, 2021

Kafka Producer and Consumer Example using Spring Boot

Kafka Producer and Consumer Example using Spring Boot

In this post, we will see how to create a Kafka producer application that publishes messages to a Kafka topic, and a Kafka consumer application that subscribes to that topic and consumes the messages from it.

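Prerequisites for this tutorial:

  • JDK 1.8 or later
  • Maven
  • Git
  • A running Kafka broker reachable at localhost:9092 (the address both applications below are configured to use)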

This example consists of the following two applications:
  1. Kafka Producer
  2. Kafka Consumer

1. Create Kafka Producer application

Visit Spring Initializr to generate a new project with the required dependencies.

The Kafka producer application below publishes a message to the Kafka topic about once per second.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Kafka producer example application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<!-- Inherits from the Spring Boot parent POM; the dependencies below declare no
	     <version> of their own and rely on the versions managed by this parent. -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.5</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>info.code2learn</groupId>
	<artifactId>kafka-producer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>kafka-producer</name>
	<description>Kafka Producer Example</description>
	<properties>
		<!-- Java language level the project is compiled for. -->
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<!-- Web starter: provides the embedded HTTP server whose port is set in application.yml. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- Spring for Apache Kafka: supplies KafkaTemplate and TopicBuilder used by the application class. -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<!-- Development-time tooling only: runtime scope and optional, so it is not passed on to dependents. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<!-- Optional annotation processor for configuration-properties metadata. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		<!-- Test-scoped dependencies; not part of the packaged application. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- Spring Boot Maven plugin: packages the application as an executable jar
			     and provides the spring-boot:run goal. -->
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>



KafkaProducerApplication.java

package info.code2learn.kafkaproducer;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

/**
 * Spring Boot entry point for the Kafka producer example.
 *
 * <p>Declares the {@code topic1} topic and, through a scheduled task, publishes a
 * {@code "Hello <n>"} message to it with a fixed delay of one second between runs.
 */
@SpringBootApplication
@EnableScheduling
public class KafkaProducerApplication {

	private static final Logger log = LoggerFactory.getLogger(KafkaProducerApplication.class);

	/** Name of the topic that is declared by {@link #topic()} and written to by {@link #produce()}. */
	private static final String TOPIC = "topic1";

	/**
	 * Sequence number appended to each message. A primitive is used so that the
	 * increment in {@link #produce()} does not unbox and re-box on every run.
	 * NOTE(review): only touched from {@link #produce()}; assumes the scheduler never
	 * runs that method concurrently with itself (fixed-delay scheduling) - confirm if
	 * the scheduling setup changes.
	 */
	private int counter = 0;

	/** Template used to publish string key/value records; injected by Spring. */
	@Autowired
	KafkaTemplate<String, String> kafkaTemplate;

	/**
	 * Starts the Spring application.
	 *
	 * @param args command-line arguments passed through to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(KafkaProducerApplication.class, args);
	}

	/**
	 * Describes the topic used by this example: 10 partitions, replication factor 1.
	 *
	 * @return the topic definition bean
	 */
	@Bean
	public NewTopic topic() {
		return TopicBuilder.name(TOPIC)
				.partitions(10)
				.replicas(1)
				.build();
	}

	/**
	 * Publishes the next {@code "Hello <n>"} message to {@link #TOPIC}, keyed by the
	 * message's hash code rendered as a string.
	 *
	 * <p>The send is asynchronous. The outcome is observed through a callback so that a
	 * failed send is logged instead of being silently dropped.
	 */
	@Scheduled(fixedDelay = 1000)
	public void produce() {
		String message = "Hello " + counter++;
		log.info("Sending message to topic : {}, message: {}", TOPIC, message);
		kafkaTemplate.send(TOPIC, String.valueOf(message.hashCode()), message)
				.addCallback(
						result -> log.debug("Message acknowledged for topic : {}, message: {}", TOPIC, message),
						// The trailing Throwable argument makes SLF4J log the stack trace.
						ex -> log.error("Failed to send message to topic : {}, message: {}", TOPIC, message, ex));
	}

}




application.yml

# Configuration for the Kafka producer example application.
server:
  # HTTP port of this application; the consumer example uses a different port (8082),
  # so both can run on the same machine.
  port: 8081
  
spring:
  kafka:
    producer:
      # Kafka producer "acks" setting: 1 means a send is acknowledged once the
      # partition leader has written the record.
      acks: 1
      # Broker address(es) the producer connects to.
      # NOTE(review): the topic declared as a NewTopic bean is presumably created through the
      # admin client, which reads spring.kafka.bootstrap-servers rather than this
      # producer-specific list - confirm if the broker address is ever changed.
      bootstrap-servers:
      - localhost:9092
      # Client identifier reported to the broker.
      client-id: kafka-producer
      # Keys and values are both sent as plain strings.
      key-serializer:
        org.apache.kafka.common.serialization.StringSerializer
      value-serializer:
        org.apache.kafka.common.serialization.StringSerializer




2. Create Kafka Consumer application

Visit Spring Initializr to generate a new project with the required dependencies.
This application consumes the messages from the Kafka topic.


pom.xml
    
    <?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Kafka consumer example application.
     NOTE(review): in this listing the XML declaration above is preceded by whitespace;
     an XML declaration must be the very first thing in the file, so strip that leading
     whitespace when copying this into pom.xml. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<!-- Inherits from the Spring Boot parent POM; the dependencies below declare no
	     <version> of their own and rely on the versions managed by this parent. -->
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.5</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>info.code2learn</groupId>
	<artifactId>kafka-consumer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>kafka-consumer</name>
	<description>Kafka Consumer Example</description>
	<properties>
		<!-- Java language level the project is compiled for. -->
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<!-- Web starter: provides the embedded HTTP server whose port is set in application.yml. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!-- Spring for Apache Kafka: supplies @KafkaListener, @EnableKafka and TopicBuilder used by the application class. -->
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<!-- Development-time tooling only: runtime scope and optional, so it is not passed on to dependents. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<!-- Optional annotation processor for configuration-properties metadata. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>
		<!-- Test-scoped dependencies; not part of the packaged application. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- Spring Boot Maven plugin: packages the application as an executable jar
			     and provides the spring-boot:run goal. -->
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project> 

KafkaConsumerApplication.java

package info.code2learn.kafkaconsumer;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;

/**
 * Spring Boot entry point for the Kafka consumer example.
 *
 * <p>Declares the {@code topic1} topic and registers a listener that prints every
 * message received from it to standard output.
 */
@SpringBootApplication
@EnableKafka
public class KafkaConsumerApplication {

	/** Topic that is both declared by {@link #topic()} and subscribed to by {@link #listen(String)}. */
	private static final String TOPIC_NAME = "topic1";

	/**
	 * Describes the topic this application reads from: 10 partitions, replication factor 1.
	 *
	 * @return the topic definition bean
	 */
	@Bean
	public NewTopic topic() {
		return TopicBuilder.name(TOPIC_NAME).partitions(10).replicas(1).build();
	}

	/**
	 * Handles one record from the topic by writing its value to standard output.
	 *
	 * @param message the record value, deserialized as a string
	 */
	@KafkaListener(id = "kafkaConsumer", topics = TOPIC_NAME)
	public void listen(String message) {
		System.out.println(message);
	}

	/**
	 * Starts the Spring application.
	 *
	 * @param args command-line arguments passed through to Spring Boot
	 */
	public static void main(String[] args) {
		SpringApplication.run(KafkaConsumerApplication.class, args);
	}

}
application.yml
    
# Configuration for the Kafka consumer example application.
server:
  # HTTP port of this application; the producer example uses a different port (8081),
  # so both can run on the same machine.
  port: 8082

spring:
  kafka:
    consumer:
      # Consumer group this application joins.
      # NOTE(review): the listener in the application class also sets id = "kafkaConsumer";
      # check which of the two ends up as the effective group id.
      group-id: kafka-consumer-group
      # Broker address(es) the consumer connects to.
      bootstrap-servers:
      - localhost:9092
      # Client identifier reported to the broker.
      client-id: kafka-consumer
      # Keys and values are both read as plain strings, matching the producer's serializers.
      # NOTE(review): no auto-offset-reset is set here; with Kafka's default ("latest") a new
      # consumer group presumably only sees messages produced after it starts - confirm
      # this is the intended behaviour.
      key-deserializer:
        org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer:
        org.apache.kafka.common.serialization.StringDeserializer
    
    
Start both producer and consumer applications.

Output:



2 comments:

  1. Caesars Casino Review (2021) - Get $10 Free with No Deposit
    Caesars Casino Review · febcasino.com 1. Claim your $10 free bonus and ventureberg.com/ receive up to 토토 $20 in casino credits (30 Free poormansguidetocasinogambling Spins) · 2. Play Slots 1등 사이트 at Caesars Casino.

    ReplyDelete
  2. For instance, this win above was on a “Jacks or Better” that wasn’t really Jacks or Better, however Bonus Poker Deluxe. I didn’t mind outcome of|as a outcome of} my dealt four of a sort paid four hundred credits as a substitute of 125 for Jacks or Better, yielding me 솔카지노 $1,000 on this $12.50 wager. It’s common to get a 3rd to half your a reimbursement on a poor beginning hand. I’ve all the time been into Video Poker, and find it relaxing to play.

    ReplyDelete

Spring Boot HTTPS Rest Call with Rest Template

We will see how to call an application which is HTTPS enabled using Spring Boot RestTemplate in a client application. Prerequisites:  1) M...