Kafka AdminClient API
The Kafka AdminClient API provides a programmatic way to manage and inspect Kafka resources such as topics, brokers, and configurations. It allows you to perform administrative operations on your Kafka cluster, such as creating or deleting topics, describing cluster metadata, and altering configurations.
1. Overview
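The AdminClient API ships with the Kafka Java client library (`kafka-clients`) in the `org.apache.kafka.clients.admin` package. Its methods send requests to the brokers asynchronously and return result objects backed by futures, so you learn whether an operation succeeded only when you wait on its result, for example with `get()`. This makes it well suited to automating cluster management from applications or scripts.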
Key Features
- Creating and Deleting Topics: Manage Kafka topics by creating new ones or deleting existing ones.
- Describing Topics: Retrieve metadata about Kafka topics, including partitions, leaders, and replica assignments.
- Listing Topics: List all topics in the Kafka cluster.
- Managing Configurations: Alter topic and broker configurations.
2. Basic Operations
Here are some common operations you can perform using the AdminClient API:
Creating a Topic
To create a new topic, specify its name, the number of partitions, and the replication factor. Here is an example in Java:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // try-with-resources closes the client and its background threads.
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Arguments: topic name, partition count, replication factor.
            NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
            CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
            // Block until the cluster confirms or rejects the request.
            result.all().get();
            System.out.println("Topic created successfully.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
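Topic-level settings can also be supplied at creation time. The following is a minimal sketch, assuming a broker at `localhost:9092` and a cluster with at least three brokers; the topic name `orders-compacted` and all config values are hypothetical and chosen only for illustration. It also treats an already existing topic as a non-fatal outcome.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CreateTopicWithConfigsSketch {

    // Builds the topic definition. Name and values are illustrative assumptions.
    static NewTopic buildTopic() {
        Map<String, String> configs = new HashMap<>();
        configs.put("cleanup.policy", "compact");
        configs.put("min.insync.replicas", "2");
        // 3 partitions, replication factor 3 (needs at least 3 brokers).
        return new NewTopic("orders-compacted", 3, (short) 3).configs(configs);
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient adminClient = AdminClient.create(props)) {
            adminClient.createTopics(Collections.singleton(buildTopic())).all().get();
            System.out.println("Topic created.");
        } catch (ExecutionException e) {
            // The broker-side error arrives as the cause of the ExecutionException.
            if (e.getCause() instanceof TopicExistsException) {
                System.out.println("Topic already exists; nothing to do.");
            } else {
                throw new RuntimeException("Topic creation failed", e.getCause());
            }
        }
    }
}
```

Keeping the topic definition in its own method makes it possible to inspect or reuse it without contacting a broker.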
Listing Topics
To list all topics in the Kafka cluster, you can use the `listTopics` method:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;

import java.util.Properties;
import java.util.Set;

public class ListTopicsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            ListTopicsResult result = adminClient.listTopics();
            // names() resolves to the set of topic names; get() blocks until it arrives.
            Set<String> topics = result.names().get();
            System.out.println("Topics: " + topics);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
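By default, `listTopics` leaves out internal topics such as `__consumer_offsets`. The sketch below shows one way to include them by passing `ListTopicsOptions`; the broker address is an assumption, and the helper name `includeInternal` is hypothetical.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicListing;

import java.util.Collection;
import java.util.Properties;

public class ListInternalTopicsSketch {

    // Options that ask the cluster to include internal topics in the listing.
    static ListTopicsOptions includeInternal() {
        return new ListTopicsOptions().listInternal(true);
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient adminClient = AdminClient.create(props)) {
            // listings() carries the internal flag in addition to the name.
            Collection<TopicListing> listings =
                    adminClient.listTopics(includeInternal()).listings().get();
            for (TopicListing listing : listings) {
                System.out.println(listing.name() + (listing.isInternal() ? " (internal)" : ""));
            }
        }
    }
}
```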
Describing a Topic
To retrieve metadata about a topic, such as its partitions, leaders, and replica assignments, use the `describeTopics` method (topic configurations are fetched separately with `describeConfigs`):
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class DescribeTopicExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            DescribeTopicsResult result = adminClient.describeTopics(Collections.singleton("my-topic"));
            // Maps each requested topic name to its description.
            // On Kafka 3.1 and later, all() is deprecated in favor of allTopicNames().
            Map<String, TopicDescription> descriptions = result.all().get();
            System.out.println("Topic description: " + descriptions.get("my-topic"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
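A `TopicDescription` can also be inspected partition by partition. The sketch below prints the leader, replica count, and in-sync replica count for each partition of `my-topic`; the broker address and the helper name `format` are assumptions made for illustration. It uses `all()` to match the example above; newer client versions name this method `allTopicNames()`.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class DescribePartitionsSketch {

    // Formats one partition's leader id, replica count, and in-sync replica count.
    static String format(TopicPartitionInfo p) {
        // leader() is null while a partition has no elected leader.
        String leader = p.leader() == null ? "none" : String.valueOf(p.leader().id());
        return "partition=" + p.partition()
                + " leader=" + leader
                + " replicas=" + p.replicas().size()
                + " isr=" + p.isr().size();
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient adminClient = AdminClient.create(props)) {
            Map<String, TopicDescription> descriptions =
                    adminClient.describeTopics(Collections.singleton("my-topic")).all().get();
            for (TopicPartitionInfo p : descriptions.get("my-topic").partitions()) {
                System.out.println(format(p));
            }
        }
    }
}
```

A partition whose in-sync replica count is lower than its replica count is under-replicated, which is often the first thing worth checking when a topic misbehaves.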
3. Advanced Operations
Beyond the basic operations, the AdminClient API supports more advanced tasks:
Altering Configurations
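Modify existing topic or broker configurations using the `alterConfigs` method. Be aware that `alterConfigs` replaces the resource's entire set of dynamic configurations, so any entry left out of the request reverts to its default; the method has been deprecated since Kafka 2.3 in favor of `incrementalAlterConfigs`: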
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class AlterConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient adminClient = AdminClient.create(props)) {
            // The resource to change: the topic named "my-topic".
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // The complete set of dynamic configs the topic should have afterwards.
            Config config = new Config(Collections.singleton(new ConfigEntry("cleanup.policy", "compact")));
            AlterConfigsResult result = adminClient.alterConfigs(Collections.singletonMap(resource, config));
            result.all().get();
            System.out.println("Configuration altered successfully.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
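To change a single setting while leaving the rest untouched, `incrementalAlterConfigs` can be used instead. The following is a minimal sketch, assuming brokers and clients on Kafka 2.3 or later and a broker at `localhost:9092`; the helper name `buildRequest` is hypothetical.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class IncrementalAlterConfigSketch {

    // One SET operation on cleanup.policy for "my-topic"; other configs are not touched.
    static Map<ConfigResource, Collection<AlterConfigOp>> buildRequest() {
        ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
        AlterConfigOp setCompact = new AlterConfigOp(
                new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
        Collection<AlterConfigOp> ops = Collections.singletonList(setCompact);
        return Collections.singletonMap(topic, ops);
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient adminClient = AdminClient.create(props)) {
            adminClient.incrementalAlterConfigs(buildRequest()).all().get();
            System.out.println("cleanup.policy set to compact.");
        }
    }
}
```

Using `AlterConfigOp.OpType.DELETE` in place of `SET` would remove the override so the setting falls back to its default.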
4. Error Handling
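Because AdminClient operations are asynchronous, failures surface when you call `get()` on the returned future, wrapped in an `ExecutionException` whose cause is the underlying Kafka exception. Common errors include:
- TimeoutException: Indicates that a request took too long to complete. This can be due to network issues, unreachable brokers, or overloaded brokers.
- InvalidTopicException: Occurs when a topic name is invalid (for example, it contains illegal characters or is too long) or when the request targets a topic the client may not operate on. A topic that simply does not exist is reported as UnknownTopicOrPartitionException instead.
- NotControllerException: Indicates that the request reached a broker that is not the active controller, which certain administrative operations require. The error is retriable, and the client normally refreshes its metadata and retries on its own.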
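The errors above reach the caller as the cause of an `ExecutionException`, so handling them usually starts by unwrapping that cause. The sketch below is a library-independent retry wrapper and not part of the Kafka API: `callWithRetry` and its parameters are hypothetical names, and the demo in `main` uses a stand-in task rather than a real admin call. With the Kafka client on the classpath, a plausible predicate would be `cause -> cause instanceof org.apache.kafka.common.errors.RetriableException`, which covers both TimeoutException and NotControllerException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;

public class AdminRetrySketch {

    // Runs the call and retries it when the unwrapped cause is judged retriable.
    // Hypothetical helper: not part of the Kafka client library.
    static <T> T callWithRetry(Callable<T> call, Predicate<Throwable> retriable,
                               int maxAttempts, long backoffMs) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (ExecutionException e) {
                // Future.get() wraps the real error; decide based on the cause.
                Throwable cause = e.getCause();
                if (attempt >= maxAttempts || !retriable.test(cause)) {
                    throw e;
                }
                Thread.sleep(backoffMs * attempt); // linear backoff between attempts
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Stand-in for something like () -> adminClient.listTopics().names().get().
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new ExecutionException(new IllegalStateException("transient"));
            }
            return "ok";
        }, cause -> cause instanceof IllegalStateException, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

The AdminClient already retries retriable errors internally until its own timeout expires, so a wrapper like this only matters for retrying a whole operation after the client has given up.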
Troubleshooting Tips
- Check Logs: Review Kafka broker and client logs for error details.
- Increase Timeouts: Raise `request.timeout.ms` and `default.api.timeout.ms` in the client configuration if you experience frequent timeouts.
- Verify Configurations: Ensure that topic and broker configurations are correctly specified.
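The timeout advice above can be sketched as client properties. The keys are standard admin client settings, written as plain strings so the snippet needs nothing beyond the JDK; the values and the helper name `adminProps` are illustrative assumptions, not recommendations. Note that `default.api.timeout.ms` must not be smaller than `request.timeout.ms`.

```java
import java.util.Properties;

public class AdminTimeoutConfigSketch {

    // Builds client properties with more generous timeouts. Values are illustrative only.
    static Properties adminProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Upper bound for a single request to a broker.
        props.put("request.timeout.ms", "60000");
        // Upper bound for a whole admin operation, including internal retries.
        props.put("default.api.timeout.ms", "120000");
        // Pause between the client's internal retry attempts.
        props.put("retry.backoff.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        Properties props = adminProps("localhost:9092"); // assumed address
        // These properties would then be passed to AdminClient.create(props).
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A timeout can also be set for an individual call through its options object, for example `new CreateTopicsOptions().timeoutMs(10000)` passed as the second argument to `createTopics`.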