Kafka: Kafka consumer with Spring Boot

Having successfully configured a producer with Spring Boot, in this post we will configure a consumer with Spring Boot.

Let’s get started.

Step 1: Start the Zookeeper and Kafka servers on your local machine.

Step 2: Create a Spring Boot project with Kafka dependencies.

Create a Spring Boot project and add the dependencies below to your build.gradle (or the equivalent entries to your pom.xml).

implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.6.0'
implementation group: 'org.springframework.kafka', name: 'spring-kafka'

Step 3: Consumer application properties

server.port=6000
kafka.bootstrap.server=localhost:9092
kafka.topic.name=greetings
kafka.group.id=G1

Step 4: Consumer Configuration

We need to create a ConsumerFactory bean and a KafkaListenerContainerFactory bean. The Kafka consumer configuration class is annotated with @EnableKafka, which enables detection of @KafkaListener annotations on Spring-managed beans.

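import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Value(value = "${kafka.bootstrap.server}")
    private String bootstrapAddress;

    @Value(value = "${kafka.topic.name}")
    private String topic;

    @Value(value = "${kafka.group.id}")
    private String kafkaGroupId;

    // @Bean methods must not be private, otherwise Spring cannot register them.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.info("Initializing consumer factory ...");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        // Both keys and values are read as plain strings.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // The bean name "kafkaListenerContainerFactory" is what the listener refers to in Step 5.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}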

Step 5: Implement listener to consume messages

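import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerListener {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerListener.class);

    @KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.group.id}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consumeGreetings(@Payload String greetings, @Headers MessageHeaders headers) {
        log.info("Message from kafka: {}", greetings);
    }
}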

A single listener can also listen to multiple topics. The topics are passed as an array, one string per topic.

@KafkaListener(topics = {"topic1", "topic2"}, groupId = "G1")

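Multiple listeners can also be implemented for the same topic. For every listener to receive every message, the listeners must belong to different consumer groups; listeners that share a group split the topic's partitions between them, so each message is handled by only one of them.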
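
To make the group behaviour concrete, here is a small plain-Java sketch. It is only a toy model and does not use the Kafka client or Spring at all: the class ConsumerGroupSketch, the method deliver, the listener names and the round-robin partition assignment are all my own assumptions for illustration, and the real assignment strategy in Kafka is configurable. It only shows the rule that every group receives each record, while inside a group a record is handled by a single member.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConsumerGroupSketch {

    /**
     * Toy model of consumer-group delivery (hypothetical, not the Kafka API).
     * listenerGroups maps a listener name to its group id. Record i is placed
     * on partition i % partitionCount, and within each group a partition is
     * owned by exactly one listener.
     */
    public static Map<String, List<String>> deliver(Map<String, String> listenerGroups,
                                                    List<String> records,
                                                    int partitionCount) {
        Map<String, List<String>> membersByGroup = new LinkedHashMap<>();
        Map<String, List<String>> received = new LinkedHashMap<>();
        // Collect the members of each group and give every listener an empty inbox.
        for (Map.Entry<String, String> entry : listenerGroups.entrySet()) {
            membersByGroup.computeIfAbsent(entry.getValue(), g -> new ArrayList<>())
                    .add(entry.getKey());
            received.put(entry.getKey(), new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            int partition = i % partitionCount;
            // Every group sees the record, but only one member of the group handles it.
            for (List<String> members : membersByGroup.values()) {
                String owner = members.get(partition % members.size());
                received.get(owner).add(records.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("hello", "hi", "hey", "howdy");

        // Two listeners in different groups: both receive all four records.
        Map<String, String> differentGroups = new LinkedHashMap<>();
        differentGroups.put("listenerA", "G1");
        differentGroups.put("listenerB", "G2");
        System.out.println(deliver(differentGroups, records, 2));

        // Two listeners in the same group: the records are split between them.
        Map<String, String> sameGroup = new LinkedHashMap<>();
        sameGroup.put("listenerA", "G1");
        sameGroup.put("listenerB", "G1");
        System.out.println(deliver(sameGroup, records, 2));
    }
}
```

In this model, if the topic has only one partition and both listeners share a group, the second listener receives nothing, which is also why adding more consumers than partitions to one group does not help throughput.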

Summary:

In this post I have shown you how to configure a Kafka consumer and consume messages from a topic in a Spring Boot application.
