In my earlier article we have seen how to produce and consume messages using terminal.
In this post i’ll show you how we can produce events/message using springboot project.
Spring also provided support for Kafka . Spring Kafka brings the simple and typical Spring template programming model with a KafkaTemplate and Message-driven POJOs via @KafkaListener annotation.
Now without any further delay let’s start implementing.
Step 1: Start the Zookeeper and Kafka server on your local.
Step 2: Create a spring boot project with Kafka dependencies.
Create a spring boot project, and add below dependencies in your build.gradle / pom.xml
implementation group: 'org.springframework.kafka', name:'spring-kafka'
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.6.0'
Step 3: Application configuration
We will define bootstrap server and topic name in application.properties.
server.port=7000
kafka.bootstrap.server=localhost:9092
kafka.topic.name=greetings
Step 3: Configuring Topic
You can create a topic using the command prompt or using spring boot configuration as below:
@Configuration
public class TopicConfig {
@Value(value = "${kafka.bootstrap.server}")
private String bootstrapAddress;
@Value(value = "${kafka.topic.name}")
public String topic;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return new NewTopic(topic, 1, (short) 1);
}
Step 4: Producer Configuration
In producer configuration we need ProducerFactory bean and a KafkaTemplate bean.
@Configuration
public class KafkaProducerConfig {
@Value(value = "${kafka.bootstrap.server}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
Step 5: Publishing messages
Let’s create a rest controller which will take messages as input and publish them to kafka topic.
@RestController
@RequestMapping("/greetings")
public class MessageController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value(value = "${kafka.topic.name}")
public String topic;
@GetMapping("/msg")
public void sendMessage(@RequestParam String msg) {
kafkaTemplate.send(topic, msg);
}
Summery:
In this post I have shown you how to created a topic and publish messages to the topic from a spring-boot application.