Kafka Streams provides two main APIs for building stream processing applications: the high-level Streams DSL and the low-level Processor API.
Each API has its own use cases, benefits, and degree of flexibility. Let’s explore the differences and when to use each approach.
The Kafka Streams DSL is a high-level abstraction that simplifies stream processing with fluent, functional operations. It allows you to perform common tasks such as filtering, mapping, and aggregations with a simple and declarative API.
Typical operations include map, filter, groupByKey, and aggregate. Here is a simple example that filters records and transforms their values using the DSL:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class StreamDSLExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-dsl-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Consume from the input topic, keep only records whose value contains "important",
        // upper-case the values, and write the result to the output topic.
        KStream<String, String> stream = builder.stream("input-topic");
        stream.filter((key, value) -> value.contains("important"))
              .mapValues(value -> value.toUpperCase())
              .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
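Beyond stateless operations like filter and mapValues, the DSL handles stateful operators such as groupByKey, groupBy, and aggregate in the same fluent style. As a minimal sketch (not part of the example above), a classic word count could look like the following, reusing the builder and String serdes from the previous snippet and assuming hypothetical sentences-topic and word-counts-topic topics plus imports of java.util.Arrays and org.apache.kafka.streams.kstream.Produced:

// Sketch only: a word-count aggregation built with the DSL.
KStream<String, String> sentences = builder.stream("sentences-topic");

sentences
        // Split each sentence into lower-cased words; each word becomes its own record value.
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        // Re-key each record by the word itself so equal words fall into the same group.
        .groupBy((key, word) -> word)
        // Count occurrences per word; the result is a KTable<String, Long>.
        .count()
        // Turn the changelog of counts back into a stream and write it out.
        .toStream()
        .to("word-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));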
The Processor API is a lower-level API that provides more flexibility and control over the processing logic. You can implement custom processors, define complex topologies, and manually handle state management. The following example implements the same filter-and-uppercase logic as above, this time with a hand-wired topology and a custom processor:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.util.Properties;
public class ProcessorAPIExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-processor-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Wire the topology by hand: source topic -> custom processor -> sink topic.
        Topology topology = new Topology();
        topology.addSource("Source", "input-topic")
                .addProcessor("Process", MyProcessor::new, "Source")
                .addSink("Sink", "output-topic", "Process");

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    static class MyProcessor extends AbstractProcessor<String, String> {

        @Override
        public void init(ProcessorContext context) {
            super.init(context);
        }

        @Override
        public void process(String key, String value) {
            // Forward only "important" records, upper-cased, to the downstream sink.
            if (value.contains("important")) {
                context().forward(key, value.toUpperCase());
            }
        }

        @Override
        public void close() {
            // Close resources if needed
        }
    }
}
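The example above only filters and forwards records, so it does not show the manual state management mentioned earlier. A rough sketch of what that could look like with the same classic Processor API, assuming a hypothetical in-memory store named "counts", a hypothetical CountingProcessor, and additional imports of org.apache.kafka.streams.state.Stores and org.apache.kafka.streams.state.KeyValueStore:

// Sketch only: in main(), register an in-memory key-value store and attach it to the node:
//   topology.addStateStore(
//           Stores.keyValueStoreBuilder(
//                   Stores.inMemoryKeyValueStore("counts"),
//                   Serdes.String(), Serdes.Long()),
//           "Process");

static class CountingProcessor extends AbstractProcessor<String, String> {

    private KeyValueStore<String, Long> counts;

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // Look up the store that was attached to this processor node.
        counts = (KeyValueStore<String, Long>) context.getStateStore("counts");
    }

    @Override
    public void process(String key, String value) {
        // Keep a running count per key and forward the updated total downstream.
        Long current = counts.get(key);
        long updated = (current == null) ? 1L : current + 1L;
        counts.put(key, updated);
        context().forward(key, String.valueOf(updated));
    }
}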
The Kafka Streams DSL simplifies stream processing for most use cases, while the Processor API gives you more control when needed. By understanding the strengths of both APIs, you can choose the right tool for your stream processing requirements.