This example demonstrates how to use Apache Kafka Streams to build a typical ETL (Extract, Transform, Load) pipeline. Kafka Streams is a lightweight client library for building real-time stream processing applications on top of Apache Kafka.
ETL stands for Extract, Transform, Load. In a typical data pipeline:
- Extract: read raw data from a source system.
- Transform: clean, filter, or enrich the data.
- Load: write the processed data to a destination system.
The Kafka Streams API is a client library for processing and analyzing data stored in Kafka. It provides features like:
- Stateless and stateful transformations (filter, map, aggregations, joins).
- Windowing and event-time processing.
- Exactly-once processing semantics.
- Fault tolerance and horizontal scalability, with no separate processing cluster required.
In this example, we will:
- Extract: read records from an input Kafka topic.
- Transform: filter out records with null values and convert the remaining values to uppercase.
- Load: write the transformed records to an output Kafka topic.
Make sure the Kafka Streams library is on your project's classpath. With Maven, add the following dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.4.0</version>
</dependency>
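The example also assumes a Kafka broker listening on localhost:9092. If you do not already have one, a single-node broker can be started from a standard Kafka distribution; for example, with the classic ZooKeeper-based quickstart scripts (adjust the paths to your installation):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties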
The following is a complete Java example of an ETL pipeline using the Kafka Streams API:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class KafkaETLStreamExample {

    public static void main(String[] args) {
        // Define properties for the Kafka Streams application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-etl-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Create a StreamsBuilder instance
        StreamsBuilder builder = new StreamsBuilder();

        // Extract: Read data from the input topic 'input-topic'
        KStream<String, String> sourceStream = builder.stream("input-topic");

        // Transform: Filter records where the value is not null and map values to uppercase
        KStream<String, String> transformedStream = sourceStream
                .filter((key, value) -> value != null)
                .mapValues(value -> value.toUpperCase());

        // Load: Write the transformed data to the output topic 'output-topic'
        transformedStream.to("output-topic");

        // Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add a shutdown hook to gracefully stop the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
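While developing, it can help to inspect the topology that StreamsBuilder produces before starting the application. The Topology#describe() method returns a human-readable summary of the sources, processors, and sinks; a small variation of the build step above:

// Build the topology once so it can be both inspected and executed
org.apache.kafka.streams.Topology topology = builder.build();
System.out.println(topology.describe());
KafkaStreams streams = new KafkaStreams(topology, props);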
The example uses the following Kafka Streams building blocks:

StreamsConfig: Configures the Kafka Streams application, including the application ID and Kafka broker details.
StreamsBuilder: Used to construct the processing topology, where we define the source topic, transformation logic, and target topic.
filter: Filters out records where the value is null.
mapValues: Transforms the value of each record to uppercase.
to: Sends the transformed records to the specified Kafka topic (output-topic).
KafkaStreams: The client instance that executes the stream processing application.
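Note that the example relies on the default serdes set in StreamsConfig. Kafka Streams also lets you declare serdes per operation with Consumed and Produced; a minimal sketch of the same extract and load steps with explicit serdes:

// Additional imports needed:
// import org.apache.kafka.streams.kstream.Consumed;
// import org.apache.kafka.streams.kstream.Produced;

// Extract: explicit key/value serdes instead of the configured defaults
KStream<String, String> sourceStream =
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Transform and Load: explicit serdes on the sink as well
sourceStream
        .filter((key, value) -> value != null)
        .mapValues(value -> value.toUpperCase())
        .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));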
Before running the application, create the input and output topics:

bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
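You can confirm that both topics exist by listing them:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092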
Next, produce a few test messages to input-topic:

bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
> hello world
> kafka streams example
Then consume from output-topic to verify the transformed records:

bin/kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning
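Given the two messages produced above, the consumer should print the uppercased values:

HELLO WORLD
KAFKA STREAMS EXAMPLE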
In this example, we created a basic ETL pipeline using Kafka Streams. Data was extracted from one topic, transformed by filtering and mapping operations, and loaded into another topic. Kafka Streams makes it easy to build real-time data pipelines with minimal code.