Kafka Transactions

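Kafka transactions let a producer write to multiple partitions and topics as a single atomic operation. Combined with idempotent producers and consumers that read with isolation.level=read_committed, they provide exactly-once semantics for read-process-write pipelines within Kafka: input records are not lost, and their results are not committed more than once, even if producers or brokers fail. Transactions are essential for applications that cannot tolerate duplicated or partially written results.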

1. Overview

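Kafka transactions group a set of operations, such as producing messages to several partitions and committing consumer offsets, into a single atomic unit. Either all messages produced within a transaction are committed, or, if the transaction fails or is aborted, none of them become visible to read_committed consumers. Transactions are managed through the producer's transactional API: initTransactions(), beginTransaction(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction().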
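To make the all-or-nothing behavior concrete, here is a hypothetical in-memory model (the class ToyTransactionalLog and its methods are illustrative only, not part of any Kafka API). It assumes a greatly simplified log: records are appended as soon as they are sent, and a reader in read_committed mode only returns records whose transaction later committed. Real brokers instead write commit/abort control markers into the log and serve read_committed consumers only up to the last stable offset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of transactional visibility; NOT Kafka's log format.
class ToyTransactionalLog {
    enum Status { OPEN, COMMITTED, ABORTED }

    private record Entry(long txnId, String value) {}

    private final List<Entry> log = new ArrayList<>();
    private final Map<Long, Status> txns = new HashMap<>();
    private long nextTxnId = 0;

    long begin() {
        long id = nextTxnId++;
        txns.put(id, Status.OPEN);
        return id;
    }

    // Records are written to the log before the transaction's outcome is known
    void append(long txnId, String value) {
        if (txns.get(txnId) != Status.OPEN) {
            throw new IllegalStateException("transaction " + txnId + " is not open");
        }
        log.add(new Entry(txnId, value));
    }

    void commit(long txnId) { finish(txnId, Status.COMMITTED); }

    void abort(long txnId) { finish(txnId, Status.ABORTED); }

    private void finish(long txnId, Status outcome) {
        if (txns.get(txnId) != Status.OPEN) {
            throw new IllegalStateException("transaction " + txnId + " is not open");
        }
        txns.put(txnId, outcome);
    }

    // Like read_uncommitted: every appended record, whatever its transaction's outcome
    List<String> readUncommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : log) out.add(e.value());
        return out;
    }

    // Like read_committed: only records whose transaction committed
    List<String> readCommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (txns.get(e.txnId()) == Status.COMMITTED) out.add(e.value());
        }
        return out;
    }
}
```

If one transaction writes "a1" and "a2" and commits while another writes "b1" and aborts, readCommitted() returns only ["a1", "a2"]: the aborted transaction's record is present in the log but never surfaces to a read_committed reader.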

Key Concepts

2. Configuring Transactions

To use Kafka transactions, you need to configure both the producer and Kafka brokers. Here are the key configurations:

Producer Configuration

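# Required producer configurations for transactions:
acks=all
enable.idempotence=true
# Setting transactional.id is what enables transactions. It must be unique
# per producer instance and stable across restarts of that instance.
transactional.id=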

Broker Configuration

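# Broker configurations for transactions:
# Replication factor of the internal __transaction_state topic (default 3)
transaction.state.log.replication.factor=3
# Minimum in-sync replicas for the __transaction_state topic (default 2)
transaction.state.log.min.isr=2
# General log retention (the default of 7 days); not specific to transactions
log.retention.hours=168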
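Producer and broker settings are only half of the picture: a consumer sees records from open and aborted transactions unless it opts in to read_committed, because the default isolation.level is read_uncommitted. The sketch below builds such consumer settings with plain string keys; the class name ReadCommittedConsumerConfig and the group id used with it are hypothetical, while the keys are standard Kafka consumer configuration names.

```java
import java.util.Properties;

// Hypothetical helper that assembles consumer settings for reading transactional output
class ReadCommittedConsumerConfig {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Only return records from committed transactions (default: read_uncommitted)
        props.put("isolation.level", "read_committed");
        // In consume-transform-produce pipelines, offsets are committed through the
        // producer's transaction (sendOffsetsToTransaction), so auto-commit is disabled
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

The resulting Properties can be passed to new KafkaConsumer<>(props) in the same way the producer example passes its Properties to KafkaProducer.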

3. Using Transactions in Code

Here is a basic example of using transactions with a Kafka producer in Java:

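import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

        // try-with-resources closes the producer on every exit path
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Registers the transactional.id with the coordinator; any transaction
            // left open by a previous instance with the same id is aborted
            producer.initTransactions();

            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
                // Flushes pending sends; the records become visible atomically
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Fatal errors: the producer can neither abort nor continue,
                // so rethrow and let try-with-resources close it
                throw e;
            } catch (KafkaException e) {
                // Any other error: abort so none of this transaction's records are committed
                producer.abortTransaction();
            }
        }
    }
}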
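The example above aborts on a non-fatal error but does not try again. A common pattern is to abort and retry the whole transaction a bounded number of times, while still giving up immediately on fatal errors. The sketch below assumes a hypothetical TransactionalSink interface (standing in for a transactional producer so the logic runs without a broker) and a hypothetical FatalTransactionException marking errors such as ProducerFencedException; neither is part of the Kafka client API.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a transactional producer. With a real KafkaProducer,
// begin/commit/abort correspond to beginTransaction(), commitTransaction() and abortTransaction().
interface TransactionalSink {
    void begin();
    void send(String value);
    void commit();
    void abort();
}

// Hypothetical marker for errors after which the producer must be closed instead of aborted
// (in Kafka: ProducerFencedException, OutOfOrderSequenceException, AuthorizationException)
class FatalTransactionException extends RuntimeException {
    FatalTransactionException(String message) { super(message); }
}

final class TransactionRetrier {
    private TransactionRetrier() {}

    // Runs body inside a transaction, aborting and retrying on non-fatal errors.
    // Returns the attempt number that committed; rethrows fatal errors immediately.
    static int runWithRetries(TransactionalSink sink, Consumer<TransactionalSink> body, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            sink.begin();
            try {
                body.accept(sink);
                sink.commit();
                return attempt;
            } catch (FatalTransactionException e) {
                throw e; // cannot abort: the caller must close the producer
            } catch (RuntimeException e) {
                sink.abort(); // discard this attempt's writes, then try again
                last = e;
            }
        }
        throw new IllegalStateException("transaction failed after " + maxAttempts + " attempts", last);
    }
}
```

Retrying the whole transaction, rather than individual sends, keeps the all-or-nothing guarantee: an aborted attempt leaves no committed records behind, so the next attempt can safely resend everything.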

4. Handling Failures

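Handling failures is a critical part of using transactions. Recovery is only partly automatic: the application decides whether to abort and retry, while Kafka guarantees that an unfinished transaction is never exposed to read_committed consumers. If a producer crashes mid-transaction, the transaction coordinator aborts the transaction once it exceeds transaction.timeout.ms, or earlier when a new producer instance with the same transactional.id calls initTransactions(). That call also bumps the producer epoch, so a "zombie" instance still running with the old epoch is fenced: its further writes and commits are rejected, typically surfacing as a ProducerFencedException.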
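Epoch-based fencing can be illustrated with a hypothetical, simplified coordinator model (ToyCoordinator and its nested FencedException are illustrative names, not Kafka classes). It assumes the coordinator tracks only an epoch and an "open transaction" flag per transactional.id: each initialization aborts any open transaction and increments the epoch, and any request carrying an older epoch is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of zombie fencing; NOT Kafka's coordinator implementation
class ToyCoordinator {
    // Thrown when a producer presents an epoch older than the current one
    static class FencedException extends RuntimeException {
        FencedException(String message) { super(message); }
    }

    private static final class TxnState {
        int epoch = -1;       // incremented each time a producer instance initializes
        boolean open = false; // whether a transaction is in progress
    }

    private final Map<String, TxnState> states = new HashMap<>();
    private int abortedOnInit = 0;

    // A new producer instance registers: an open transaction left by the previous
    // instance is aborted, and the epoch bump fences that previous instance
    int initTransactions(String transactionalId) {
        TxnState s = states.computeIfAbsent(transactionalId, id -> new TxnState());
        if (s.open) {
            s.open = false;
            abortedOnInit++;
        }
        s.epoch++;
        return s.epoch;
    }

    void begin(String transactionalId, int epoch) {
        check(transactionalId, epoch).open = true;
    }

    void commit(String transactionalId, int epoch) {
        TxnState s = check(transactionalId, epoch);
        if (!s.open) throw new IllegalStateException("no open transaction");
        s.open = false;
    }

    int abortedOnInit() { return abortedOnInit; }

    private TxnState check(String transactionalId, int epoch) {
        TxnState s = states.get(transactionalId);
        if (s == null) throw new IllegalStateException("initTransactions not called");
        if (epoch != s.epoch) {
            throw new FencedException("epoch " + epoch + " fenced by epoch " + s.epoch);
        }
        return s;
    }
}
```

In this model, if the first instance begins a transaction and stalls, a replacement calling initTransactions("my-transactional-id") aborts the stalled transaction and receives epoch 1; when the stalled instance later tries to commit with epoch 0, it gets a FencedException instead of committing duplicate output.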

Producer Failures

Broker Failures

5. Best Practices