Topic compaction in Apache Kafka is a log cleanup policy that retains only the most recent record for each key within a topic. Unlike the default retention based on time or size, which eventually discards whole log segments, compaction preserves the latest value of every key. This makes it a good fit for scenarios where you care about the current state of each key (for example, the latest profile for each user) rather than the full history of changes.
In a compacted topic, the log cleaner periodically rewrites log segments, dropping records that have been superseded by a newer record with the same key, so that at least the latest record for every key survives. For example, if a topic contains (key1, v1), (key2, v1), (key1, v2), compaction eventually removes (key1, v1) and keeps (key2, v1) and (key1, v2).
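One way to picture the end result: a compacted topic behaves like a map from key to latest value. The short sketch below (the keys and values are made up for illustration) shows that mental model in plain Java, without Kafka:
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactionMentalModel {
    public static void main(String[] args) {
        // Records as they were produced; later records with the same key supersede earlier ones
        String[][] producedLog = {{"user-1", "online"}, {"user-2", "online"}, {"user-1", "offline"}};

        Map<String, String> afterCompaction = new LinkedHashMap<>();
        for (String[] record : producedLog) {
            afterCompaction.put(record[0], record[1]); // keep only the latest value per key
        }
        System.out.println(afterCompaction); // {user-1=offline, user-2=online}
    }
}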
To enable log compaction for a topic, set the topic's cleanup.policy property to compact. Here's how you can configure it using Kafka's command-line tools, either when creating the topic or by altering an existing one:
kafka-topics.sh --create --topic my-compacted-topic --partitions 1 --replication-factor 1 --config cleanup.policy=compact --bootstrap-server localhost:9092
kafka-configs.sh --alter --entity-type topics --entity-name my-compacted-topic --add-config cleanup.policy=compact --bootstrap-server localhost:9092
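The same configuration can also be applied programmatically. The following sketch uses Kafka's AdminClient to create the topic with compaction enabled; the topic name, partition count, and bootstrap address simply mirror the command-line examples above:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CompactedTopicCreator {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // One partition, replication factor 1, with log compaction enabled
            NewTopic topic = new NewTopic("my-compacted-topic", 1, (short) 1)
                    .configs(Collections.singletonMap("cleanup.policy", "compact"));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}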
How aggressively compaction runs is governed by a few additional settings. On the broker, log.cleaner.backoff.ms controls how long the log cleaner sleeps between checks for partitions to clean. On the topic, min.cleanable.dirty.ratio determines how much of a partition's log must consist of superseded records before it becomes eligible for compaction, and delete.retention.ms controls how long tombstones are kept. If you set cleanup.policy to compact,delete, the familiar retention.ms and retention.bytes settings additionally remove old segments by time or size.
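The topic-level settings can also be changed after the topic exists. As an illustrative sketch (the chosen values are examples, not recommendations), the AdminClient's incrementalAlterConfigs API, available in recent client versions, can apply them:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class CompactedTopicTuner {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic");
            Collection<AlterConfigOp> ops = Arrays.asList(
                    // Compact a partition once 10% of its log consists of superseded records
                    new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.1"), AlterConfigOp.OpType.SET),
                    // Keep tombstones around for one hour so slower readers still see deletes
                    new AlterConfigOp(new ConfigEntry("delete.retention.ms", "3600000"), AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}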
Here's an example of producing and consuming messages from a compacted topic in Java. The producer writes two values for the same key; once the cleaner has run, only the second value remains in the log:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;

public class CompactedTopicProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            // Two records with the same key: after compaction only "value2" survives
            ProducerRecord<String, String> record1 = new ProducerRecord<>("my-compacted-topic", "key1", "value1");
            ProducerRecord<String, String> record2 = new ProducerRecord<>("my-compacted-topic", "key1", "value2"); // update for key1

            Future<RecordMetadata> future1 = producer.send(record1);
            Future<RecordMetadata> future2 = producer.send(record2);
            // Block until both writes are acknowledged by the broker
            future1.get();
            future2.get();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
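Compacted topics also support deletes through tombstones: producing a record with a null value tells the cleaner to remove all earlier records for that key and, after delete.retention.ms, the tombstone itself. Reusing the producer from the example above, a tombstone for key1 looks like this:
// A null value is a tombstone: compaction eventually removes every record for key1
producer.send(new ProducerRecord<>("my-compacted-topic", "key1", null));
Consuming from a compacted topic requires nothing special; an ordinary consumer works as usual: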
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CompactedTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the beginning of the log the first time this group reads the topic
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-compacted-topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed record with key %s and value %s%n", record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
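A common pattern built on compacted topics is restoring a key-to-latest-value view at startup, for example to warm a local cache. The sketch below is one way to do that with a plain consumer; the class name and the single-partition assumption are just for illustration. It assigns the partition directly, reads from the beginning up to the current end offset, and keeps only the newest value per key, treating null values as deletes:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class CompactedTopicSnapshot {
    public static Map<String, String> loadSnapshot() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // No consumer group needed for a one-off scan
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        Map<String, String> snapshot = new HashMap<>();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-compacted-topic", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));
            long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            // Read everything currently in the partition and keep the newest value per key
            while (consumer.position(tp) < end) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    if (record.value() == null) {
                        snapshot.remove(record.key()); // tombstone: the key was deleted
                    } else {
                        snapshot.put(record.key(), record.value());
                    }
                }
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println(loadSnapshot());
    }
}
Because compaction runs in the background and never touches the active segment, the scan may still encounter several records for the same key; the map simply ends up holding the last one.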
Topic compaction in Kafka is a powerful feature for maintaining the latest state of each key while keeping storage roughly proportional to the number of distinct keys rather than to the number of updates. By understanding and properly configuring the cleanup policy and its related settings, you can use compacted topics as a durable, replayable source of current state while keeping Kafka's storage footprint under control.