Integrating Kafka Streams with machine learning (ML) lets you process and analyze streaming data in real time, applying ML models for predictions, classifications, or anomaly detection. This guide covers the complete setup: creating a Kafka Streams application, hosting an ML model behind a REST API, and wiring the two together.
The architecture involves three components: a Kafka cluster with an input and an output topic, a Kafka Streams application that consumes records and calls the model, and an ML model hosted behind a REST API.
First, ensure you have a trained ML model. For this example, we'll assume the model is saved to a file (e.g., a scikit-learn model saved as a pickle file via joblib).
# Python code to save a trained model
import joblib
# Assuming `model` is your trained scikit-learn model
joblib.dump(model, 'model.pkl')
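If you don't have a trained model yet, a minimal sketch that produces a compatible model.pkl might look like the following. The dummy data, feature count, and classifier choice are placeholders; four features are used here only so the model matches the test record produced later.
# train_model.py — minimal training sketch (assumes scikit-learn, NumPy, joblib)
import joblib
import numpy as np
from sklearn.linear_model import LogisticRegression
# Dummy training data: 100 samples with 4 features each, binary labels
X = np.random.rand(100, 4)
y = (X.sum(axis=1) > 2.0).astype(int)
model = LogisticRegression()
model.fit(X, y)
joblib.dump(model, 'model.pkl')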
Host the ML model with a REST API using Flask. This API will accept data from the Kafka Streams application, perform inference, and return predictions.
# app.py (Flask)
from flask import Flask, request, jsonify
import joblib

app = Flask(__name__)

# Load the pre-trained model once at startup
model = joblib.load('model.pkl')

@app.route('/predict', methods=['POST'])
def predict():
    data = request.json
    features = data['features']
    # scikit-learn expects a 2D array: one row per sample
    prediction = model.predict([features])
    return jsonify({'prediction': prediction.tolist()})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
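You can sanity-check the service without Kafka, for example with curl. The response shown in the comment is illustrative; the actual prediction depends on your model.
# Send a test request to the model API
curl -X POST http://localhost:5000/predict \
  -H "Content-Type: application/json" \
  -d '{"features": [1.0, 2.0, 3.0, 4.0]}'
# Example response: {"prediction": [1]}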
The Kafka Streams application will consume data from a Kafka topic, send the data to the ML model API for prediction, and write the results to another Kafka topic.
// KafkaStreamsMLIntegration.java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class KafkaStreamsMLIntegration {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-ml-integration");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Define input and output topics
        String inputTopic = "input-topic";
        String outputTopic = "output-topic";

        // Create a KStream for processing
        KStream<String, String> stream = builder.stream(inputTopic);

        // For each record, send the value to the ML model API and map it to the prediction
        KStream<String, String> processedStream = stream.mapValues(value -> {
            try {
                // Call the ML model REST API
                URL url = new URL("http://localhost:5000/predict");
                HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                connection.setRequestMethod("POST");
                connection.setRequestProperty("Content-Type", "application/json; charset=utf-8");
                connection.setRequestProperty("Accept", "application/json");
                connection.setDoOutput(true);

                // Build the request body; the record value is expected to be a
                // comma-separated list of features, e.g. "1.0,2.0,3.0,4.0"
                String jsonInputString = "{\"features\": [" + value + "]}";
                try (OutputStream os = connection.getOutputStream()) {
                    byte[] input = jsonInputString.getBytes(StandardCharsets.UTF_8);
                    os.write(input, 0, input.length);
                }

                // Read the response from the ML model API
                int responseCode = connection.getResponseCode();
                if (responseCode == HttpURLConnection.HTTP_OK) {
                    try (BufferedReader in = new BufferedReader(
                            new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                        StringBuilder response = new StringBuilder();
                        String line;
                        while ((line = in.readLine()) != null) {
                            response.append(line);
                        }
                        // The response body is the JSON prediction, e.g. {"prediction": [...]}
                        return response.toString();
                    }
                } else {
                    return "Error: Unable to get prediction (HTTP " + responseCode + ")";
                }
            } catch (Exception e) {
                e.printStackTrace();
                return "Error: " + e.getMessage();
            }
        });

        // Write the results to the output topic
        processedStream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Add shutdown hook to close streams gracefully
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
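Note that mapValues here makes a blocking HTTP call for every record, so a slow or unreachable model service will stall the stream thread. As a minimum safeguard, you can set timeouts on the connection using the standard HttpURLConnection setters, right after opening it; the values below are illustrative and should be tuned to your model's latency.
// Inside the mapValues lambda, after url.openConnection():
connection.setConnectTimeout(2000); // milliseconds to wait for the TCP connection
connection.setReadTimeout(5000);    // milliseconds to wait for the prediction response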
The Kafka Streams application uses Java's built-in HttpURLConnection to communicate with the Flask service: each record value is sent as JSON, and the JSON response from the model is written to the output topic. To test the integration, follow these steps:
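If automatic topic creation is disabled on your broker, create the two topics first (a single partition and no replication are fine for a local test):
# Create the input and output topics
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic input-topic --partitions 1 --replication-factor 1
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic output-topic --partitions 1 --replication-factor 1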
# Produce test data to input-topic
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic input-topic
> 1.0,2.0,3.0,4.0
# Consume results from output-topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output-topic --from-beginning
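For each record you produce, the consumer should print one JSON prediction along the lines of the following (the actual value depends on your model):
{"prediction": [1]}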
Integrating Kafka Streams with machine learning enables real-time data processing and prediction. By connecting a Kafka Streams application to an ML model hosted behind a REST API, you can score streaming data as it arrives and publish the results back to Kafka. Because Kafka Streams parallelizes work across topic partitions, the approach scales horizontally by running more application instances, and it adapts to any model that can be served over HTTP.