This guide covers the basics of using the Kafka Consumer API, which is responsible for reading records from Kafka topics. Understanding how to configure and use the Consumer API is essential for processing data from Kafka.
A Kafka consumer is an application or component that subscribes to one or more Kafka topics and processes the records it receives from them.
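Kafka consumers are configured through properties that control their behavior. Key configuration options include bootstrap.servers (the broker addresses used for the initial connection), group.id (the consumer group this consumer belongs to), key.deserializer and value.deserializer (the classes that turn record bytes back into objects), auto.offset.reset (where to start reading when no committed offset exists), and enable.auto.commit (whether offsets are committed automatically in the background).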
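As a minimal sketch, the settings used in this guide's example can also be written with their plain string keys, which is how they would appear in a properties file. The class name ConsumerProps and its build method are hypothetical helpers introduced only for illustration; the sketch uses nothing beyond java.util.Properties, so it runs without the Kafka client library on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not part of the Kafka API): collects consumer settings
// under their plain string keys.
class ConsumerProps {
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // broker connection
        props.setProperty("group.id", groupId);                   // consumer group
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        props.setProperty("auto.offset.reset", "earliest");       // start point without a committed offset
        props.setProperty("enable.auto.commit", "true");          // commit offsets in the background
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "my-consumer-group");
        // Print the settings in a stable, sorted order
        props.stringPropertyNames().stream()
                .sorted()
                .forEach(name -> System.out.println(name + "=" + props.getProperty(name)));
    }
}
```

The resulting Properties object could be passed to a KafkaConsumer constructor in the same way as the ConsumerConfig constants, which resolve to these same key strings.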
Below is a Java example demonstrating how to configure and use a Kafka consumer to read messages from a Kafka topic.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // Consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        // Create the consumer with explicit key/value types;
        // try-with-resources closes it if the loop exits with an exception
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to topic
            consumer.subscribe(Collections.singletonList("my-topic"));

            // Poll for new records, waiting up to 100 ms per call
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received record: key=%s value=%s partition=%d offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        }
    }
}
The following diagram illustrates the key components involved in Kafka consumer configuration, including the Kafka broker connection, deserialization, and offset management.
Diagram: Kafka Consumer Configuration Components
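To make the offset management component more concrete, the following is a simplified model of how the auto.offset.reset setting is understood to pick a starting position for a partition. It is a sketch and not the client's actual implementation: the class OffsetResetSketch, the method startingOffset, and its parameters are hypothetical names, and IllegalStateException stands in for the error the real client raises when the policy is "none".

```java
import java.util.OptionalLong;

// Hypothetical model of choosing a consumer's starting offset for one partition.
class OffsetResetSketch {
    /**
     * @param committed   last committed offset for the group, if any
     * @param logStart    offset of the oldest record still available
     * @param logEnd      offset that the next written record would receive
     * @param resetPolicy "earliest", "latest", or "none"
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, String resetPolicy) {
        // A committed offset inside the valid range is used as-is
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStart && offset <= logEnd) {
                return offset;
            }
        }
        // Otherwise the reset policy decides where reading begins
        switch (resetPolicy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalStateException("No valid offset and reset policy is " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), 0, 42, "earliest")); // prints 0
        System.out.println(startingOffset(OptionalLong.empty(), 0, 42, "latest"));   // prints 42
        System.out.println(startingOffset(OptionalLong.of(17), 0, 42, "latest"));    // prints 17
    }
}
```

In this model, the "earliest" setting from the example above only matters the first time a consumer group reads a partition, or when its committed offset is no longer available; after that, the committed offset takes precedence.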
The Kafka Consumer API is essential for reading data from Kafka topics. By understanding and applying the various configuration options, you can efficiently and reliably process streaming data from Kafka.