Kafka Streams Advanced ETL Example

This example demonstrates how to use Apache Kafka Streams for a typical ETL (Extract, Transform, Load) pipeline. Kafka Streams is a lightweight client library for building real-time stream processing applications on top of Apache Kafka.

What is ETL?

ETL stands for Extract, Transform, Load. In a typical data pipeline:

  - Extract: data is read from one or more source systems.
  - Transform: the data is cleaned, filtered, enriched, or reshaped in flight.
  - Load: the processed data is written to a destination system.

Kafka Streams Overview

The Kafka Streams API is a client library for processing and analyzing data stored in Kafka. It provides features like:

  - A high-level DSL with stateless operations (filter, map) and stateful operations (aggregations, joins, windowing).
  - Scalability and fault tolerance built on Kafka's partitioning and consumer-group model.
  - Exactly-once processing semantics.
  - No separate processing cluster: the library runs inside your own application.

ETL with Kafka Streams

In this example, we will:

  1. Extract records from a Kafka topic named input-topic.
  2. Transform them by filtering out records with null values and converting each value to uppercase.
  3. Load the transformed records into a second topic named output-topic.

Dependencies

Ensure that you include the Kafka Streams library in your project. In Maven, add the following dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.4.0</version>
</dependency>

Source Code Example

The following is a complete Java example of an ETL pipeline built with the Kafka Streams API:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class KafkaETLStreamExample {

    public static void main(String[] args) {

        // Define properties for the Kafka Streams application
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-etl-stream-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Create a StreamsBuilder instance
        StreamsBuilder builder = new StreamsBuilder();

        // Extract: Read data from the input topic 'input-topic'
        KStream<String, String> sourceStream = builder.stream("input-topic");

        // Transform: Filter records where the value is not null and map values to uppercase
        KStream<String, String> transformedStream = sourceStream
                .filter((key, value) -> value != null)
                .mapValues(value -> value.toUpperCase());

        // Load: Write the transformed data to the output topic 'output-topic'
        transformedStream.to("output-topic");

        // Build and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add shutdown hook to gracefully stop the streams
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Explanation of the Code

  - Configuration: APPLICATION_ID_CONFIG identifies the application (and its consumer group), BOOTSTRAP_SERVERS_CONFIG points at the Kafka broker, and the default Serdes tell Kafka Streams to treat keys and values as strings.
  - Extract: builder.stream("input-topic") creates a KStream that consumes records from the input topic.
  - Transform: filter drops records whose value is null, and mapValues converts each remaining value to uppercase.
  - Load: to("output-topic") writes the transformed stream back to Kafka.
  - Lifecycle: streams.start() launches the topology, and the shutdown hook calls streams.close() so the application shuts down cleanly.

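The Transform step can be swapped for any other stateless operation in the same DSL. As a minimal sketch of a variant (not part of the example above; the class name WordSplitTopology is only illustrative), the topology below splits each value into individual words with flatMapValues before uppercasing them:

import java.util.Arrays;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class WordSplitTopology {

    // Variant of the Transform step: split each value into words,
    // then uppercase each word before writing to the output topic.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        source.filter((key, value) -> value != null)
              .flatMapValues(value -> Arrays.asList(value.split("\\s+")))
              .mapValues(String::toUpperCase)
              .to("output-topic");
        return builder.build();
    }
}

Compared with the original pipeline, this version emits one record per word rather than one per input line.
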
Running the Example

  1. Ensure Kafka is running locally, with ZooKeeper and the Kafka broker started (or a broker running in KRaft mode, which does not require ZooKeeper).
  2. Create the input and output topics:
    bin/kafka-topics.sh --create --topic input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    bin/kafka-topics.sh --create --topic output-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  3. Run the Java application.
  4. Produce some data to the input-topic:
    bin/kafka-console-producer.sh --topic input-topic --bootstrap-server localhost:9092
    > hello world
    > kafka streams example
  5. Consume the transformed data from the output-topic:
    bin/kafka-console-consumer.sh --topic output-topic --bootstrap-server localhost:9092 --from-beginning
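    You should see the transformed, uppercased records:
    HELLO WORLD
    KAFKA STREAMS EXAMPLE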

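The topology can also be verified without a running broker by using TopologyTestDriver from the kafka-streams-test-utils module (a separate test-scoped dependency, org.apache.kafka:kafka-streams-test-utils). The following is a minimal, self-contained sketch rather than part of the example above; the class name EtlTopologyTest is only illustrative, and it rebuilds the same topology inline:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;

public class EtlTopologyTest {

    public static void main(String[] args) {
        // Rebuild the same Extract -> Transform -> Load topology as the example.
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");
        source.filter((key, value) -> value != null)
              .mapValues(value -> value.toUpperCase())
              .to("output-topic");
        Topology topology = builder.build();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "etl-topology-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // The test driver processes records synchronously; no broker is needed.
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "input-topic", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "output-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("key1", "hello world");
            System.out.println(output.readValue()); // prints HELLO WORLD
        }
    }
}
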
Conclusion

In this example, we created a basic ETL pipeline using Kafka Streams. Data was extracted from one topic, transformed by filtering and mapping operations, and loaded into another topic. Kafka Streams makes it easy to build real-time data pipelines with minimal code.