Kafka Stream With Spring Boot

Prerequisite:

Sample Application:

  • Producer: this microservice produces some data.
      • In the real world, this could be your browser or some user action sending movie-surfing history, credit card transactions, etc.
      • In this demo, I will generate numbers sequentially from 1 to N, one per second, just to keep things simple to understand.
  • Processor: this microservice consumes the data, does some processing on it and writes the result back to another topic.
      • In the real world, this could be the movie recommendation engine for Netflix.
      • In this demo, I will skip all the odd numbers and square the even numbers.
  • Consumer: this microservice consumes the processed data.
      • In the real world, this could be your browser again, fetching the latest recommendations based on your movie-browsing history.
      • In this demo, I will consume the data and print it on the console.

Kafka Set up:
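The examples below assume a local cluster with two brokers reachable at localhost:9091 and localhost:9092. As a minimal sketch of one way to bring such a cluster up, assuming Docker Compose and the Confluent images (the service names, versions and listener layout are my own choices, not part of the original setup):

version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka1:
    image: confluentinc/cp-kafka:7.4.0
    depends_on: [zookeeper]
    ports:
      - "9091:9091"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Separate listeners: INTERNAL for broker-to-broker traffic inside the
      # compose network, EXTERNAL for clients connecting from the host.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29091,EXTERNAL://0.0.0.0:9091
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:29091,EXTERNAL://localhost:9091
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2

  kafka2:
    image: confluentinc/cp-kafka:7.4.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:29092,EXTERNAL://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2

The INTERNAL/EXTERNAL listener split lets the two brokers talk to each other over the compose network while host-side clients connect via localhost:9091 and localhost:9092.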

Spring Boot — Project Set up:

Java Functional Interface:
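Spring Cloud Stream's functional programming model maps the three standard java.util.function interfaces onto messaging roles: a Supplier<T> bean acts as a producer, a Function<T, R> bean as a processor, and a Consumer<T> bean as a consumer. As a quick, Kafka-free recap of the three interfaces (the class and variable names here are just for illustration):

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionalInterfaceRecap {

    public static void main(String[] args) {
        // Supplier<T>: takes nothing, returns a T  -> source / producer
        Supplier<Long> source = () -> 1L;

        // Function<T, R>: takes a T, returns an R  -> processor
        Function<Long, Long> square = n -> n * n;

        // Consumer<T>: takes a T, returns nothing  -> sink / consumer
        Consumer<Long> sink = n -> System.out.println("Consumed : " + n);

        sink.accept(square.apply(source.get()));   // prints "Consumed : 1"
    }
}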

Kafka Stream Producer:

  • If the bean type is Supplier, Spring Cloud Stream treats it as a producer.
  • I use Flux as it is going to be a data stream.
import java.time.Duration;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import reactor.core.publisher.Flux;

@Configuration
public class KafkaProducer {

    /*
     * Produces a number every second, starting from 1.
     * Supplier<T> makes this a Kafka producer of T.
     */
    @Bean
    public Supplier<Flux<Long>> numberProducer() {
        return () -> Flux.range(1, 1000)
                         .map(i -> (long) i)
                         .delayElements(Duration.ofSeconds(1));
    }

}
  • spring.cloud.stream.function.definition is where you provide the list of function bean names (; separated).
  • spring.cloud.stream.bindings.numberProducer-out-0.destination configures where the data has to go!
  • Spring does its own serialization/deserialization. I skip that and go with Kafka-native serialization and deserialization by setting these properties:
    spring.cloud.stream.bindings.numberProducer-out-0.producer.use-native-encoding
    spring.cloud.stream.kafka.bindings.numberProducer-out-0.producer.configuration.value.serializer
  • Then I configure the Kafka broker addresses.
spring.cloud.stream:
  function:
    definition: numberProducer
  bindings:
    numberProducer-out-0:
      destination: numbers
      producer:
        use-native-encoding: true
  kafka:
    bindings:
      numberProducer-out-0:
        producer:
          configuration:
            value:
              serializer: org.apache.kafka.common.serialization.LongSerializer
    binder:
      brokers:
        - localhost:9091
        - localhost:9092

Kafka Stream Consumer:

  • Create a bean of type Consumer to consume the data from a Kafka topic.
  • We simply print the consumed data.
import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConsumer {

    /*
     * Consumes the numbers received via the Kafka topic.
     * Consumer<T> makes this a Kafka consumer of T.
     */
    @Bean
    public Consumer<KStream<String, Long>> squaredNumberConsumer() {
        return stream -> stream.foreach((key, value) -> System.out.println("Square Number Consumed : " + value));
    }

}
  • As usual, I add the bean name to the spring.cloud.stream.function.definition property.
  • We assume that the squaredNumbers topic is already created, and we consume the data from that topic (a topic-creation sketch follows the configuration below).
  • To consume the data, I use the in binding: squaredNumberConsumer-in-0.
spring.cloud.stream:
  function:
    definition: squaredNumberConsumer
  bindings:
    squaredNumberConsumer-in-0:
      destination: squaredNumbers
  kafka:
    binder:
      brokers:
        - localhost:9091
        - localhost:9092
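As noted above, the squaredNumbers topic is assumed to exist; brokers with auto-topic-creation enabled will create it on first use. If that is disabled, one hypothetical way to create it from the application itself (the class and bean names below are illustrative) is to declare a NewTopic bean: spring-kafka comes in transitively with the Kafka binder, and Spring Boot's auto-configured KafkaAdmin, pointing at its default localhost:9092, creates any NewTopic beans it finds at startup.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    /*
     * Hypothetical helper: KafkaAdmin (auto-configured when spring-kafka is
     * on the classpath) creates this topic at application startup if it
     * does not exist yet.
     */
    @Bean
    public NewTopic squaredNumbersTopic() {
        return TopicBuilder.name("squaredNumbers")
                           .partitions(1)
                           .replicas(1)
                           .build();
    }

}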

Kafka Stream Processor:

  • Consume the data from the numbers topic.
  • Remove the odd numbers.
  • Square the even numbers.
  • Write the result back into another topic.
  • We consume the data as a KStream<String, Long>.
  • We do some processing.
  • Then we return a KStream<String, Long>. Do note that the return type could be anything; it does not have to be the same as the input type (see the variant sketch after the configuration below).
import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaProcessor {

    /*
     * Processes the numbers received via the Kafka topic.
     * Function<T, R> makes this a Kafka Streams processor:
     * T is the input type, R is the output type.
     */
    @Bean
    public Function<KStream<String, Long>, KStream<String, Long>> evenNumberSquareProcessor() {
        return kStream -> kStream
                .filter((k, v) -> v % 2 == 0)
                .peek((k, v) -> System.out.println("Squaring Even : " + v))
                .mapValues(v -> v * v);
    }

}
spring.cloud.stream:
  function:
    definition: evenNumberSquareProcessor
  bindings:
    evenNumberSquareProcessor-in-0:
      destination: numbers
    evenNumberSquareProcessor-out-0:
      destination: squaredNumbers
  kafka:
    binder:
      brokers:
        - localhost:9091
        - localhost:9092
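As noted earlier, the output type of the processor function does not have to match the input type. A hypothetical variant (the class, bean and binding names are illustrative) that emits a formatted String instead of a Long could look like this:

import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaStringProcessor {

    /*
     * Hypothetical variant: consumes KStream<String, Long> but produces
     * KStream<String, String>, showing that the output type does not have
     * to match the input type.
     */
    @Bean
    public Function<KStream<String, Long>, KStream<String, String>> evenNumberSquareAsTextProcessor() {
        return kStream -> kStream
                .filter((k, v) -> v % 2 == 0)
                .mapValues(v -> v + " squared is " + (v * v));
    }

}

Its evenNumberSquareAsTextProcessor-out-0 binding would then need a StringSerializer (or Spring's default serialization) instead of the LongSerializer used above.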

Kafka Stream Processing:

Squaring Even : 2
Squaring Even : 4
Squaring Even : 6
Squaring Even : 8
Squaring Even : 10
Squaring Even : 12
Squaring Even : 14
Square Number Consumed : 4
Square Number Consumed : 16
Square Number Consumed : 36
Square Number Consumed : 64
Square Number Consumed : 100
Square Number Consumed : 144
Square Number Consumed : 196

Bonus: Kafka + Spring Boot — Event Driven:

Summary:
