Understanding Kafka Offsets

This guide explains Kafka offsets, which are crucial for tracking the position of records in a Kafka topic. Proper understanding of offsets is essential for managing message consumption and ensuring data integrity.

1. What is a Kafka Offset?

A Kafka offset is a monotonically increasing integer that uniquely identifies each record within a partition of a Kafka topic. It represents the position of a record in the partition's append-only log; offsets are unique only within a partition, not across the topic as a whole. Consumers use offsets to track and manage the records they have read from a topic.
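To make the idea concrete, the following toy sketch (plain Java, no Kafka client involved) models a partition as an append-only list whose indices play the role of offsets. The class name and record values are illustrative only.

```java
import java.util.List;

public class OffsetIllustration {
    public static void main(String[] args) {
        // A partition is an append-only log; each record's offset is its position.
        List<String> partitionLog = List.of("order-1", "order-2", "order-3");

        // A consumer that last committed offset 2 resumes reading at position 2.
        int committedOffset = 2;
        String nextRecord = partitionLog.get(committedOffset);
        System.out.println("Next record to consume: " + nextRecord);
    }
}
```

In real Kafka, the broker assigns offsets as records are appended, and a committed offset always points to the *next* record to read, not the last one processed.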

2. How Offsets Work

Offsets help Kafka consumers keep track of which records have been processed and which are yet to be consumed. There are two primary ways to manage offsets:

- Automatic commits: with enable.auto.commit=true (the default), the consumer periodically commits the latest polled offsets in the background at a configurable interval.
- Manual commits: with enable.auto.commit=false, the application commits offsets explicitly (via commitSync or commitAsync) after processing records, giving precise control over when progress is recorded.
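One common strategy is automatic offset commits, where the consumer commits the latest polled offsets in the background at a fixed interval. A minimal configuration sketch follows; the broker address and group id are placeholder values, and the property keys shown are standard Kafka consumer configuration names.

```java
import java.util.Properties;

public class AutoCommitConfigExample {
    public static void main(String[] args) {
        // With automatic offset management, the consumer commits offsets
        // periodically in the background at a configurable interval.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "auto-offset-group");       // hypothetical group id
        props.put("enable.auto.commit", "true");          // enable background commits
        props.put("auto.commit.interval.ms", "5000");     // commit every 5 seconds

        System.out.println("auto commit enabled: " + props.getProperty("enable.auto.commit"));
    }
}
```

Automatic commits are convenient but can acknowledge records before they are fully processed; when that risk matters, manual commits (shown in the next section) are the safer choice.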

3. Example: Manual Offset Management

Below is a Java example demonstrating how to manage offsets manually using Kafka's Consumer API. This example shows how to commit offsets after processing records to ensure that the consumer resumes from the correct position on restart.


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ManualOffsetManagementExample {
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-offset-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // Disable auto commit

        // Create KafkaConsumer with explicit key/value types
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to topic
        consumer.subscribe(Collections.singletonList("my-topic"));

        // Poll for new records
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
                System.out.printf("Received record: key=%s value=%s partition=%d offset=%d%n",
                                  record.key(), record.value(), record.partition(), record.offset());

                // Manually commit the offset of the next record to read
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                consumer.commitSync(offsets);
            });
        }
    }
}

4. Kafka Offsets Diagram

The following diagram illustrates how offsets are used to track records in a Kafka topic partition and how they are managed by consumers.

[Diagram: Kafka Offsets Tracking and Management]

5. Conclusion

Understanding Kafka offsets is crucial for effective data consumption and processing. Proper offset management ensures that consumers can accurately track their progress and handle data recovery in case of failures.