Kafka: Kafka producer with Spring Boot

In my earlier article we saw how to produce and consume messages using the terminal.

In this post I’ll show you how we can produce events/messages from a Spring Boot project.

Spring also provides support for Kafka. Spring Kafka brings the simple and typical Spring template programming model with a KafkaTemplate, and message-driven POJOs via the @KafkaListener annotation.
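This post concentrates on the producer side, but for context, here is a minimal sketch of the consumer side mentioned above. It assumes Spring Boot’s Kafka auto-configuration supplies the consumer factory, and the group id is just a placeholder:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class GreetingsListener {

    // Called for every record published to the "greetings" topic
    @KafkaListener(topics = "greetings", groupId = "greetings-group")
    public void listen(String message) {
        System.out.println("Received: " + message);
    }
}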

Now, without any further delay, let’s start implementing.

Step 1: Start the Zookeeper and Kafka servers on your local machine.
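Assuming a local Kafka installation, the two servers can typically be started from the Kafka directory like this (on Windows, use the .bat scripts under bin\windows):

# start Zookeeper first, then the Kafka broker (each in its own terminal)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties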

Step 2: Create a spring boot project with Kafka dependencies.

Create a Spring Boot project and add the below dependencies to your build.gradle / pom.xml:

implementation group: 'org.springframework.kafka', name:'spring-kafka'
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.6.0'
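If you prefer Maven, the equivalent pom.xml entries would be (assuming the Spring Boot parent manages the spring-kafka version, as in the Gradle snippet above):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>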

Step 3: Application configuration

We will define the bootstrap server and the topic name in application.properties:

server.port=7000
kafka.bootstrap.server=localhost:9092
kafka.topic.name=greetings

Step 4: Configuring the Topic

You can create a topic using the command prompt (a sample command is shown after the configuration below) or using Spring Boot configuration as below:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class TopicConfig {

    @Value(value = "${kafka.bootstrap.server}")
    private String bootstrapAddress;

    @Value(value = "${kafka.topic.name}")
    private String topic;

    // KafkaAdmin talks to the broker and creates any NewTopic beans found in the context
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    // The "greetings" topic with one partition and a replication factor of 1
    @Bean
    public NewTopic topic1() {
        return new NewTopic(topic, 1, (short) 1);
    }
}
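For reference, the command-prompt alternative mentioned above would look something like this (run from the Kafka installation directory):

bin/kafka-topics.sh --create --topic greetings --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1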

Step 5: Producer Configuration

For the producer configuration we need a ProducerFactory bean and a KafkaTemplate bean.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrap.server}")
    private String bootstrapAddress;

    // Creates Kafka producer instances with String keys and String values
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    // KafkaTemplate wraps the producer and provides convenient send methods
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Step 6: Publishing messages

Let’s create a REST controller which will take a message as input and publish it to the Kafka topic.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/greetings")
public class MessageController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value(value = "${kafka.topic.name}")
    private String topic;

    // GET /greetings/msg?msg=... publishes the given message to the topic
    @GetMapping("/msg")
    public void sendMessage(@RequestParam String msg) {
        kafkaTemplate.send(topic, msg);
    }
}
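Once the application is running, hitting http://localhost:7000/greetings/msg?msg=hello should publish "hello" to the greetings topic. Note that send() is asynchronous and returns immediately; if you want to log whether the publish succeeded, you can attach a callback to the future it returns. A minimal sketch using the spring-kafka 2.x API (the log messages and method name are just illustrative; this would live inside the controller above):

// requires org.springframework.kafka.support.SendResult
// and org.springframework.util.concurrent.ListenableFuture
public void sendMessageWithCallback(String msg) {
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);
    future.addCallback(
            result -> System.out.println("Sent [" + msg + "] with offset "
                    + result.getRecordMetadata().offset()),
            ex -> System.err.println("Unable to send [" + msg + "]: " + ex.getMessage()));
}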

Summary:

In this post I have shown you how to create a topic and publish messages to it from a Spring Boot application.
