Cross-cluster replication (CCR) in Apache Kafka allows you to replicate data between Kafka clusters that may be geographically dispersed or in different data centers. This feature is useful for disaster recovery, data locality, and minimizing latency for global applications.
CCR enables data to be replicated from one Kafka cluster (source) to another Kafka cluster (destination). This involves several key components and steps:
To set up cross-cluster replication, you configure Kafka MirrorMaker or the newer MirrorMaker 2 (MM2) tool, which adds features such as topic configuration syncing and automatic topic renaming.
MirrorMaker is used to replicate data from one cluster to another. Below is a basic configuration example:
# Source cluster properties (source-cluster.properties)
bootstrap.servers=source-cluster-broker1:9092,source-cluster-broker2:9092
group.id=mirror-maker-group
auto.offset.reset=earliest
# Destination cluster properties (destination-cluster.properties)
bootstrap.servers=destination-cluster-broker1:9092,destination-cluster-broker2:9092
# MirrorMaker command
kafka-mirror-maker.sh \
  --consumer.config source-cluster.properties \
  --producer.config destination-cluster.properties \
  --whitelist ".*" \
  --num.streams 2
MirrorMaker 2 is built on the Kafka Connect framework. When deployed on a Connect cluster, it is configured through a worker configuration file plus a connector configuration:
# MirrorMaker 2 worker properties (mirror-maker2.properties)
bootstrap.servers=source-cluster-broker1:9092,source-cluster-broker2:9092
# Distributed Connect workers coordinate through a group.id
group.id=mirror-maker2-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
config.storage.topic=mirror-maker2-configs
offset.storage.topic=mirror-maker2-offsets
status.storage.topic=mirror-maker2-status
offset.storage.replication.factor=3
config.storage.replication.factor=3
status.storage.replication.factor=3
# Connector configuration (mirror-maker2-connector.properties)
name=mirror-maker-connector
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
source.cluster.alias=source-cluster
target.cluster.alias=destination-cluster
source.cluster.bootstrap.servers=source-cluster-broker1:9092,source-cluster-broker2:9092
target.cluster.bootstrap.servers=destination-cluster-broker1:9092,destination-cluster-broker2:9092
topics=.*
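As an alternative to running the connectors on an existing Connect cluster, MirrorMaker 2 also ships with a dedicated driver, connect-mirror-maker.sh, which takes a single properties file describing both clusters and the replication flows. A minimal sketch (the aliases and broker hostnames below are placeholders matching the examples above):

```properties
# mm2.properties -- dedicated MirrorMaker 2 mode
clusters = source-cluster, destination-cluster
source-cluster.bootstrap.servers = source-cluster-broker1:9092,source-cluster-broker2:9092
destination-cluster.bootstrap.servers = destination-cluster-broker1:9092,destination-cluster-broker2:9092

# Enable replication in one direction and choose which topics to mirror
source-cluster->destination-cluster.enabled = true
source-cluster->destination-cluster.topics = .*

# Replication factor for topics MM2 creates on the destination
replication.factor = 3
```

Start it with `connect-mirror-maker.sh mm2.properties`; the dedicated mode manages its own Connect workers internally.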
When setting up cross-cluster replication, it's crucial to configure security settings to protect data in transit.
Configure SSL/TLS encryption to secure communication between clusters:
# Source cluster SSL configuration
security.protocol=SSL
ssl.keystore.location=/path/to/keystore
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/path/to/truststore
ssl.truststore.password=truststore-password
# Destination cluster SSL configuration
security.protocol=SSL
ssl.keystore.location=/path/to/keystore
ssl.keystore.password=keystore-password
ssl.key.password=key-password
ssl.truststore.location=/path/to/truststore
ssl.truststore.password=truststore-password
Use SASL for authentication so that only authorized clients can connect. Note that the PLAIN mechanism over SASL_PLAINTEXT transmits credentials unencrypted; it is shown here for simplicity:
# Source cluster SASL configuration
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="source-user" password="source-password";
# Destination cluster SASL configuration
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="destination-user" password="destination-password";
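For production, the usual choice is to combine SASL authentication with TLS encryption by setting the protocol to SASL_SSL, so that credentials and data are both protected in transit. A sketch of the combined client settings (paths and credentials are placeholders):

```properties
# SASL over TLS: authenticate with PLAIN, encrypt with SSL
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="source-user" password="source-password";
ssl.truststore.location=/path/to/truststore
ssl.truststore.password=truststore-password
```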
Effective monitoring and troubleshooting are essential for ensuring smooth cross-cluster replication.
Monitor replication lag to ensure timely data replication:
# Kafka Consumer Group command to check lag
kafka-consumer-groups.sh --bootstrap-server source-cluster-broker1:9092 --describe --group mirror-maker-group
Common issues include network connectivity problems, configuration errors, and security misconfigurations. Check logs for error messages and validate configuration settings.
# Check MirrorMaker logs
tail -f /path/to/mirror-maker/logs.log
# Validate configuration
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe --all
The replication pipeline itself requires no application code; producers simply write to the source cluster as usual, and MirrorMaker copies the records to the destination. For example:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Connect to the source cluster; MirrorMaker replicates the topic onward
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "source-cluster-broker1:9092,source-cluster-broker2:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Wait for all in-sync replicas to acknowledge each write
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("my-topic", "key", "value");
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.println("Message sent successfully to topic " + metadata.topic());
                }
            });
        }
    }
}
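One detail consumers on the destination cluster need to know: MM2's default replication policy renames mirrored topics by prefixing the source cluster alias, so the records produced above to my-topic arrive on the destination as source-cluster.my-topic. A minimal sketch of that naming rule (the helper class below is illustrative, not Kafka's actual DefaultReplicationPolicy):

```java
public class RemoteTopicNaming {
    // MM2's DefaultReplicationPolicy forms the remote topic name as
    // <source cluster alias> + separator + <topic>, separator defaulting to "."
    static String remoteTopicName(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    public static void main(String[] args) {
        // A topic mirrored from the cluster aliased "source-cluster"
        System.out.println(remoteTopicName("source-cluster", "my-topic"));
        // -> source-cluster.my-topic
    }
}
```

This prefixing is what makes bidirectional replication safe: each cluster can tell locally produced topics apart from mirrored ones and avoid replication loops.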
Cross-cluster replication is a powerful feature in Kafka that ensures data availability across geographically dispersed clusters. By properly configuring replication, handling security, and monitoring the process, you can achieve a robust and resilient Kafka setup.