Kafka Streams provides built-in metrics that allow you to monitor the performance and behavior of your streams application. These metrics cover various aspects, including:
You can access metrics in a Kafka Streams application using the KafkaStreams.metrics() method. This returns a Map<MetricName, ? extends Metric> of the application’s metrics, keyed by each metric's name, group, and tags, which you can log, monitor, or export to a monitoring system.
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Map;
import java.util.Properties;

public class StreamsMetricsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-metrics-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        StreamsBuilder builder = new StreamsBuilder();
        // Define your streams topology here
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Register the shutdown hook before starting so the application is always closed cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
        // Access and print metrics; the map is keyed by MetricName, not by String
        Map<MetricName, ? extends Metric> metrics = streams.metrics();
        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
            System.out.println("Metric name: " + entry.getKey().name() + ", value: " + entry.getValue().metricValue());
        }
    }
}
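The example above prints a single snapshot right after start(), when most values are still empty. In practice you would usually read the metrics repeatedly. The following is a minimal sketch of one way to do that using only the JDK scheduler; the class MetricsPoller and its methods pollOnce and start are hypothetical names introduced for illustration and are not part of Kafka Streams.

```java
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper (not a Kafka Streams class): polls any metrics source on a schedule.
public class MetricsPoller {

    // Takes one snapshot from the source and hands it to the sink.
    public static <M extends Map<?, ?>> void pollOnce(Supplier<M> source, Consumer<? super M> sink) {
        sink.accept(source.get());
    }

    // Runs pollOnce every periodSeconds on a daemon thread.
    // The caller is responsible for shutting down the returned executor.
    public static <M extends Map<?, ?>> ScheduledExecutorService start(
            Supplier<M> source, Consumer<? super M> sink, long periodSeconds) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "metrics-poller");
            t.setDaemon(true);
            return t;
        });
        executor.scheduleAtFixedRate(() -> {
            try {
                pollOnce(source, sink);
            } catch (RuntimeException e) {
                // An uncaught exception would silently cancel all future runs, so log and continue.
                System.err.println("Metrics poll failed: " + e);
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return executor;
    }

    public static void main(String[] args) {
        // Stand-in source with a made-up value; a real application would pass streams::metrics.
        Supplier<Map<String, Object>> fakeSource = () -> Map.of("process-total", 42.0);
        pollOnce(fakeSource, snapshot ->
                snapshot.forEach((name, value) -> System.out.println(name + " = " + value)));
    }
}
```

In a real application, the source would be streams::metrics and the sink would call metricValue() on each entry, for example with a 30-second period. Remember to shut the returned executor down when the streams instance is closed.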
Kafka Streams provides several key metrics that you should monitor to ensure optimal performance:
thread-poll-total: The total number of poll calls made by a stream thread.
thread-poll-time-total: The total time spent in poll calls by a stream thread.
commit-total: The total number of commits made by a stream thread.
process-total: The total number of records processed by a task.
process-rate: The rate at which records are processed by a task.
punctuate-total: The total number of punctuations triggered by a task.
commit-latency-avg: The average commit latency for a task.

You can expose Kafka Streams metrics through JMX, or export them to external systems like Prometheus, by configuring the appropriate reporter classes in the StreamsConfig. Kafka Streams supports JMX out of the box, which you can use to integrate with monitoring tools.
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

public class JMXMetricsExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "jmx-metrics-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Record DEBUG-level metrics in addition to the default INFO-level ones
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
        // Register the JMX reporter explicitly ("metric.reporters")
        props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
        StreamsBuilder builder = new StreamsBuilder();
        // Define your streams topology here
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Register the shutdown hook before starting so the application is always closed cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
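Once metrics are registered as MBeans, they can also be read from inside the same JVM through the platform MBean server, which is roughly what JConsole does remotely. The following is a minimal sketch using only javax.management. It assumes the Kafka Streams MBeans are registered under the kafka.streams JMX domain; the class name StreamsJmxDump and the method readAttributes are hypothetical names chosen for illustration.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper (not a Kafka class): dumps MBean attributes matching a name pattern.
public class StreamsJmxDump {

    // Returns "objectName/attribute" -> value for every readable attribute
    // of every MBean whose name matches the given ObjectName pattern.
    public static Map<String, Object> readAttributes(MBeanServer server, String pattern) {
        ObjectName query;
        try {
            query = new ObjectName(pattern);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid ObjectName pattern: " + pattern, e);
        }
        Map<String, Object> values = new TreeMap<>();
        for (ObjectName name : server.queryNames(query, null)) {
            MBeanAttributeInfo[] attributes;
            try {
                attributes = server.getMBeanInfo(name).getAttributes();
            } catch (Exception e) {
                continue; // the MBean was unregistered between the query and the lookup
            }
            for (MBeanAttributeInfo attribute : attributes) {
                if (!attribute.isReadable()) {
                    continue;
                }
                try {
                    values.put(name + "/" + attribute.getName(),
                            server.getAttribute(name, attribute.getName()));
                } catch (Exception e) {
                    // Skip attributes that cannot be read on this JVM.
                }
            }
        }
        return values;
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Assumption: Kafka Streams metrics live in the "kafka.streams" JMX domain.
        // Prints nothing unless a Kafka Streams application is running in this JVM.
        readAttributes(server, "kafka.streams:*")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The same pattern string can be narrowed to a single metric group by adding a type key to the ObjectName, if you only need one group of metrics.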
Kafka Streams applications expose real-time metrics that can be observed through Kafka’s JMX interface. You can use tools like JConsole, or Prometheus with a JMX exporter, to visualize these metrics and track: