This guide covers Kafka message formats and serialization, which are essential for sending and receiving data in Kafka topics. Proper serialization ensures that data is correctly encoded and decoded across different systems.
Kafka messages (also called records) are composed of the following components:
- Key: an optional identifier, commonly used to determine which partition the record is written to.
- Value: the payload of the message.
- Timestamp: the time the producer created the record, or the time the broker appended it to the log, depending on the topic's configuration.
- Headers: optional key-value pairs that carry metadata alongside the payload.
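To make these components concrete, here is a minimal sketch that builds a ProducerRecord with each component set explicitly. The topic name, key, value, and the "trace-id" header are illustrative placeholders, not values taken from any particular setup:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class RecordAnatomyExample {
    public static void main(String[] args) {
        // One header carrying arbitrary metadata; the header name "trace-id" is a placeholder.
        List<Header> headers =
                List.of(new RecordHeader("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8)));

        // Topic, partition (null lets the partitioner decide), timestamp, key, value, headers.
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "my-topic", null, System.currentTimeMillis(), "key1", "Hello, Kafka!", headers);

        System.out.printf("key=%s value=%s timestamp=%d headers=%s%n",
                record.key(), record.value(), record.timestamp(), record.headers());
    }
}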
Serialization is the process of converting an object into a format suitable for storage or transmission (e.g., converting an object to a byte array). Deserialization is the reverse process, converting bytes back into an object. Kafka uses serializers and deserializers to handle the conversion of keys and values.
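To illustrate what a serializer and deserializer actually do, here is a minimal sketch of a hand-written pair for String data. The class names are hypothetical, and the sketch assumes a recent Kafka client where Serializer.configure() and close() have default implementations; in practice you would normally use the built-in StringSerializer and StringDeserializer shown in the example below:

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.charset.StandardCharsets;

// Hypothetical serializer: turns a String value into the bytes Kafka stores and transmits.
public class SimpleStringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical deserializer: turns the stored bytes back into a String for the consumer.
class SimpleStringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}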
Here’s a Java example demonstrating how to use Kafka's StringSerializer and StringDeserializer to send and receive string messages:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Properties;
import java.util.Collections;

public class KafkaSerializationExample {

    public static void main(String[] args) {
        // Properties for the Kafka producer
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create a KafkaProducer with String keys and String values
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // Create a ProducerRecord for topic "my-topic"
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key1", "Hello, Kafka!");

        // Send the record asynchronously; the callback reports where it was written
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.printf("Sent record to partition %d with offset %d%n",
                        metadata.partition(), metadata.offset());
            }
        });

        // Close the producer (this also flushes any buffered records)
        producer.close();

        // Properties for the Kafka consumer
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Create a KafkaConsumer and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList("my-topic"));

        // Poll for new records in a loop
        while (true) {
            for (ConsumerRecord<String, String> received : consumer.poll(Duration.ofMillis(100))) {
                System.out.printf("Received record: key=%s value=%s partition=%d offset=%d%n",
                        received.key(), received.value(), received.partition(), received.offset());
            }
        }
    }
}
The following diagram illustrates the structure of a Kafka message, including its key, value, timestamp, and headers.
Diagram: Kafka Message Format
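On the consumer side, each of these components is exposed through accessors on ConsumerRecord. As a small illustrative sketch (the class and helper method here are hypothetical, not part of Kafka's API), a received record's components could be printed like this:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.StandardCharsets;

public class RecordComponentsPrinter {
    // Prints the key, value, timestamp, partition, offset, and headers of a consumed record.
    static void printComponents(ConsumerRecord<String, String> received) {
        System.out.printf("key=%s value=%s timestamp=%d partition=%d offset=%d%n",
                received.key(), received.value(), received.timestamp(),
                received.partition(), received.offset());
        received.headers().forEach(header ->
                System.out.printf("header %s=%s%n",
                        header.key(), new String(header.value(), StandardCharsets.UTF_8)));
    }
}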
Understanding Kafka message formats and serialization is crucial for effective data exchange: producers and consumers must agree on how keys and values are encoded, or records cannot be deserialized correctly. The choice of serialization format also affects message size and processing overhead, so it matters for both compatibility and efficiency when sending and receiving data across different systems.