Building a Simple Kafka Producer and Consumer using Python

Building a Simple Kafka Producer and Consumer using Python

Let’s create a simple mini project to interact with Kafka using Python.

We will build a simple message Producer and Consumer. Mini projects like this are helpful when you quickly want to test some behaviour of Kafka. So let’s get started.

First off, let’s spin up all the required components using this docker-compose.yml. It will start Zookeeper, Kafka and Kafdrop containers in your system.

Let’s create our message Producer now.

We need to import KafkaProducer from the kafka library.

We have to specify the address of our Kafka server (which we created in the above step) while creating a KafkaProducer.

We need to pass these minimum parameters in the producer.send() method:

Topic name : we will use the name my_test_topic

value : the message itself

key (optional) : the key of the message

from kafka import KafkaProducer

bootstrap_servers = [localhost:29092]
topic_name = my_test_topic

producer = KafkaProducer(bootstrap_servers = bootstrap_servers)

future = producer.send(topic_name, key=b1, value=bThis is message 1)

metadata = future.get()

print(message sent successfully on topic %s partition %s % (metadata.topic, metadata.partition))

When we run our Producer, the specified Kafka topic (my_test_topic) gets created automatically and the message is sent to the topic.

Let’s create the Consumer now. We need the server and topic name as above.

from kafka import KafkaConsumer
import sys

bootstrap_servers = [localhost:29092]
topic_name = my_test_topic

consumer = KafkaConsumer(
bootstrap_servers = bootstrap_servers,
auto_offset_reset = earliest,
enable_auto_commit = True,
group_id = my-group-1

for message in consumer:
print (Message received – %s:%d:%d: key=%s value=%s % (message.topic, message.partition, message.offset, message.key, message.value))
except KeyboardInterrupt:

Run the Consumer and it will receive the message sent by the Producer.

Thanks for reading! The source code is available in this GitHub repo.