Kafka Intermediate: Kafka Streams DSL vs Processor API

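Kafka Streams provides two main APIs for building stream processing applications:

- The Kafka Streams DSL, a high-level, declarative API
- The Processor API, a lower-level, imperative API

Each API suits different use cases and trades simplicity against control. Let’s explore the differences and when to use each approach.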

1. Kafka Streams DSL (Domain-Specific Language)

The Kafka Streams DSL is a high-level abstraction that simplifies stream processing with fluent, functional operations. It allows you to perform common tasks such as filtering, mapping, and aggregations with a simple and declarative API.

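Key Features:

- Fluent, declarative operators such as filter, map, mapValues, groupByKey, and aggregate
- KStream and KTable abstractions for record streams and changelog tables
- State stores for stateful operations (aggregations, joins, windowing) that are created and managed for you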

Java Code Example (DSL):


import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamDSLExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-dsl-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

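        // Build the topology: read, keep "important" records, uppercase them, write
        StreamsBuilder builder = new StreamsBuilder();
        // Typed stream, so the lambdas below see String keys and values
        KStream<String, String> stream = builder.stream("input-topic");

        // Null check guards against records with no value (tombstones)
        stream.filter((key, value) -> value != null && value.contains("important"))
              .mapValues(value -> value.toUpperCase())
              .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Register the shutdown hook before starting so the app closes cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();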
    }
}
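
The example above uses only stateless operations. As a minimal sketch of the aggregations mentioned earlier, the following topology counts records per key. The class name StreamDSLCountExample and the "counts-topic" output topic are hypothetical, and serdes are passed explicitly so the snippet does not rely on default configuration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class StreamDSLCountExample {
    // Builds a topology that counts how many records arrive for each key.
    // "counts-topic" is a hypothetical topic name used for illustration.
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> stream =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // groupByKey + count is a stateful operation; the DSL creates
        // and manages the backing state store automatically.
        KTable<String, Long> counts = stream
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .count();

        // Emit every count update as a record on the output topic.
        counts.toStream().to("counts-topic", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

The returned Topology can be passed to new KafkaStreams(topology, props) exactly as in the example above. Note that no state store is declared anywhere in this code; that bookkeeping is what the DSL handles on your behalf.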

    

2. Kafka Streams Processor API

The Processor API is a lower-level API that provides more flexibility and control over the processing logic. You can implement custom processors, define complex topologies, and manually handle state management.

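Key Features:

- Custom processors with direct access to each record and its metadata (timestamp, headers)
- Explicit wiring of sources, processors, and sinks into a topology
- State stores that you register and read or write yourself
- Punctuators for scheduled, time-based callbacks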

Java Code Example (Processor API):


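import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

import java.util.Properties;

public class ProcessorAPIExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-processor-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The processor works on String keys and values, so set String serdes
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Wire the topology by hand: source -> processor -> sink
        Topology topology = new Topology();
        topology.addSource("Source", "input-topic")
                .addProcessor("Process", MyProcessor::new, "Source")
                .addSink("Sink", "output-topic", "Process");

        KafkaStreams streams = new KafkaStreams(topology, props);

        // Register the shutdown hook before starting so the app closes cleanly
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }

    // Typed Processor API from org.apache.kafka.streams.processor.api
    // (available in Kafka 2.7 and later)
    static class MyProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            String value = record.value();
            if (value != null && value.contains("important")) {
                // Forward the uppercased value to downstream nodes ("Sink")
                context.forward(record.withValue(value.toUpperCase()));
            }
        }

        @Override
        public void close() {
            // Close resources if needed
        }
    }
}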
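
The processor above is stateless. To illustrate the manual state management mentioned earlier, here is a minimal sketch that keeps a per-key counter in a state store. It assumes the typed Processor API (Kafka 2.7 or later). The class names, the "counts-store" store name, and the "counts-topic" topic are hypothetical, and an in-memory store is used only to keep the sketch small.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class StatefulProcessorExample {
    // Hypothetical store name; it must match in addStateStore and getStateStore.
    static final String STORE_NAME = "counts-store";

    public static Topology buildTopology() {
        Topology topology = new Topology();
        topology.addSource("Source",
                    Serdes.String().deserializer(), Serdes.String().deserializer(),
                    "input-topic")
                .addProcessor("Count", CountingProcessor::new, "Source")
                // Unlike the DSL, the store is declared and attached explicitly.
                .addStateStore(
                    Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(STORE_NAME),
                        Serdes.String(), Serdes.Long()),
                    "Count")
                .addSink("Sink", "counts-topic",
                    Serdes.String().serializer(), Serdes.Long().serializer(),
                    "Count");
        return topology;
    }

    static class CountingProcessor implements Processor<String, String, String, Long> {
        private ProcessorContext<String, Long> context;
        private KeyValueStore<String, Long> store;

        @Override
        public void init(ProcessorContext<String, Long> context) {
            this.context = context;
            // Look up the store that was attached to this processor in the topology.
            this.store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(Record<String, String> record) {
            // Read the previous count, increment it, and write it back.
            Long previous = store.get(record.key());
            long updated = (previous == null) ? 1L : previous + 1;
            store.put(record.key(), updated);
            // Forward the new count downstream, keeping key and timestamp.
            Record<String, Long> out = record.withValue(updated);
            context.forward(out);
        }
    }
}
```

This does by hand roughly what groupByKey().count() does in the DSL: the store is declared, attached to the processor, and read and written explicitly. That extra code is the price of the added control.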

    

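3. When to Use DSL vs Processor API

Use the DSL when your logic can be expressed with its built-in operations, such as filtering, mapping, joins, and aggregations. Use the Processor API when you need custom processors, a hand-built topology, or direct control over state.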
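
The choice does not have to be all or nothing. The sketch below embeds a custom processor inside a DSL pipeline. It assumes Kafka Streams 3.3 or later, where KStream.process accepts a typed processor supplier and returns a stream. The class names and the timestamp-tagging logic are hypothetical and chosen only for illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class MixedApiExample {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // DSL step: simple filtering needs no custom code.
        KStream<String, String> important =
            stream.filter((key, value) -> value != null && value.contains("important"));

        // Processor API step: custom logic with access to record metadata.
        KStream<String, String> tagged = important.process(TimestampTaggingProcessor::new);

        tagged.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Hypothetical processor that appends the record timestamp to the value.
    static class TimestampTaggingProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, String> record) {
            String taggedValue = record.value() + "@" + record.timestamp();
            context.forward(record.withValue(taggedValue));
        }
    }
}
```

A reasonable default under this approach is to start with the DSL and drop down to a processor only for the steps that need it.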

4. Conclusion

The Kafka Streams DSL simplifies stream processing for most use cases, while the Processor API gives you more control when needed. By understanding the strengths of both APIs, you can choose the right tool for your stream processing requirements.