<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Kafka Simple Streams Application</title>
<style>
body {
    font-family: Arial, Calibri, sans-serif;
    line-height: 1.6;
}
h1, h2 {
    color: #333;
}
pre {
    background-color: #f4f4f4;
    padding: 10px;
    border-radius: 5px;
}
code {
    font-family: Consolas, "Courier New", monospace;
}
</style>
</head>
<body>
<h1>Kafka Simple Streams Application</h1>
<p>A simple Kafka Streams application demonstrates the fundamental capabilities of stream processing with Kafka. This example will guide you through creating a basic Kafka Streams application that processes messages from an input topic and writes results to an output topic.</p>
<h2>1. Overview</h2>
<p>In this example, we will build a Kafka Streams application that reads messages from an input topic, processes the messages (e.g., converting them to uppercase), and writes the processed messages to an output topic.</p>
<h2>2. Project Setup</h2>
<p>Ensure you have the following dependency in your Maven <code>pom.xml</code>:</p>
<pre><code>
&lt;dependency&gt;
    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
    &lt;artifactId&gt;kafka-streams&lt;/artifactId&gt;
    &lt;version&gt;3.5.0&lt;/version&gt;
&lt;/dependency&gt;
</code></pre>
<h2>3. Kafka Streams Application Code</h2>
<p>Here's a basic Kafka Streams application in Java:</p>
<pre><code>
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class SimpleStreamsApp {
    public static void main(String[] args) {
        // Define the stream processing topology
        StreamsBuilder builder = new StreamsBuilder();

        // Create a stream from the input topic
        KStream&lt;String, String&gt; input = builder.stream("input-topic");

        // Process the stream by converting messages to uppercase
        KStream&lt;String, String&gt; processed = input.mapValues(value -&gt; value.toUpperCase());

        // Write the processed stream to the output topic
        processed.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        // Define the application properties
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Build the Kafka Streams instance
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Start the streams application
        streams.start();

        // Add a shutdown hook to gracefully stop the application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
</code></pre>
<h2>4. Explanation</h2>
<ul>
<li><strong>StreamsBuilder:</strong> Used to define the processing topology.</li>
<li><strong>KStream:</strong> Represents a stream of records, here created from the input topic.</li>
<li><strong>mapValues:</strong> Transforms the value of each record by applying a function (e.g., converting to uppercase) and leaves the key unchanged.</li>
<li><strong>Produced:</strong> Specifies the serialization format (serdes) for the key and value of records written by <code>to()</code>; the output topic itself is the first argument to <code>to()</code>.</li>
<li><strong>StreamsConfig:</strong> Configures the Kafka Streams application with properties such as the application ID, the Kafka bootstrap servers, and the default serdes.</li>
<li><strong>KafkaStreams:</strong> Represents the Kafka Streams instance that is started and managed throughout the application lifecycle.</li>
</ul>
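<p>To make the <code>mapValues</code> step concrete without a running broker, the following is a minimal plain-Java sketch of the same idea: each record's value is passed through a function while its key is copied through untouched. The class <code>MapValuesSketch</code> and its <code>mapValues</code> helper are hypothetical names introduced only for illustration; they are not part of the Kafka Streams API and do not replace the application code shown earlier.</p>

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java analogue of KStream#mapValues; not part of the Kafka API.
class MapValuesSketch {

    // Applies the mapper to each value and copies each key through unchanged.
    static <K, V, R> List<Map.Entry<K, R>> mapValues(
            List<Map.Entry<K, V>> records, Function<V, R> mapper) {
        List<Map.Entry<K, R>> out = new ArrayList<>();
        for (Map.Entry<K, V> rec : records) {
            out.add(new SimpleEntry<>(rec.getKey(), mapper.apply(rec.getValue())));
        }
        return out;
    }

    public static void main(String[] args) {
        // Two sample key/value records standing in for messages on the input topic.
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("user-1", "hello"),
                Map.entry("user-2", "kafka streams"));

        List<Map.Entry<String, String>> output =
                mapValues(input, value -> value.toUpperCase());

        // Prints:
        // user-1 -> HELLO
        // user-2 -> KAFKA STREAMS
        output.forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
    }
}
```

<p>Because only the values change, every output record keeps the key of the input record it came from, which is the behavior the uppercase example relies on.</p>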
<h2>5. Running the Application</h2>
<p>To run the Kafka Streams application:</p>
<ol>
<li>Ensure Kafka is running and the <code>input-topic</code> and <code>output-topic</code> exist.</li>
<li>Compile and run the Java application.</li>
<li>Produce some test messages to the <code>input-topic</code>.</li>
<li>Consume messages from the <code>output-topic</code> to see the processed results.</li>
</ol>
<h2>6. Conclusion</h2>
<p>This basic Kafka Streams application demonstrates how to set up a stream processing pipeline with Kafka. The same pattern of reading from an input topic, transforming each record in real time, and writing the results to an output topic is the starting point for more complex data processing applications built with Kafka Streams.</p>
</body>
</html>