A user signs up with an email address, the producer watches for new emails and sends them to Kafka, and the consumer reads them from Kafka and processes them.
main.py -> emails.txt -> producer.py -> Kafka topic -> consumer.py
pip install kafka-python
Create 3 files:
kafka-demo/
main.py
producer.py
consumer.py
Create Kafka topic first:
bin/kafka-topics.sh --create --topic demo-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
main.py: takes an email from the user and appends it to emails.txt. Nothing Kafka-related here.
def signup_user():
    """Prompt for email addresses and append each one to ``emails.txt``.

    Reads from standard input in a loop until the user types ``exit``
    (case-insensitive). Each address is written on its own line so that a
    line-oriented reader can pick the entries up one at a time.
    """
    while True:
        email = input("Enter your email (or exit to quit): ")
        if email.lower() == "exit":
            break
        # Append mode keeps earlier sign-ups. UTF-8 is stated explicitly
        # because the producer script opens this same file as UTF-8.
        with open("emails.txt", "a", encoding="utf-8") as file:
            # Terminate the record with a real newline. An escaped backslash
            # here would write the two characters '\' and 'n' and leave every
            # address on a single line.
            file.write(email + "\n")
        print(f"Email {email} stored successfully.")


if __name__ == "__main__":
    signup_user()
producer.py: checks emails.txt every 2 seconds and sends to Kafka any email that has not been sent before.
from kafka import KafkaProducer
import time, os
# Topic the emails are published to ("demo-topic", the topic created with kafka-topics.sh).
KAFKA_TOPIC = "demo-topic"
# Broker address passed to KafkaProducer as bootstrap_servers.
KAFKA_SERVER = "localhost:9092"
# File that main.py appends sign-up emails to and that this script polls.
EMAIL_FILE = "emails.txt"
def _read_emails(path):
    """Return the stripped, non-blank lines of *path*, or [] if it is missing."""
    # EAFP: the file may not exist yet (nobody has signed up), and checking
    # first with os.path.exists would race with its creation.
    try:
        with open(path, "r", encoding="utf-8") as file:
            stripped = (line.strip() for line in file)
            return [email for email in stripped if email]
    except FileNotFoundError:
        return []


def send_to_kafka():
    """Poll ``EMAIL_FILE`` and publish each not-yet-sent email to Kafka.

    Every 2 seconds the whole file is re-read, and each non-blank line that
    has not been sent during this run is published to ``KAFKA_TOPIC`` as
    UTF-8 bytes. Sent addresses are remembered in memory only, so restarting
    the script re-sends everything in the file.

    Runs until interrupted. On the way out (Ctrl+C or any error) the producer
    is flushed and closed: ``KafkaProducer.send`` is asynchronous, so records
    still waiting in its buffer would otherwise be lost.
    """
    producer = KafkaProducer(bootstrap_servers=KAFKA_SERVER)
    seen_emails = set()
    try:
        while True:
            for email in _read_emails(EMAIL_FILE):
                if email not in seen_emails:
                    producer.send(KAFKA_TOPIC, email.encode("utf-8"))
                    print(f"Sent: {email} to topic {KAFKA_TOPIC}")
                    seen_emails.add(email)
            time.sleep(2)
    finally:
        # Deliver anything still buffered, then release the connection.
        producer.flush()
        producer.close()


if __name__ == "__main__":
    send_to_kafka()