Kafka Python Example
PYTHON
Create Kafka Topic, Produce, and Consume Messages
1. Creating Kafka Topic:
from confluent_kafka.admin import AdminClient, NewTopic
# Kafka broker configuration
bootstrap_servers = 'localhost:9092'
# Create an AdminClient for topic creation
admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
# Create a new topic named 'my_topic' with 3 partitions and a replication factor of 1
new_topic = NewTopic('my_topic', num_partitions=3, replication_factor=1)
admin_client.create_topics([new_topic])
2. Producing Messages:
from confluent_kafka import Producer
# Producer configuration
producer_config = {
'bootstrap.servers': bootstrap_servers,
'client.id': 'python-producer'
}
# Create a Kafka producer
producer = Producer(producer_config)
# Produce messages to the 'my_topic' topic
for i in range(5):
message_value = f'Message {i}'
producer.produce('my_topic', key=str(i), value=message_value)
# Wait for any outstanding messages to be delivered and delivery reports received
producer.flush()
3. Consuming Messages:
from confluent_kafka import Consumer, KafkaError
# Consumer configuration
consumer_config = {
'bootstrap.servers': bootstrap_servers,
'group.id': 'python-consumer',
'auto.offset.reset': 'earliest' # Start consuming from the beginning of the topic
}
# Create a Kafka consumer
consumer = Consumer(consumer_config)
# Subscribe to the 'my_topic' topic
consumer.subscribe(['my_topic'])
# Consume messages from the 'my_topic' topic
try:
while True:
msg = consumer.poll(1.0) # Poll for messages with a timeout of 1 second
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event, not an error
continue
else:
print(f'Error: {msg.error()}')
break
print(f'Received message: Key={msg.key()}, Value={msg.value()}, Partition={msg.partition()}, Offset={msg.offset()}')
except KeyboardInterrupt:
pass
finally:
# Close down consumer to commit final offsets.
consumer.close()