Kafka Python Example

PYTHON

Create a Kafka Topic, Produce, and Consume Messages

1. Creating a Kafka Topic:

        
from confluent_kafka.admin import AdminClient, NewTopic

# Kafka broker configuration
bootstrap_servers = 'localhost:9092'

# Create an AdminClient for topic creation
admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})

# Create a new topic named 'my_topic' with 3 partitions and a replication factor of 1
new_topic = NewTopic('my_topic', num_partitions=3, replication_factor=1)

# create_topics() is asynchronous: it returns a dict mapping each topic name to
# a future. Wait on every future so that the topic exists before anything is
# produced to it, and so that failures (for example, the topic already
# existing) are reported instead of being silently dropped.
for topic_name, creation_future in admin_client.create_topics([new_topic]).items():
    try:
        creation_future.result()  # Returns None on success, raises on failure
        print(f'Topic {topic_name} created')
    except Exception as exc:
        # Best-effort: report the failure and carry on, since the topic may
        # simply exist already from a previous run.
        print(f'Failed to create topic {topic_name}: {exc}')
        
    

2. Producing Messages:

        
from confluent_kafka import Producer

# Producer configuration
producer_config = {
    'bootstrap.servers': bootstrap_servers,
    'client.id': 'python-producer'
}


def delivery_report(err, msg):
    """Report a message that could not be delivered.

    Invoked once per produced message from producer.poll() or
    producer.flush(). `err` is None when delivery succeeded; otherwise it
    describes why the message identified by `msg` was not delivered.
    """
    if err is not None:
        print(f'Delivery failed for key {msg.key()}: {err}')


# Create a Kafka producer
producer = Producer(producer_config)

# Produce messages to the 'my_topic' topic
for i in range(5):
    message_value = f'Message {i}'
    producer.produce('my_topic', key=str(i), value=message_value,
                     callback=delivery_report)
    # Serve delivery callbacks for previously produced messages without
    # blocking, so reports are handled as the loop runs rather than only
    # at flush time.
    producer.poll(0)

# Wait for any outstanding messages to be delivered and delivery reports received
producer.flush()
        
    

3. Consuming Messages:

        
from confluent_kafka import Consumer, KafkaError

# Settings for the consumer: which brokers to contact, which consumer group to
# join, and where to start reading when the group has no committed offset.
consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'python-consumer',
    'auto.offset.reset': 'earliest',  # Read from the start of the topic
}

# Build the consumer and register interest in 'my_topic'
consumer = Consumer(consumer_config)
consumer.subscribe(['my_topic'])

# Poll forever, printing each record, until an error occurs or the user
# interrupts with Ctrl-C
try:
    while True:
        msg = consumer.poll(1.0)  # Wait at most one second for a record

        # Nothing arrived within the timeout; poll again
        if msg is None:
            continue

        # Normal case: a real record, so print it and go around again
        if not msg.error():
            print(f'Received message: Key={msg.key()}, Value={msg.value()}, Partition={msg.partition()}, Offset={msg.offset()}')
            continue

        # Reaching the end of a partition is informational and is skipped;
        # any other error is reported and ends the loop
        if msg.error().code() != KafkaError._PARTITION_EOF:
            print(f'Error: {msg.error()}')
            break

except KeyboardInterrupt:
    pass
finally:
    # Close down consumer to commit final offsets.
    consumer.close()