This guide explains Kafka offsets, which are crucial for tracking the position of records in a Kafka topic. Proper understanding of offsets is essential for managing message consumption and ensuring data integrity.
A Kafka offset is a unique identifier for each record within a partition of a Kafka topic. It represents the position of a record in the log. Offsets are used by consumers to track and manage the records they have read from a topic.
Offsets help Kafka consumers keep track of which records have been processed and which are yet to be consumed. There are two primary ways to manage offsets: automatic committing, where the consumer periodically commits the latest polled offsets in the background (enable.auto.commit=true), and manual committing, where the application explicitly commits offsets with commitSync() or commitAsync() after processing records.
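The automatic approach needs no commit calls in application code; it is purely configuration. Below is a minimal sketch of that configuration, assuming the same local broker address used elsewhere in this guide and a hypothetical group id ("auto-offset-group"). Plain string property names are used here so the snippet stands alone without the Kafka client classes.

```java
import java.util.Properties;

public class AutoCommitConfigSketch {
    public static void main(String[] args) {
        // With automatic committing, the consumer commits the latest polled
        // offsets in the background at a fixed interval.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "auto-offset-group");      // hypothetical group id
        props.put("enable.auto.commit", "true");         // the broker-client default
        props.put("auto.commit.interval.ms", "5000");    // commit every 5 seconds (the default)

        System.out.println(props.getProperty("enable.auto.commit")); // prints "true"
    }
}
```

Automatic committing is simpler but offers weaker guarantees: a record polled just before a crash may be committed without ever being processed, which is why the manual approach shown next is preferred when processing must not lose records.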
Below is a Java example demonstrating how to manage offsets manually using Kafka's Consumer API. This example shows how to commit offsets after processing records to ensure that the consumer resumes from the correct position on restart.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ManualOffsetManagementExample {
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-offset-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable auto commit

        // Create KafkaConsumer with explicit key/value types
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Subscribe to topic
        consumer.subscribe(Collections.singletonList("my-topic"));

        // Poll for new records
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received record: key=%s value=%s partition=%d offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());

                // Manually commit the offset of the NEXT record to read (offset + 1)
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
                consumer.commitSync(offsets);
            }
        }
    }
}
The following diagram illustrates how offsets are used to track records in a Kafka topic partition and how they are managed by consumers.
Diagram: Kafka Offsets Tracking and Management
Understanding Kafka offsets is crucial for effective data consumption and processing. Proper offset management ensures that consumers can accurately track their progress and handle data recovery in case of failures.
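The manual example above commits record.offset() + 1 rather than record.offset(). This follows Kafka's convention that a committed offset names the next record the consumer should read, not the last record it processed. The broker-free sketch below illustrates that convention by modeling a partition as a plain list indexed by offset; the class name and log contents are illustrative, not part of the Kafka API.

```java
import java.util.List;

public class CommittedOffsetSketch {
    public static void main(String[] args) {
        // Model a partition as an ordered log; each record's list index is its offset.
        List<String> partition = List.of("a", "b", "c", "d");

        long lastProcessedOffset = 1;                   // records at offsets 0 and 1 were processed
        long committedOffset = lastProcessedOffset + 1; // convention: commit the NEXT offset to read

        // On restart, consumption resumes at the committed offset.
        String firstAfterRestart = partition.get((int) committedOffset);
        System.out.println(firstAfterRestart); // prints "c": nothing is reprocessed or skipped
    }
}
```

Committing offset + 1 therefore gives at-least-once semantics on restart: the consumer resumes exactly after the last record whose offset it committed, and the only records that can be seen twice are those processed after the most recent commit.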