Kafka: Kafka consumer with Spring Boot

Having configured a producer with Spring Boot, in this post we will configure a consumer.

Let’s get started.

Step 1: Start the ZooKeeper and Kafka servers on your local machine.

Step 2: Create a spring boot project with Kafka dependencies.

Create a Spring Boot project and add the dependencies below to your build.gradle (for Maven, add the equivalent entries to pom.xml).

implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.6.0'
implementation group: 'org.springframework.kafka', name: 'spring-kafka'

Step 3: Consumer application properties
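
The configuration class in Step 4 reads three keys through @Value placeholders: kafka.bootstrap.server, kafka.topic.name and kafka.group.id, so application.properties has to define all of them. As a minimal sketch, the plain-Java check below parses an example properties file and reports any missing key. The class name and the values (localhost:9092, greetings, greetings-group) are assumptions for a local setup, not values taken from the original project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class ConsumerPropertiesCheck {
    // Keys referenced by the @Value placeholders in KafkaConsumerConfig.
    static final List<String> REQUIRED_KEYS =
            List.of("kafka.bootstrap.server", "kafka.topic.name", "kafka.group.id");

    // Example application.properties content; every value here is an assumption.
    static final String EXAMPLE_PROPERTIES = String.join("\n",
            "kafka.bootstrap.server=localhost:9092",
            "kafka.topic.name=greetings",
            "kafka.group.id=greetings-group");

    // Parses properties text the same way a .properties file would be read.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Could not read properties", e);
        }
        return props;
    }

    // Returns the required keys that are absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = parse(EXAMPLE_PROPERTIES);
        System.out.println("Missing keys: " + missingKeys(props));
    }
}
```

The three lines held in EXAMPLE_PROPERTIES are what the application.properties file would contain, with the values adjusted to your own broker address, topic and consumer group.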


Step 4: Consumer Configuration

We need to create a ConsumerFactory bean and a KafkaListenerContainerFactory bean. The consumer configuration class needs the @EnableKafka annotation so that Spring detects @KafkaListener annotations on Spring-managed beans.

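import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Value(value = "${kafka.bootstrap.server}")
    private String bootstrapAddress;

    @Value(value = "${kafka.topic.name}")
    public String topic;

    @Value(value = "${kafka.group.id}")
    private String kafkaGroupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.info("Initializing consumer factory ...");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // The container factory needs the consumer factory to create consumers.
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}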

Step 5: Implement listener to consume messages

// @Component registers the listener as a Spring bean so that @KafkaListener is detected.
// @Payload and @Headers come from org.springframework.messaging.handler.annotation.
@Component
public class KafkaConsumerListener {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerListener.class);

    @KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.group.id}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consumeGreetings(@Payload String greetings, @Headers MessageHeaders headers) {
        log.info("Message from kafka: {}", greetings);
    }
}

Spring also allows a single listener to consume from multiple topics.

@KafkaListener(topics = {"topic1", "topic2"}, groupId = "G1")

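Multiple listeners can also be implemented for the same topic. To have every listener receive every message, give each one a different group id; listeners that share a group id split the topic's partitions between them, so each message reaches only one of them.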
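
The effect of group ids on delivery can be illustrated without a broker. The sketch below is a plain-Java simulation, not Kafka or Spring API code: the GroupDeliverySimulation class, the listener names and the message names are all hypothetical, and partition assignment is simplified to a round-robin over the listeners in a group. It compares two listeners that share a group with two listeners in separate groups.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupDeliverySimulation {
    // Delivers messages to listeners; the listeners map is listener name -> group id.
    // Simplified model: message i sits on partition i % partitions, and within a
    // group each partition is owned by exactly one listener (round-robin).
    static Map<String, List<String>> deliver(List<String> messages, int partitions,
                                             Map<String, String> listeners) {
        Map<String, List<String>> membersByGroup = new LinkedHashMap<>();
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : listeners.entrySet()) {
            membersByGroup.computeIfAbsent(e.getValue(), g -> new ArrayList<>()).add(e.getKey());
            received.put(e.getKey(), new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            int partition = i % partitions;
            // Every group gets the message once, via the owner of that partition.
            for (List<String> members : membersByGroup.values()) {
                String owner = members.get(partition % members.size());
                received.get(owner).add(messages.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m0", "m1", "m2", "m3");

        // Both listeners in group G1: the messages are split between them.
        Map<String, String> sameGroup = new LinkedHashMap<>();
        sameGroup.put("listenerA", "G1");
        sameGroup.put("listenerB", "G1");
        System.out.println(deliver(messages, 2, sameGroup));

        // Listeners in groups G1 and G2: each one receives every message.
        Map<String, String> differentGroups = new LinkedHashMap<>();
        differentGroups.put("listenerA", "G1");
        differentGroups.put("listenerB", "G2");
        System.out.println(deliver(messages, 2, differentGroups));
    }
}
```

In this model the shared-group run gives listenerA the messages m0 and m2 and listenerB the messages m1 and m3, while the separate-group run gives both listeners all four messages.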

Summary:

In this post I have shown you how to configure a Kafka consumer and consume messages from a topic in a Spring Boot application.


