Kafka Message Format and Serialization

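This guide covers Kafka message formats and serialization, which are essential for sending and receiving data through Kafka topics. Kafka brokers store and transfer keys and values as raw bytes, so producers must serialize data before sending it, and consumers must deserialize it with a matching format for the data to be read correctly across different systems.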

1. Kafka Message Format

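Each Kafka message (also called a record) is made up of the following components:

- Key: optional. When present, the default partitioner uses it to choose the partition, so records with the same key land in the same partition.
- Value: the message payload.
- Timestamp: milliseconds since the Unix epoch, set either by the producer (CreateTime) or by the broker when the record is appended to the log (LogAppendTime), depending on the topic's message.timestamp.type setting.
- Headers: optional key-value metadata, where each header key is a String and each header value is a byte array.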

2. Serialization and Deserialization

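Serialization is the process of converting an object into a format suitable for storage or transmission (for example, converting an object to a byte array). Deserialization is the reverse process: converting bytes back into an object. In Kafka, the producer uses a serializer to turn each key and value into bytes before sending, and the consumer uses a deserializer to turn them back into objects. Keys and values are configured independently (key.serializer and value.serializer on the producer, key.deserializer and value.deserializer on the consumer), so they can use different formats.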
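As a minimal sketch of the idea, the following plain-Java example serializes a hypothetical SensorReading type into a byte array and deserializes it back. The Serializer and Deserializer interfaces declared here are local stand-ins with the same method shapes as Kafka's org.apache.kafka.common.serialization.Serializer<T> and Deserializer<T>, so the sketch runs without the Kafka client library. The byte layout (a length-prefixed UTF-8 id followed by an 8-byte double) is an arbitrary choice for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CustomSerdeSketch {

    // Local stand-ins mirroring the method shapes of Kafka's Serializer<T> and
    // Deserializer<T>, so this sketch compiles without kafka-clients.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Hypothetical payload type used only for illustration.
    record SensorReading(String sensorId, double value) {}

    // Writes a SensorReading as: [4-byte id length][UTF-8 id bytes][8-byte double]
    static class SensorReadingSerializer implements Serializer<SensorReading> {
        @Override
        public byte[] serialize(String topic, SensorReading data) {
            if (data == null) {
                return null; // pass null through unchanged
            }
            byte[] id = data.sensorId().getBytes(StandardCharsets.UTF_8);
            ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + id.length + Double.BYTES);
            buffer.putInt(id.length);
            buffer.put(id);
            buffer.putDouble(data.value());
            return buffer.array();
        }
    }

    // Reads back the layout written by SensorReadingSerializer.
    static class SensorReadingDeserializer implements Deserializer<SensorReading> {
        @Override
        public SensorReading deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            ByteBuffer buffer = ByteBuffer.wrap(data);
            byte[] id = new byte[buffer.getInt()];
            buffer.get(id);
            double value = buffer.getDouble();
            return new SensorReading(new String(id, StandardCharsets.UTF_8), value);
        }
    }

    public static void main(String[] args) {
        SensorReading original = new SensorReading("sensor-42", 21.5);
        byte[] bytes = new SensorReadingSerializer().serialize("my-topic", original);
        SensorReading restored = new SensorReadingDeserializer().deserialize("my-topic", bytes);
        System.out.println(bytes.length + " bytes -> " + restored);
    }
}
```

In a real application you would implement Kafka's own interfaces instead and register the class names under value.serializer and value.deserializer; the round trip only works if both sides agree on the byte layout.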

Common Serialization Formats:

3. Example: Serialization with Java

Here’s a Java example that uses Kafka's StringSerializer and StringDeserializer to send and receive string messages. It assumes a broker running at localhost:9092 and a topic named my-topic:


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaSerializationExample {
    public static void main(String[] args) {
        // Producer properties: StringSerializer converts String keys and values to bytes
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // try-with-resources closes the producer, which waits for pending sends to complete
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            // Create a record with a topic, a key and a value
            ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key1", "Hello, Kafka!");

            // Send asynchronously; the callback runs when the send succeeds or fails
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("Sent record to partition %d with offset %d%n",
                                      metadata.partition(), metadata.offset());
                }
            });
        }

        // Consumer properties: StringDeserializer converts the bytes back into Strings
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the beginning of the topic when the group has no committed offset
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // try-with-resources closes the consumer if the loop exits with an exception
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("my-topic"));

            // Poll for new records until the program is stopped
            while (true) {
                for (ConsumerRecord<String, String> received : consumer.poll(Duration.ofMillis(100))) {
                    System.out.printf("Received record: key=%s value=%s partition=%d offset=%d%n",
                                      received.key(), received.value(), received.partition(), received.offset());
                }
            }
        }
    }
}
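To make the byte-level behavior of the example above concrete, here is a small sketch that mirrors what StringSerializer and StringDeserializer do with their default configuration: a null stays null, and otherwise the string is encoded to and decoded from UTF-8. It uses only the JDK rather than Kafka's own classes, and it also shows why the consumer must decode with the same encoding the producer used.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StringSerdeSketch {

    // Mirrors StringSerializer's default behavior: null stays null,
    // otherwise the String is encoded as UTF-8 bytes.
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // Mirrors StringDeserializer's default behavior: decode the bytes as UTF-8.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String value = "Hello, Kafka!";
        byte[] wire = serialize(value);
        System.out.println("bytes on the wire: " + Arrays.toString(wire));
        System.out.println("decoded: " + deserialize(wire));

        // Non-ASCII characters take more than one byte in UTF-8
        String accented = "caf\u00e9";
        System.out.println(accented.length() + " chars, " + serialize(accented).length + " bytes");

        // Decoding with a different charset garbles the text
        String wrong = new String(serialize(accented), StandardCharsets.ISO_8859_1);
        System.out.println("decoded with ISO-8859-1: " + wrong);
    }
}
```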
    

4. Kafka Message Format Diagram

The following diagram illustrates the structure of a Kafka message, including its key, value, timestamp, and headers.

Diagram: Kafka Message Format
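The components described above can also be modeled in plain Java as a minimal sketch. Message and Header here are hypothetical types for illustration, not Kafka classes: key and value are already-serialized bytes (either may be null), the timestamp is in milliseconds since the Unix epoch, and each header pairs a String key with a byte array value.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

public class MessageAnatomySketch {

    // Hypothetical type: a header pairs a String key with a byte[] value
    record Header(String key, byte[] value) {}

    // Hypothetical type: key and value are already-serialized bytes (either may be null),
    // the timestamp is milliseconds since the Unix epoch, headers carry optional metadata
    record Message(byte[] key, byte[] value, long timestamp, List<Header> headers) {

        // Describes a byte array by its size, or "null" when absent
        static String size(byte[] bytes) {
            return bytes == null ? "null" : bytes.length + " bytes";
        }

        // Renders the components as a plain-text outline of the message
        String describe() {
            StringBuilder sb = new StringBuilder();
            sb.append("key       : ").append(size(key)).append('\n');
            sb.append("value     : ").append(size(value)).append('\n');
            sb.append("timestamp : ").append(timestamp).append('\n');
            sb.append("headers   :");
            for (Header header : headers) {
                sb.append(' ').append(header.key()).append('=').append(size(header.value()));
            }
            return sb.toString();
        }
    }

    public static void main(String[] args) {
        Message message = new Message(
                "key1".getBytes(StandardCharsets.UTF_8),
                "Hello, Kafka!".getBytes(StandardCharsets.UTF_8),
                System.currentTimeMillis(),
                List.of(new Header("trace-id", "abc123".getBytes(StandardCharsets.UTF_8))));
        System.out.println(message.describe());
    }
}
```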

5. Conclusion

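Understanding Kafka message formats and serialization is crucial for reliable data exchange in Kafka. Because Kafka only moves bytes, producers and consumers must agree on a serialization format: a consumer whose deserializer does not match the producer's serializer will either fail with deserialization errors or read garbled data.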