Kafka AdminClient API

The Kafka AdminClient API provides a programmatic way to manage and inspect Kafka resources such as topics, brokers, and configurations. It allows you to perform administrative operations on your Kafka cluster, such as creating or deleting topics, describing cluster metadata, and altering configurations.

1. Overview

The AdminClient API is part of the Kafka client library and provides methods for interacting with Kafka brokers. It is useful for managing Kafka resources programmatically and can be used in applications or scripts to automate cluster management tasks.

Key Features

2. Basic Operations

Here are some common operations you can perform using the AdminClient API:

Creating a Topic

To create a new topic, you need to define topic configurations such as the number of partitions and replication factor. Here is an example in Java:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.CreateTopicsResult;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicExample {

public static void main(String[] args) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient adminClient = AdminClient.create(props)) {
NewTopic newTopic = new NewTopic("my-new-topic", 1, (short) 1);
CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
result.all().get();
System.out.println("Topic created successfully.");
} catch (Exception e) {
e.printStackTrace();
}
}
}

Listing Topics

To list all topics in the Kafka cluster, you can use the `listTopics` method:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;

import java.util.Properties;
import java.util.Set;

public class ListTopicsExample {

public static void main(String[] args) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient adminClient = AdminClient.create(props)) {
ListTopicsResult result = adminClient.listTopics();
Set topics = result.names().get();
System.out.println("Topics: " + topics);
} catch (Exception e) {
e.printStackTrace();
}
}
}

Describing a Topic

To retrieve metadata about a topic, such as partitions and configurations, use the `describeTopics` method:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Properties;
import java.util.Map;

public class DescribeTopicExample {

public static void main(String[] args) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient adminClient = AdminClient.create(props)) {
DescribeTopicsResult result = adminClient.describeTopics(Collections.singleton("my-topic"));
Map descriptions = result.all().get();
System.out.println("Topic description: " + descriptions.get("my-topic"));
} catch (Exception e) {
e.printStackTrace();
}
}
}

3. Advanced Operations

In addition to basic operations, the AdminClient API supports advanced functionalities:

Altering Configurations

Modify existing topic or broker configurations using the `alterConfigs` method:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;

import java.util.Properties;
import java.util.Collections;
import java.util.Map;

public class AlterConfigExample {

public static void main(String[] args) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient adminClient = AdminClient.create(props)) {
Config config = new Config(Collections.singleton(new ConfigEntry("cleanup.policy", "compact")));
AlterConfigsResult result = adminClient.alterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"), config));
result.all().get();
System.out.println("Configuration altered successfully.");
} catch (Exception e) {
e.printStackTrace();
}
}
}

4. Error Handling

Handling errors effectively is crucial for using the AdminClient API. Common errors include:

Troubleshooting Tips