Consumer API Usage in Kafka

This guide covers the basics of using the Kafka Consumer API, which is responsible for reading records from Kafka topics. Understanding how to configure and use the Consumer API is essential for processing data from Kafka.

1. What is a Kafka Consumer?

A Kafka consumer is an application or component that reads records from Kafka topics. Consumers subscribe to one or more topics and process the records they receive. They are an integral part of the Kafka ecosystem for consuming and processing streaming data.

2. Basic Configuration Options

Kafka consumers are configured using various properties to control their behavior. Key configuration options include:

3. Example: Basic Kafka Consumer Configuration

Below is a Java example demonstrating how to configure and use a Kafka consumer to read messages from a Kafka topic.


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        
        // Create KafkaConsumer
        KafkaConsumer consumer = new KafkaConsumer<>(props);
        
        // Subscribe to topic
        consumer.subscribe(Collections.singletonList("my-topic"));
        
        // Poll for new records
        while (true) {
            consumer.poll(100).forEach(record -> {
                System.out.printf("Received record: key=%s value=%s partition=%d offset=%d%n",
                                  record.key(), record.value(), record.partition(), record.offset());
            });
        }
    }
}
    

4. Kafka Consumer Configuration Diagram

The following diagram illustrates the key components involved in Kafka consumer configuration, including the Kafka broker connection, deserialization, and offset management.

Kafka Consumer Configuration Diagram

Diagram: Kafka Consumer Configuration Components

5. Conclusion

The Kafka Consumer API is essential for reading data from Kafka topics. By understanding and applying the various configuration options, you can efficiently and reliably process streaming data from Kafka.