Kafka Intermediate: Topic Compaction

Topic compaction in Apache Kafka is a feature that ensures at least the latest record for each key is retained within a topic, while older records with the same key are eventually removed. This is particularly useful for scenarios where you want to keep the most recent state of a key rather than a historical log of all changes.

1. What is Topic Compaction?

Topic compaction is a log cleanup policy (cleanup.policy=compact) that retains at least the latest record for each key within a topic. Unlike traditional log retention based on time or size (cleanup.policy=delete), compaction focuses on preserving the latest state for each key.

2. How Does Topic Compaction Work?

In a compacted topic, a background thread called the log cleaner periodically rewrites the older log segments of each partition, discarding records whose key appears again later in the log. Only the latest record for each key is kept, and retained records keep their original offsets. The active segment (the one currently being written to) is never compacted, so recent updates for a key may coexist with older ones until the cleaner runs.
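
For example, consider a partition that contains three records before a compaction pass; afterwards, the older record for key1 is gone, and the survivors keep their original offsets:

Before compaction:
offset 0: key1 -> value1
offset 1: key2 -> value1
offset 2: key1 -> value2

After compaction:
offset 1: key2 -> value1
offset 2: key1 -> value2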

3. Configuring Topic Compaction

To enable log compaction for a topic, configure the topic with the cleanup.policy property set to compact. Here’s how you can do it using Kafka’s command-line tools, or programmatically with the Admin API:

3.1. Creating a Compacted Topic


kafka-topics.sh --create --topic my-compacted-topic --partitions 1 --replication-factor 1 --config cleanup.policy=compact --bootstrap-server localhost:9092

3.2. Altering an Existing Topic


kafka-configs.sh --alter --entity-type topics --entity-name my-compacted-topic --add-config cleanup.policy=compact --bootstrap-server localhost:9092

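3.3. Creating a Compacted Topic Programmatically

The same configuration can also be applied from Java using Kafka’s Admin API. Below is a minimal sketch assuming a broker at localhost:9092; the class name CompactedTopicAdmin is illustrative:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CompactedTopicAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 1, with log compaction enabled.
            NewTopic topic = new NewTopic("my-compacted-topic", 1, (short) 1)
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
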
4. Key Considerations

When working with compacted topics, keep the following points in mind:

- Every record needs a key. Keys are what compaction deduplicates on, and the broker rejects unkeyed records sent to a compacted topic.
- Compaction is asynchronous. The log cleaner runs in the background, so a consumer reading from the beginning may still see multiple records for the same key until older segments are cleaned.
- The active segment is never compacted, so the most recently written records always include their full history.
- Deletion is done with tombstones: a record with a null value marks its key for removal, and the tombstone itself is discarded after delete.retention.ms (see the sketch in section 5.3).
- Cleaning frequency is tunable through properties such as min.cleanable.dirty.ratio, segment.ms, and min.compaction.lag.ms (see the example after this list).

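If compaction seems to lag, the cleaner’s thresholds can be tuned per topic. Below is a minimal sketch using the Admin API; the class name CompactionTuning and the chosen values are illustrative, while min.cleanable.dirty.ratio and segment.ms are standard Kafka topic configs:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CompactionTuning {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic");

            // Run the cleaner once 10% of the log is uncompacted, and roll
            // segments hourly so records become eligible for cleaning sooner.
            Collection<AlterConfigOp> ops = List.of(
                new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.1"), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("segment.ms", "3600000"), AlterConfigOp.OpType.SET));

            admin.incrementalAlterConfigs(Map.of(topic, ops)).all().get();
        }
    }
}
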
5. Example: Producing and Consuming from a Compacted Topic

Here’s an example of producing and consuming messages from a compacted topic in Java:

5.1. Producer Example


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class CompactedTopicProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
            // Two records with the same key: after compaction, only "value2" survives.
            ProducerRecord<String, String> record1 = new ProducerRecord<>("my-compacted-topic", "key1", "value1");
            ProducerRecord<String, String> record2 = new ProducerRecord<>("my-compacted-topic", "key1", "value2"); // Update for key1

            Future<RecordMetadata> future1 = producer.send(record1);
            Future<RecordMetadata> future2 = producer.send(record2);

            // Block until both sends are acknowledged by the broker.
            future1.get();
            future2.get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

5.2. Consumer Example


import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CompactedTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the earliest offset so a new group reads the compacted history.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-compacted-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

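5.3. Deleting a Key with a Tombstone

To delete a key from a compacted topic, produce a record with a null value, known as a tombstone. Once the log cleaner runs (and delete.retention.ms has elapsed), both the old value and the tombstone itself are removed. Below is a minimal sketch using the same producer configuration as section 5.1; the class name TombstoneProducer is illustrative:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TombstoneProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value marks key1 for deletion once compaction runs.
            producer.send(new ProducerRecord<>("my-compacted-topic", "key1", null));
            producer.flush();
        }
    }
}
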
6. Conclusion

Topic compaction in Kafka is a powerful feature for maintaining the latest state of each key while keeping storage bounded. By understanding and properly configuring topic compaction, you can use topics as durable key-value changelogs and retain the current value of every key without storing its full history.