Creating Kafka Consumer in Java

In the previous section, we learned to create a producer in Java. In this section, we will learn to implement a Kafka consumer in Java.

The following steps are taken to create a consumer:

  1. Create a Logger.
  2. Create consumer properties.
  3. Create a consumer.
  4. Subscribe the consumer to a specific topic.
  5. Poll for new data.

Let's discuss each step to learn consumer implementation in Java.

Creating Logger

The logger is used to write log messages during program execution. The user needs to create a Logger object, which requires importing the 'org.slf4j' Logger and LoggerFactory classes. The snapshot below shows the Logger implementation:

[Snapshot: Logger implementation]
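
For reference, this is the corresponding excerpt from the complete program at the end of this section (the class name consumer1 is the one used in this tutorial):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Logger for the consumer class; used later to print the polled records
Logger logger = LoggerFactory.getLogger(consumer1.class.getName());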

Creating Consumer Properties

Similar to the producer properties, Apache Kafka offers various properties for creating a consumer as well. To learn about each consumer property, visit the official Apache Kafka website: Documentation > Configuration > Consumer Configs. Here, we will list the required properties of a consumer, such as:

key.deserializer: The Deserializer class for the key, which implements the 'org.apache.kafka.common.serialization.Deserializer' interface.

value.deserializer: The Deserializer class for the value, which implements the 'org.apache.kafka.common.serialization.Deserializer' interface.

bootstrap.servers: A list of host/port pairs used to establish the initial connection with the Kafka cluster. It does not need to contain the full set of servers that a client requires; only the servers needed for bootstrapping the initial connection are required.

group.id: A unique string that identifies the consumer group this consumer belongs to. This property is needed when a consumer uses either the Kafka-based offset management strategy or group management functionality via subscribing to a topic.

auto.offset.reset: This property defines what to do when there is no initial offset or when the current offset no longer exists on the server. The following values can be used to reset the offset:

earliest: Automatically resets the offset to the earliest available offset.

latest: Automatically resets the offset to the latest offset.

none: Throws an exception to the consumer if no previous offset is found for the consumer's group.

anything else: Throws an exception to the consumer.

Note: In our code, we have used the 'earliest' value so that the consumer starts reading from the earliest available offset.

These are the essential properties required to implement a consumer. Let's implement it using IntelliJ IDEA.

Step 1) Define a new Java class as 'consumer1.java'.

Step 2) Describe the consumer properties in the class, as shown in the snapshot below:

[Snapshot: Consumer properties]

In the snapshot, all the necessary properties are described.
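
For reference, this is the corresponding excerpt from the complete program at the end of this section; the broker address 127.0.0.1:9092 and the group id third_app are the example values used in this tutorial:

// Creating consumer properties
Properties properties = new Properties();
// Broker used for the initial connection to the Kafka cluster
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// Deserializers for the record key and value (plain strings here)
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Consumer group this consumer belongs to
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "third_app");
// Start from the earliest offset when no committed offset exists for the group
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");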

Creating the consumer

Create a KafkaConsumer object to create the consumer, as shown below:

[Snapshot: Creating the KafkaConsumer]

The properties described above are passed while creating the consumer.
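
For reference, this is the corresponding excerpt from the complete program at the end of this section; the consumer is typed <String, String> because both key and value use StringDeserializer:

// Creating the consumer with the properties defined above
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);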

Subscribing the consumer

To read messages from a topic, we need to connect the consumer to the specified topic. A consumer can subscribe through various subscribe() API overloads. Here, we have used Arrays.asList() because the user may want to subscribe to either one or multiple topics; Arrays.asList() allows the consumer to be subscribed to a list of topics.

The code below shows the subscription of the consumer:

[Snapshot: Subscribing the consumer]

The user needs to specify the topic name directly or through a string variable to read the messages. Multiple topics can also be given, separated by commas.
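
For reference, this is the corresponding excerpt from the complete program at the end of this section; here the consumer subscribes to the single topic my_first, but more topic names could be passed to Arrays.asList():

// Subscribing to one topic; Arrays.asList() also accepts several topic names
consumer.subscribe(Arrays.asList("my_first"));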

Polling for new data

The consumer reads data from Kafka through the poll() method.

[Snapshot: Polling for new data]

The poll() method returns the data fetched from the current offset of each assigned partition. A timeout duration is specified for which it waits for data; otherwise, it returns an empty ConsumerRecords object to the consumer. The logger then prints each record's key, value, partition, and offset.
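
For reference, this is the corresponding excerpt from the complete program at the end of this section; poll() waits up to 100 milliseconds for new records, and the logger prints each record's key, value, partition, and offset:

// Polling in an endless loop; each call waits up to 100 ms for new records
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        logger.info("Key: " + record.key() + ", Value:" + record.value());
        logger.info("Partition:" + record.partition() + ",Offset:" + record.offset());
    }
}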

The complete code to create a Java consumer is given below:

package com.firstgroupapp.aktutorial;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class consumer1 {
    public static void main(String[] args) {
        Logger logger = LoggerFactory.getLogger(consumer1.class.getName());
        String bootstrapServers = "127.0.0.1:9092";
        String grp_id = "third_app";
        String topic = "my_first";

        // Creating consumer properties
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, grp_id);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Creating the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        // Subscribing to the topic
        consumer.subscribe(Arrays.asList(topic));

        // Polling for new data
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key: " + record.key() + ", Value:" + record.value());
                logger.info("Partition:" + record.partition() + ",Offset:" + record.offset());
            }
        }
    }
}

In this way, a consumer can read the messages by following each step sequentially.

The output of the consumer implementation can be seen in the below snapshot:

[Snapshot: Consumer output]

The key value is null because we did not specify any key while producing the messages earlier. Due to 'earliest', all the messages from the beginning are displayed.

Reading Data in a Consumer Group

The user can have more than one consumer reading data at the same time. This is done via a consumer group, in which one or more consumers read the data from Kafka together. If the user wants to read the messages from the beginning again, either reset the offsets of the group or change the group.id. This resets the user's application and displays the messages from the start, as sketched below.
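
For example, a minimal sketch of re-reading from the beginning by switching to a new group id (the group id fourth_app is a hypothetical example, not part of the tutorial's code): because the new group has no committed offsets and auto.offset.reset is set to 'earliest', consumption starts from the first message.

// Hypothetical new group id: a group with no committed offsets starts from 'earliest'
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "fourth_app");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");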





