Dynamic configuration in Kafka allows you to modify broker, topic, and client settings without restarting Kafka services. This feature is crucial for managing and tuning Kafka clusters in real-time.
Kafka supports dynamic configuration changes through the following mechanisms:
Dynamic configuration changes are applied using Kafka's command-line tools or via the Admin API. Below are examples for each type of configuration.
To update broker configurations dynamically, use the following command:
kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type brokers --entity-name 0 --add-config max.message.bytes=2000000
This command updates the max.message.bytes
configuration for broker ID 0 to allow larger messages.
To change topic configurations dynamically, use the following command:
kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=86400000
This command sets the retention.ms
for the topic my-topic
to 1 day.
To update client configurations, modify the properties files or the configuration parameters in your producer or consumer code. For example, changing the acks
setting for a producer:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // Set to "all" for stronger durability guarantees
The Admin API provides programmatic access to manage Kafka configurations. Here’s an example using the AdminClient in Java:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Collections;
import java.util.Properties;
public class KafkaAdminClientExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient adminClient = AdminClient.create(props)) {
// Alter topic configuration
ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "86400000");
AlterConfigOp op = new AlterConfigOp(retentionEntry, AlterConfigOp.OpType.SET);
Config config = new Config(Collections.singletonList(retentionEntry));
adminClient.incrementalAlterConfigs(Collections.singletonMap(new org.apache.kafka.common.Configurable.ConfigResource(
org.apache.kafka.common.Configurable.ConfigResource.Type.TOPIC, "my-topic"),
Collections.singletonList(op))).all().get();
System.out.println("Configuration updated successfully.");
} catch (Exception e) {
e.printStackTrace();
}
}
}
Dynamic configuration is a powerful feature of Kafka that allows for flexible and real-time management of your Kafka cluster and clients. By leveraging Kafka's dynamic configuration capabilities, you can optimize performance, ensure better resource utilization, and respond quickly to changing requirements.