After successfully configuring a Kafka producer with spring-boot, in this post we will configure a consumer with spring-boot.
Let’s get started.
Step 1: Start the Zookeeper and Kafka servers on your local machine.
Step 2: Create a spring boot project with Kafka dependencies.
Add the dependencies below to your build.gradle (for pom.xml, use the equivalent dependency entries).
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.6.0'
implementation group: 'org.springframework.kafka', name: 'spring-kafka'
Step 3: Consumer application properties
server.port=6000
kafka.bootstrap.server=localhost:9092
kafka.topic.name=greetings
kafka.group.id=G1
Step 4: Consumer Configuration
We need to create a ConsumerFactory bean and a KafkaListenerContainerFactory bean. The Kafka consumer configuration class requires the @EnableKafka annotation so that @KafkaListener annotations on spring managed beans are detected.
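import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);
    @Value(value = "${kafka.bootstrap.server}")
    private String bootstrapAddress;
    @Value(value = "${kafka.topic.name}")
    public String topic;
    @Value(value = "${kafka.group.id}")
    private String kafkaGroupId;

    // @Bean methods in a @Configuration class must not be private, so this one is public.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.info("Initializing consumer factory ...");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        // Keys and values are both read as plain strings.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Container factory referenced by name from @KafkaListener.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}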
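The ConsumerConfig constants used above are plain Kafka property names. As a minimal sketch in plain Java, with no Kafka or Spring on the classpath, the same four settings can be written with their literal keys. The class name ConsumerPropsSketch and the method consumerProps are hypothetical and used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPropsSketch {

    // Builds the same four settings as the consumerFactory() bean, using the
    // literal Kafka property names instead of the ConsumerConfig constants.
    public static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers); // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("group.id", groupId);                   // ConsumerConfig.GROUP_ID_CONFIG
        // The deserializer is given by class name here so that the sketch
        // compiles without the kafka-clients jar.
        String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
        props.put("key.deserializer", stringDeserializer);   // KEY_DESERIALIZER_CLASS_CONFIG
        props.put("value.deserializer", stringDeserializer); // VALUE_DESERIALIZER_CLASS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the application properties in Step 3.
        System.out.println(consumerProps("localhost:9092", "G1"));
    }
}
```

Using the constants, as the configuration class does, is usually the safer choice because a mistyped key is caught at compile time.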
Step 5: Implement listener to consume messages
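import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerListener {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerListener.class);
    @KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.group.id}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consumeGreetings(@Payload String greetings, @Headers MessageHeaders headers) {
        log.info("Message from kafka: {}", greetings); // payload is already a String
    }
}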
A single listener can listen to multiple topics. Pass each topic name as a separate array element:
@KafkaListener(topics = {"topic1", "topic2"}, groupId = "G1")
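When the topic names arrive as one comma-separated string, for example from a single property value, one option is to split them into separate names first. The following is a minimal sketch in plain Java with no Spring involved. TopicListParser and parse are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TopicListParser {

    // Splits a comma-separated value such as "topic1, topic2" into
    // individual topic names, trimming whitespace and skipping empty entries.
    public static String[] parse(String commaSeparated) {
        List<String> topics = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                topics.add(name);
            }
        }
        return topics.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Prints the parsed names separated by "|".
        System.out.println(String.join("|", parse("topic1, topic2")));
    }
}
```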
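Multiple listeners can also be implemented for the same topic. For every listener to receive every message, the listeners must use different group ids; listeners that share a group id split the topic's partitions between them.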
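The effect of the group id can be illustrated with a simplified model: within one group each partition has a single owner, while a listener in its own group owns every partition. The sketch below is plain Java and assigns partitions round-robin. It is not Kafka's actual assignment strategy, and the names GroupAssignmentSketch, assign, listenerA, listenerB and listenerC are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {

    // Simplified model: spread partitions 0..partitionCount-1 over the
    // consumers of ONE group in round-robin order. Kafka's real assignors
    // are more involved; this only illustrates the rule that a partition
    // has one owner within a group.
    public static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one consumer");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two listeners in the same group split the partitions between them.
        System.out.println(assign(List.of("listenerA", "listenerB"), 4));
        // A listener alone in its own group owns every partition.
        System.out.println(assign(List.of("listenerC"), 4));
    }
}
```

In this model, listenerA and listenerB each see only half of the partitions, while listenerC sees all four. This is why listeners that should each receive the full stream need their own group ids.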
Summary:
In this post I have shown how to configure a Kafka consumer and consume messages from a topic in a spring-boot application.