1. Python + Kafka


What We Are Building

User signs up with email, producer watches for new emails and sends to Kafka, consumer reads from Kafka and processes them.

main.py -> emails.txt -> producer.py -> Kafka topic -> consumer.py

Setup

pip install kafka-python

Create 3 files:

kafka-demo/
    main.py
    producer.py
    consumer.py

Create Kafka topic first:

bin/kafka-topics.sh --create --topic demo-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

main.py

Takes email from user and saves to emails.txt. Nothing Kafka related here.

def signup_user():
    while True:
        email = input("Enter your email (or exit to quit): ")
        if email.lower() == "exit":
            break
        with open("emails.txt", "a") as file:
            file.write(email + "\\n")
        print(f"Email {email} stored successfully.")

if __name__ == "__main__":
    signup_user()

producer.py

Checks emails.txt every 2 seconds. Sends any new email to Kafka that was not sent before.

from kafka import KafkaProducer
import time, os

KAFKA_TOPIC = "demo-topic"
KAFKA_SERVER = "localhost:9092"
EMAIL_FILE = "emails.txt"

def send_to_kafka():
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER)
    seen_emails = set()

    while True:
        if os.path.exists(EMAIL_FILE):
            with open(EMAIL_FILE, "r", encoding="utf-8") as file:
                for email in file:
                    email = email.strip()
                    if email and email not in seen_emails:
                        producer.send(KAFKA_TOPIC, email.encode("utf-8"))
                        print(f"Sent: {email} to topic {KAFKA_TOPIC}")
                        seen_emails.add(email)
        time.sleep(2)

if __name__ == "__main__":
    send_to_kafka()