
Kafka: Kafka consumer with SpringBoot

After successfully configuring the producer with Spring Boot in the previous post, in this post we will configure the consumer with Spring Boot.

Let’s get started.

Step 1: Start the Zookeeper and Kafka server on your local.

Step 2: Create a spring boot project with Kafka dependencies.

Create a Spring Boot project and add the below dependencies to your build.gradle / pom.xml:

implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.6.0'
implementation group: 'org.springframework.kafka', name:'spring-kafka'

Step 3: Consumer application properties

server.port=6000
kafka.bootstrap.server=localhost:9092
kafka.topic.name=greetings
kafka.group.id=G1

Step 4: Consumer Configuration

We need to create a ConsumerFactory bean and a ConcurrentKafkaListenerContainerFactory bean. The Kafka consumer configuration class requires the @EnableKafka annotation so that @KafkaListener annotations on Spring-managed beans are detected.

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    private static Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Value(value = "${kafka.bootstrap.server}")
    private String bootstrapAddress;

    @Value(value = "${kafka.topic.name}")
    public String topic;

    @Value(value = "${kafka.group.id}")
    private String kafkaGroupId;

    // Consumer factory that creates Kafka consumers with the properties below
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        log.info("Initializing consumer factory ...");
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Container factory used by @KafkaListener methods
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

Step 5: Implement listener to consume messages

@Service
public class KafkaConsumerListener {
    private static Logger log = LoggerFactory.getLogger(KafkaConsumerListener.class);

    @KafkaListener(topics = "${kafka.topic.name}", groupId = "${kafka.group.id}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consumeGreetings(@Payload String greetings, @Headers MessageHeaders headers) {
        log.info("Message from kafka: " + greetings);
    }
}

Spring also allows a single listener to listen to multiple topics:

@KafkaListener(topics = {"topic1", "topic2"}, groupId = "G1")

Multiple listeners can also be implemented for the same topic, but for each of them to receive every message they should belong to different consumer groups, as sketched below.
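Here is a minimal sketch of two listeners on the same topic in different consumer groups; the group id G2 and the class name are assumptions of this example, not part of the configuration above. Each group receives its own copy of every message.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MultiGroupListeners {
    private static final Logger log = LoggerFactory.getLogger(MultiGroupListeners.class);

    // Listener in group G1: receives every message published to the topic
    @KafkaListener(topics = "${kafka.topic.name}", groupId = "G1")
    public void listenGroupOne(String message) {
        log.info("Group G1 received: " + message);
    }

    // Listener in group G2: independently receives every message as well
    @KafkaListener(topics = "${kafka.topic.name}", groupId = "G2")
    public void listenGroupTwo(String message) {
        log.info("Group G2 received: " + message);
    }
}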

Summary:

In this post I have shown you how to configure a Kafka consumer and consume messages from a topic in a Spring Boot application.


Kafka: Kafka producer with SpringBoot

In my earlier article we saw how to produce and consume messages using the terminal.

In this post I'll show you how we can produce events/messages from a Spring Boot project.

Spring also provides support for Kafka. Spring Kafka brings the simple and typical Spring template programming model with a KafkaTemplate, and message-driven POJOs via the @KafkaListener annotation.

Now without any further delay let’s start implementing.

Step 1: Start the Zookeeper and Kafka server on your local.

Step 2: Create a spring boot project with Kafka dependencies.

Create a Spring Boot project and add the below dependencies to your build.gradle / pom.xml:

implementation group: 'org.springframework.kafka', name:'spring-kafka'
implementation group: 'org.apache.kafka', name: 'kafka-clients', version: '2.6.0'

Step 3: Application configuration

We will define the bootstrap server and topic name in application.properties.

server.port=7000
kafka.bootstrap.server=localhost:9092
kafka.topic.name=greetings

Step 4: Configuring the Topic

You can create a topic using the command prompt, or using Spring Boot configuration as below:

@Configuration
public class TopicConfig {

    @Value(value = "${kafka.bootstrap.server}")
    private String bootstrapAddress;

    @Value(value = "${kafka.topic.name}")
    public String topic;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic(topic, 1, (short) 1);
    }
}

Step 5: Producer Configuration

In the producer configuration we need a ProducerFactory bean and a KafkaTemplate bean.

@Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrap.server}")
    private String bootstrapAddress;


    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Step 6: Publishing messages

Let's create a REST controller which will take a message as input and publish it to the Kafka topic.

@RestController
@RequestMapping("/greetings")
public class MessageController {


    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value(value = "${kafka.topic.name}")
    public String topic;

    @GetMapping("/msg")
    public void sendMessage(@RequestParam String msg) {
        kafkaTemplate.send(topic, msg);
    }
}
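Since kafkaTemplate.send() is asynchronous, it is often useful to log whether the send succeeded. Below is a minimal sketch of an extra method for the controller above, assuming Spring Kafka 2.x where send() returns a ListenableFuture; the /msg-with-callback path and the log field are inventions of this sketch.

    private static final Logger log = LoggerFactory.getLogger(MessageController.class);

    @GetMapping("/msg-with-callback")
    public void sendMessageWithCallback(@RequestParam String msg) {
        // send() returns immediately; the callback tells us whether the broker accepted the record
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Sent [" + msg + "] to partition " + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Failed to send [" + msg + "]", ex);
            }
        });
    }

You can then test publishing by opening http://localhost:7000/greetings/msg?msg=HelloKafka in a browser and watching the consumer application's log.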

Summary:

In this post I have shown you how to create a topic and publish messages to it from a Spring Boot application.

Kafka: Publish and Consume messages

In my earlier posts, I explained what Kafka is and how to install and run Kafka on your system.

Now we will see how to publish and consume messages in Kafka.

Step 1: Create a Topic

As we know, in Kafka a publisher publishes messages to a topic, and Kafka decides which partition of the topic each message is assigned to.

So first we will create a topic named greetings. For this, let's open a new command prompt and navigate to the bin/windows folder. Then we can create a topic using kafka-topics.bat.

Notice the bootstrap server port 9092. This is the default port of the Kafka server.

$ kafka-topics.bat --create --topic greetings --bootstrap-server localhost:9092

So now we have successfully created the topic.

We can pass the --describe parameter to kafka-topics.bat to get information about the topic.

$ kafka-topics.bat --describe --topic greetings --bootstrap-server localhost:9092

Step 2: Publish some events

Now let's write some messages, i.e. publish some events to the topic.

To do so, open a new command prompt, navigate to bin/windows, and type the below command.

$ kafka-console-producer.bat --topic greetings --bootstrap-server localhost:9092

Then type the messages you want to publish. By default, each line you enter is published as a separate event to the topic.
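For example, typing two lines at the console producer's > prompt publishes two separate events (the message text here is just illustrative):

> Hello Kafka
> This is my second event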

We can stop the publisher any time by pressing Ctrl + C.

Step 3: Consume events

Open another terminal; using kafka-console-consumer.bat you will be able to consume the messages.

$ kafka-console-consumer.bat --topic greetings --from-beginning --bootstrap-server localhost:9092

Great 👏

Now you are publishing and consuming messages using Kafka.

Summary:

In this article we have demonstrated how to create a topic in Kafka and how to produce and consume messages using Kafka's console producer and consumer tools.

Prev -> Kafka: Install and Run Apache Kafka on Windows

Kafka: Install and Run Apache Kafka on Windows

Install Apache Kafka on Windows

STEP 1: Install Java JDK 8 or higher

Kafka requires the Java JDK (version 8 or higher) to be installed on our system.

STEP 2: Download and Install Apache Kafka binaries

You can download the Apache Kafka binaries from the official Apache Kafka downloads page:

https://kafka.apache.org/downloads

STEP 3: Extract the binary

Extract the binary to a folder. Create a 'data' folder at the same level as the bin folder.

Inside the data folder, create 'zookeeper' and 'kafka' folders.

STEP 4: Update configuration values

Update the ZooKeeper data directory path (dataDir) in the config/zookeeper.properties configuration file with the path of the zookeeper folder you created inside data.

Update the Apache Kafka log directory path (log.dirs) in the config/server.properties configuration file with the path of the kafka folder.

STEP 5: Start ZooKeeper

Now we will start ZooKeeper from the command prompt. Go to the Kafka bin\windows folder and execute the zookeeper-server-start.bat command with the config/zookeeper.properties configuration file.
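From the bin\windows folder, the command looks like this (the relative path assumes the default layout of the extracted Kafka folder):

$ zookeeper-server-start.bat ..\..\config\zookeeper.properties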

Here we are using the default properties that come bundled with the Kafka binary in the config folder; later we can update them according to our needs.

To validate that ZooKeeper started successfully, check the startup logs in the console.

STEP 6: Start Apache Kafka

Finally, we will start Apache Kafka from the command prompt in the same way we started ZooKeeper. Open another command prompt and run the kafka-server-start.bat command with the config/server.properties configuration file.
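Again from the bin\windows folder, the command would be (path relative to the default extracted layout):

$ kafka-server-start.bat ..\..\config\server.properties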

Summary:

To proceed with Kafka you need to install and run the Kafka and ZooKeeper servers on your machine using the above steps.

Next-> Kafka: Publish and Consume messages

Prev-> Kafka: Introduction to Kafka

Kafka: Introduction to Kafka

In a world where systems increasingly depend on data, it is very important to get the right data at the right time to make the most of it. For this, a great data-streaming architecture, Apache Kafka, was introduced in 2011.

Here I am bringing a short course on Kafka, where I try to provide a basic understanding of Kafka, its core architecture, and some hands-on producer and consumer code.

So let’s get started 😊

What is Kafka?

Apache Kafka originated at LinkedIn and later became an open-sourced Apache project in 2011, then a first-class Apache project in 2012. Kafka is written in Scala and Java.

Apache Kafka is a fault-tolerant messaging system based on the publisher-subscriber concept. It is fast, scalable, and distributed by design.

“Kafka is an Event Streaming architecture.”

Event streaming is capturing data in real-time from various event sources like databases, cloud services, software applications, etc.

Why Kafka?

Kafka is a messaging system. It typically suits applications that require high throughput and low latency, and it can be used for real-time analytics.

Kafka can work with Flume/Flafka, Spark Streaming, Storm, HBase, and Flink for real-time ingestion, analysis, and processing of streaming data. Kafka streams are often used to feed Hadoop big-data lakes, and Kafka brokers support massive message streams for low-latency follow-up analysis in Hadoop or Spark.

Basics of Kafka:

Apache.org states that:

  • Kafka runs as a cluster on one or more servers.
  • The Kafka cluster stores a stream of records in categories called topics.
  • Each record consists of a key, a value, and a timestamp.

Key Concepts :

Events and Offset :

Kafka uses a log data structure to store events/messages. Within a partition, each message is assigned a unique, sequential offset, and Kafka preserves the order of messages in that partition.

Offsets are the pointers that tell a consumer from where data needs to be read.

Events/messages can stay in a partition for a very long period, or even forever, depending on the configured retention policy.
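As a quick illustration of offsets, here is a minimal sketch using the plain kafka-clients consumer that reads partition 0 of a topic starting from a chosen offset; the topic name, partition number, and offset value are arbitrary choices for this example:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("greetings", 0);
            consumer.assign(Collections.singletonList(partition));
            // Jump to offset 5 instead of the group's committed position
            consumer.seek(partition, 5L);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}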

Topic and Partitions :

A topic is a uniquely named category to which producers publish messages.

Each topic contains one or more partitions, and the partitions contain the messages.

Messages are written to topics, and by default Kafka uses round robin to select which partition a message is written to.

To make sure that messages of a particular type go to the same partition, we can assign a key to the messages: attaching a key ensures that messages with the same key always go to the same partition of a topic. Kafka guarantees order within a partition, but not across partitions in a topic.
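To illustrate keys and partitioning, here is a minimal sketch using the plain kafka-clients producer; the topic name and key are arbitrary for this example. Both records share the key, so they are guaranteed to land in the same partition and keep their order:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same key "user-42" => same partition => per-key ordering is preserved
            producer.send(new ProducerRecord<>("greetings", "user-42", "Hello"));
            producer.send(new ProducerRecord<>("greetings", "user-42", "Hello again"));
        }
    }
}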

Cluster and Broker :

A Kafka cluster can contain multiple brokers to balance the load. A single Kafka server is called a Kafka broker. Brokers are stateless, so Kafka uses ZooKeeper to maintain the cluster state.

I'll cover ZooKeeper in the next section. For now, let's understand what a broker is.

A broker receives messages from producers, assigns offsets to them, and then stores them on local disk.

The broker is also responsible for serving fetch requests coming from consumers.

Each broker hosts partitions of one or more topics. A topic's partitions can be replicated across multiple brokers, but each partition has exactly one owner or leader.

For example, Partition 0 of topic X may be replicated on Broker 1 and Broker 2, but the leader will always be only one of them. The replica is used as a backup of the partition, so if a particular broker fails, a replica takes over leadership.

Producers and consumers only connect to the leader of a partition.

Zookeeper:

Kafka uses ZooKeeper to maintain and coordinate the brokers.

ZooKeeper also sends notifications to producers and consumers about the presence of a new broker or the election of a new leader, so that they can make decisions and coordinate their work accordingly.

Consumer Group:

A consumer group is a set of consumers that work together; each consumer group has one unique ID.

Only one consumer in a group can pull messages from a particular partition; the same consumer group cannot have multiple consumers reading the same partition.

Multiple consumers can consume messages from the same partition, but they must belong to different consumer groups.

If a group has more consumers than there are partitions, some consumers in the group will sit idle.

Summary:

Kafka is an event-based messaging system, best suited for applications where large amounts of real-time data need to be processed.

Kafka's overall architecture provides load balancing, data replication for backup, message ordering within a partition, the ability to read messages from a particular position, long-term message storage, and the ability for multiple consumers in different groups to fetch the same messages.

Next -> Kafka: Install and Run Apache Kafka on Windows


Getting started with MongoDB


Hello Everyone,

In this post, I am going to present some of the basic syntax and examples of MongoDB to get started with it. For basic details of NoSQL databases, visit this link.

What is MongoDB?

MongoDB is an open-source, cross-platform NoSQL database. It is a document-oriented DB written in C++.

MongoDB stores its data on the filesystem, and it stores all the data as BSON (Binary JSON) documents. The structure of BSON documents is very similar to objects in object-oriented programming. In MongoDB we can store complete information in one document, rather than creating different tables and then defining relationships between them.
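For example, a single customer document can embed related data (an address and orders) that a relational design would spread across several tables; the field names and values here are just illustrative:

db.customer.insert({
  first_name: "Robin",
  last_name: "Desosa",
  address: { city: "Pune", zip: "411001" },
  orders: [
    { order_id: 1, amount: 250 },
    { order_id: 2, amount: 120 }
  ]
});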

Let's take a brief look at the terms MongoDB uses to store data:

  • Collections: You need to create collections in each Database. Each DB can contain multiple collections.
  • Documents: Each Collection contains multiple Documents.
  • Fields: Each Document contains multiple Fields.

 

Now get started with the commands:

Database:

  • Create db or Use db: There is no separate command to create a db in Mongo. Whenever we want to create (or switch to) a db, we use the following command.
    • Syntax: use <dbname>
    • Example: use customerdb
  • Show current db: This is a very simple and short command.
    • Syntax: db
  • Show dbs: This command will list the existing dbs, but a db appears only if it contains at least one collection.
    • Syntax: show dbs

At this point, show dbs will return only the default dbs. For our new db to appear we need to add a collection to it. We will see how to add collections in the Collections section, but for now here is an example:

    • Example: db.customer.insert({first_name:"Robin"})
    • show dbs

Now we can see our db customerdb in the list.

  • Drop db: To drop a database, use the following command. Before deleting a database, first select it.
    • Syntax: db.dropDatabase()
    • Example: use customerdb
      db.dropDatabase()

Collections

  • Create collection: In MongoDB we normally do not need to create a collection explicitly; when we write a command to insert a document, the collection is created if it does not exist. But there is a way to create a collection explicitly and define it as expected:
    • Syntax: db.createCollection(<collectionName>, option)
      db.createCollection(<name>, { capped: <boolean>,
      autoIndexId: <boolean>,
      size: <number>,
      max: <number>,
      storageEngine: <document>,
      validator: <document>,
      validationLevel: <string>,
      validationAction: <string>,
      indexOptionDefaults: <document>,
      viewOn: <string>,
      pipeline: <pipeline>,
      collation: <document> } )
    • Example: db.createCollection("customer")

The collection name's type is String and the options type is Document.
Some of the important fields are described below:

Field (optional) | Type | Description
capped | boolean | If set to true, it creates a capped collection. For a capped collection we also need to define the size.
size | number | Defines the size of the capped collection in bytes. Once the collection reaches this size, MongoDB starts deleting the oldest documents on each insert.
max | number | Defines the maximum number of documents in a capped collection. The size limit takes precedence, so old documents may be deleted before max is reached.
autoIndexId | boolean | Automatically creates an index on the _id field.
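For example, a capped collection limited to about 10 KB and at most 100 documents could be created like this (the collection name and numbers are just illustrative):

db.createCollection("logs", { capped: true, size: 10240, max: 100 })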

 

  • Drop Collection: We can drop a collection by using the following command, but before dropping any collection we should be in the corresponding db.
    • Syntax: db.<collectionName>.drop()
    • Example: use customerdb
      db.customer.drop()

CRUD Operations:

MongoDB provides a lot of flexibility for CRUD operations. We can insert or update documents on the fly.

  • Insert Document:
    • Syntax: db.<collectionName>.insert(<documents>)
    • Example: db.customer.insert([{first_name:"Robin", last_name:"Desosa"}, {first_name:"Kanika", last_name:"Bhatnagar"}, {first_name:"Rakesh", last_name:"Sharma", gender:"male"}]);

In the above example we are adding 3 documents; the first 2 have the same fields, but the third document has an additional field, gender. MongoDB lets us insert such non-uniform (schema-less) data.
When you insert a document, MongoDB automatically creates a unique _id for it.

  • Update Document:
    • Syntax: db.<collectionName>.update({<documentIdentifier>}, {$set:{<update value>}})
    • Example: db.customer.update({first_name:"Robin"}, {$set:{gender:"male"}});
      db.customer.update({first_name:"Kanika"}, {$set:{gender:"female"}});
      db.customer.update({first_name:"Rakesh"}, {$set:{age:25}});
      The above examples add a new field to the corresponding documents.
  • Update or Insert: The upsert option updates the document if it already exists, or inserts a new one.
    • Syntax: db.<collectionName>.update({<docIdentifier>}, {<document>}, {upsert:true})
    • Example: db.customer.update({first_name:"Amita"}, {first_name:"Amita", last_name:"Jain", gender:"female"}, {upsert: true});
  • Rename Field in Document: We can rename a field of a specific document by using $rename in the update command.
    • Syntax: db.<collectionName>.update({<documentIdentifier>}, {$rename:{<update value>}})
    • Example: db.customer.update({first_name:"Rakesh"}, {$rename:{"gender":"sex"}});
      After this we have renamed the gender field to sex, but only for the document whose first_name is "Rakesh".
  • Remove a field: To remove a field, $unset needs to be used in the update command.
    • Syntax: db.<collectionName>.update({<documentIdentifier>}, {$unset:{<field>:1}})
    • Example: db.customer.update({first_name:"Rakesh"}, {$unset:{age:1}});
  • Remove Document:
    • Syntax: db.<collectionName>.remove({<documentIdentifier>})
    • Example: db.customer.remove({first_name:"Amita"});
      (If we have multiple entries with first_name Amita and want to remove only one:)
      db.customer.remove({first_name:"Amita"}, {justOne:true});
  • Find Document: We can find documents in a collection by using the following command. The output of the command is objects in JSON form.
    • Syntax: db.<collectionName>.find()
    • Example: db.customer.find();

The output of the above command will be all the JSON objects stored in that collection.
To see it in a formatted way, with each object and field on a new line, we can use pretty() on find.

Example: db.customer.find().pretty();

  • Find Specific: Pass the documentIdentifier value to the find method.
    • Syntax: db.<collectionName>.find({<documentIdentifier>})
    • Example: db.customer.find({first_name:"Kanika"});
  • Or condition:
    • Example: db.customer.find({$or:[{first_name:"Kanika"}, {first_name:"Robin"}]});

In the above example we pass a document to find as a parameter, and in that document we give an array of first_name values. $or defines the operation that is performed on that array.

  • Greater than, Less than: We can jump directly to examples of the greater-than and less-than operators.

Example:

  • db.customer.find({age:{$gt:26}});
    In the above example, $gt specifies that the > operation is performed on age. It finds and prints all documents that have the age field with age > 26.
  • db.customer.find({age:{$lt:26}});
    In the same way, $lt helps us find all documents that have the age field with age < 26.
  • db.customer.find({age:{$gte:26}});
    We can perform >= or <= operations as well by using $gte and $lte.

Following are some more examples of features provided by MongoDB:

  • Sort:
    • db.customer.find().sort({first_name:1}); //ascending order
      db.customer.find().sort({first_name:-1}); //descending order
  • Count:
    • db.customer.find().count();
  • ForEach:
    • db.customer.find().forEach(function(doc){print("Customer Name: " + doc.first_name)});

These are the basic commands and syntax to get started with MongoDB.