Apache Kafka Python Guide: Real-time Streaming and Data Processing
⏱️ 25 min
📊 Advanced
Learn real-time streaming development and data processing techniques using Apache Kafka with Python and apply them to real projects.
📖 Table of Contents
- Choosing Python Kafka Libraries
- Basic Usage of kafka-python
- High-Performance Processing with confluent-kafka
- Faust Stream Processing
- Hands-on: Python-based Streaming System
- Performance Optimization and Monitoring
- Learning Summary
🐍 Choosing Python Kafka Libraries
Major Library Comparison
| Library | Features | Pros | Cons | Use Cases |
|---|---|---|---|---|
| kafka-python | Pure Python implementation | Easy installation, simple to understand | Performance limitations | Learning, prototyping |
| confluent-kafka | Wrapper around librdkafka (C) | High performance, stability | Native dependency, more complex installation | Production environments |
| faust | Stream-processing framework | Simple streaming API | Limited features | Stream processing |
| aiokafka | asyncio-based client | Async support | Added complexity | High-performance async |
Installation Methods
# Install kafka-python
pip install kafka-python
# Install confluent-kafka (recommended)
pip install confluent-kafka
# Install faust (the actively maintained fork is published as faust-streaming)
pip install "faust[rocksdb]"
# Install aiokafka
pip install aiokafka
# Additional dependencies (asyncio ships with the standard library and should not be installed from PyPI)
pip install pandas numpy
# Needed for the snappy compression used in the examples below
pip install python-snappy
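aiokafka appears in the comparison above but is not revisited later in this guide, so here is a minimal, hedged sketch of an asyncio-based producer and consumer. It assumes a broker at localhost:9092 and the user-events topic used throughout; adjust the names to your setup.
import asyncio
import json
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

async def produce_one():
    # Async producer: start(), send, and stop() are all awaited
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    await producer.start()
    try:
        # send_and_wait blocks this coroutine until the broker acknowledges the record
        await producer.send_and_wait('user-events', {'action': 'login'}, key=b'user123')
    finally:
        await producer.stop()

async def consume_forever():
    # Async consumer: iterate over the consumer object to receive messages
    consumer = AIOKafkaConsumer(
        'user-events',
        bootstrap_servers='localhost:9092',
        group_id='aio-demo',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        auto_offset_reset='earliest',
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(f"{msg.key} -> {msg.value}")
    finally:
        await consumer.stop()

if __name__ == '__main__':
    asyncio.run(produce_one())
Because both clients are coroutine-based, they fit naturally into the asyncio-based services shown later in this guide.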
⚡ Basic Usage of kafka-python
1. Producer Implementation
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import time
import logging
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PythonKafkaProducer:
def __init__(self, bootstrap_servers='localhost:9092'):
self.producer = KafkaProducer(
bootstrap_servers=[bootstrap_servers],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
# Performance optimization settings
batch_size=16384, # 16KB
linger_ms=5, # 5ms wait
compression_type='snappy', # Compression
acks='all', # All replicas confirmation
retries=3, # Retry count
retry_backoff_ms=100, # Retry interval
# Safety settings
            enable_idempotence=True,  # Idempotence guarantee (requires a recent kafka-python release)
max_in_flight_requests_per_connection=5,
request_timeout_ms=30000,
)
def send_message(self, topic, key, value):
"""Send message"""
try:
future = self.producer.send(topic, key=key, value=value)
record_metadata = future.get(timeout=10)
logger.info(f"Message sent to {record_metadata.topic} "
f"partition {record_metadata.partition} "
f"offset {record_metadata.offset}")
return True
except KafkaError as e:
logger.error(f"Failed to send message: {e}")
return False
def send_batch(self, topic, messages):
"""Send batch messages"""
futures = []
for key, value in messages:
future = self.producer.send(topic, key=key, value=value)
futures.append(future)
# Wait for all messages to be sent
for future in futures:
try:
future.get(timeout=10)
except KafkaError as e:
logger.error(f"Batch send failed: {e}")
return False
return True
def close(self):
"""Close producer"""
self.producer.flush() # Send remaining messages
self.producer.close()
# Usage example
if __name__ == "__main__":
producer = PythonKafkaProducer()
# Send single message
producer.send_message(
topic="user-events",
key="user123",
value={"action": "login", "timestamp": time.time()}
)
# Send batch messages
messages = [
("user123", {"action": "view", "page": "/home"}),
("user456", {"action": "purchase", "amount": 99.99}),
("user789", {"action": "logout", "timestamp": time.time()})
]
producer.send_batch("user-events", messages)
producer.close()
2. Consumer Implementation
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)
class PythonKafkaConsumer:
def __init__(self, bootstrap_servers='localhost:9092', group_id='python-consumer'):
self.consumer = KafkaConsumer(
bootstrap_servers=[bootstrap_servers],
group_id=group_id,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
key_deserializer=lambda m: m.decode('utf-8') if m else None,
# Offset management
auto_offset_reset='earliest', # earliest, latest, none
enable_auto_commit=False, # Manual offset commit
# Performance settings
max_poll_records=500, # Records to process at once
session_timeout_ms=30000, # Session timeout
heartbeat_interval_ms=3000, # Heartbeat interval
# Retry settings
retry_backoff_ms=100,
request_timeout_ms=30000,
)
self.message_handler = None
def subscribe(self, topics):
"""Subscribe to topics"""
self.consumer.subscribe(topics)
logger.info(f"Subscribed to topics: {topics}")
def set_message_handler(self, handler: Callable[[str, Any], None]):
"""Set message handler"""
self.message_handler = handler
def start_consuming(self):
"""Start consuming messages"""
try:
while True:
message_batch = self.consumer.poll(timeout_ms=1000)
for topic_partition, messages in message_batch.items():
for message in messages:
self._process_message(message)
# Commit offset
self.consumer.commit()
except KeyboardInterrupt:
logger.info("Consumer stopped by user")
except Exception as e:
logger.error(f"Consumer error: {e}")
finally:
self.consumer.close()
def _process_message(self, message):
"""Process message"""
try:
logger.info(f"Received message: {message.key} -> {message.value}")
if self.message_handler:
self.message_handler(message.key, message.value)
else:
# Default processing
self._default_handler(message.key, message.value)
except Exception as e:
logger.error(f"Error processing message: {e}")
def _default_handler(self, key, value):
"""Default message handler"""
print(f"Key: {key}, Value: {value}")
# Usage example
def custom_message_handler(key, value):
"""Custom message handler"""
print(f"Processing: {key} -> {value}")
# Implement actual processing logic here
if __name__ == "__main__":
consumer = PythonKafkaConsumer()
consumer.subscribe(['user-events', 'order-events'])
consumer.set_message_handler(custom_message_handler)
consumer.start_consuming()
🚀 High-Performance Processing with confluent-kafka
1. High-Performance Producer
from confluent_kafka import Producer, Consumer, KafkaError
import json
import time
import threading
from collections import defaultdict
class ConfluentKafkaProducer:
def __init__(self, config):
self.producer = Producer(config)
self.delivery_callback_count = 0
self.delivery_callback_errors = 0
def delivery_callback(self, err, msg):
"""Delivery result callback"""
if err is not None:
self.delivery_callback_errors += 1
print(f'Message delivery failed: {err}')
else:
self.delivery_callback_count += 1
print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
def produce_message(self, topic, key, value):
"""Produce message"""
try:
self.producer.produce(
topic,
key=key,
value=json.dumps(value).encode('utf-8'),
callback=self.delivery_callback
)
# Poll for async delivery
self.producer.poll(0)
except BufferError as e:
print(f'Producer queue is full: {e}')
self.producer.flush()
raise
def flush(self):
"""Flush remaining messages"""
self.producer.flush()
# Configuration example
producer_config = {
'bootstrap.servers': 'localhost:9092',
'compression.type': 'snappy',
'batch.size': 16384,
'linger.ms': 5,
'acks': 'all',
'retries': 3,
'enable.idempotence': True,
'max.in.flight.requests.per.connection': 5,
}
producer = ConfluentKafkaProducer(producer_config)
2. High-Performance Consumer
class ConfluentKafkaConsumer:
def __init__(self, config):
self.consumer = Consumer(config)
self.running = True
    def subscribe(self, topics):
        """Subscribe to topics"""
        self.consumer.subscribe(topics)
    def poll_message(self, timeout=1.0):
        """Poll a single message and return its decoded value, or None (used in the hands-on sections)"""
        msg = self.consumer.poll(timeout=timeout)
        if msg is None or msg.error():
            return None
        return json.loads(msg.value().decode('utf-8'))
    def consume_messages(self, timeout=1.0):
"""Consume messages"""
try:
while self.running:
msg = self.consumer.poll(timeout=timeout)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# Reached end of partition
continue
else:
print(f"Consumer error: {msg.error()}")
continue
# Process message
self.process_message(msg)
except KeyboardInterrupt:
print("Consumer interrupted by user")
finally:
self.close()
def process_message(self, msg):
"""Process message"""
try:
key = msg.key().decode('utf-8') if msg.key() else None
value = json.loads(msg.value().decode('utf-8'))
print(f"Received message: {key} -> {value}")
# Implement actual processing logic here
except Exception as e:
print(f"Error processing message: {e}")
def close(self):
"""Close consumer"""
self.running = False
self.consumer.close()
# Configuration example
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'python-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    # Note: max.poll.records is a Java-client setting; librdkafka does not accept it
    'session.timeout.ms': 30000,
    'heartbeat.interval.ms': 3000,
}
consumer = ConfluentKafkaConsumer(consumer_config)
consumer.subscribe(['user-events'])
consumer.consume_messages()
🌊 Faust Stream Processing
1. Faust Basic Configuration
import faust
from faust import Record
import asyncio
from typing import Optional
# Faust app configuration
app = faust.App(
'kafka-streaming-app',
broker='kafka://localhost:9092',
store='rocksdb://', # State store
value_serializer='json',
)
# Data model definition
class UserEvent(Record):
user_id: str
action: str
timestamp: float
metadata: Optional[dict] = None
class OrderEvent(Record):
order_id: str
user_id: str
amount: float
items: list
timestamp: float
# Topic definition
user_events_topic = app.topic('user-events', value_type=UserEvent)
order_events_topic = app.topic('order-events', value_type=OrderEvent)
processed_events_topic = app.topic('processed-events', value_type=UserEvent)
# Table definition (state store)
user_activity_table = app.Table('user-activity', default=int)
order_summary_table = app.Table('order-summary', default=dict)
2. Stream Processing Example
@app.agent(user_events_topic)
async def process_user_events(events):
"""Process user events"""
async for event in events:
# Process event
print(f"Processing user event: {event.user_id} - {event.action}")
# Update state
user_activity_table[event.user_id] += 1
# Transform event
processed_event = UserEvent(
user_id=event.user_id,
action=f"processed_{event.action}",
timestamp=event.timestamp,
metadata={"processed_by": "faust"}
)
# Send result
await processed_events_topic.send(value=processed_event)
@app.agent(order_events_topic)
async def process_order_events(orders):
"""Process order events"""
async for order in orders:
print(f"Processing order: {order.order_id} - ${order.amount}")
# Update order summary
if order.user_id not in order_summary_table:
order_summary_table[order.user_id] = {
'total_orders': 0,
'total_amount': 0.0,
'last_order_time': 0.0
}
summary = order_summary_table[order.user_id]
summary['total_orders'] += 1
summary['total_amount'] += order.amount
summary['last_order_time'] = order.timestamp
# Classify by order amount
if order.amount > 100:
await app.topic('high-value-orders').send(value=order)
else:
await app.topic('regular-orders').send(value=order)
# Periodic reporting
@app.timer(interval=60.0)  # Run every minute
async def aggregate_metrics():
"""Aggregate metrics"""
print("=== User Activity Summary ===")
for user_id, count in user_activity_table.items():
print(f"User {user_id}: {count} events")
print("=== Order Summary ===")
for user_id, summary in order_summary_table.items():
print(f"User {user_id}: {summary['total_orders']} orders, ${summary['total_amount']:.2f}")
if __name__ == '__main__':
app.main()
3. Advanced Stream Processing
from faust import Stream, Table
from faust.types import StreamT
# Stream join
@app.agent(user_events_topic)
async def join_user_orders(events):
"""Join user events with orders"""
async for event in events:
# Get user order information
order_summary = order_summary_table.get(event.user_id, {})
# Process joined data
enriched_event = {
'user_id': event.user_id,
'action': event.action,
'timestamp': event.timestamp,
'user_orders': order_summary.get('total_orders', 0),
'user_spent': order_summary.get('total_amount', 0.0)
}
print(f"Enriched event: {enriched_event}")
# Manual window bucketing (for a built-in alternative, see the tumbling-table sketch below)
processed_windows = set()

@app.agent(user_events_topic)
async def window_aggregation(events):
    """Time-window aggregation"""
    async for event in events:
        # Bucket events into 5-minute (300-second) windows
        window = int(event.timestamp // 300)
        window_key = f"window_{window}"
        user_activity_table[window_key] += 1
        # Emit one marker per window, on the first event that opens it
        if window_key not in processed_windows:
            processed_windows.add(window_key)
            await app.topic('window-results').send(
                value={'window': window, 'count': user_activity_table[window_key]}
            )
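Faust also ships a built-in windowing API, which usually replaces manual bucketing like the above. A minimal sketch, assuming the app, UserEvent model, and user_events_topic defined earlier (the table name event-counts is illustrative):
from datetime import timedelta

# Tumbling 5-minute windows, retained for one hour (illustrative names)
event_counts = app.Table(
    'event-counts', default=int
).tumbling(timedelta(minutes=5), expires=timedelta(hours=1))

@app.agent(user_events_topic)
async def count_events_windowed(events):
    async for event in events:
        # Increment the counter for the window this event falls into
        event_counts[event.user_id] += 1
        # .current() reads the value for the window of the event being processed
        print(f"{event.user_id}: {event_counts[event.user_id].current()} events in current window")
For exact per-key counts across partitions you would typically repartition first, e.g. iterate over events.group_by(UserEvent.user_id) instead of events.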
🛠️ Hands-on: Python-based Streaming System
1. Real-time Log Processing System
import asyncio
import json
import time
import logging
from collections import defaultdict
from datetime import datetime
from typing import Dict, Any
import pandas as pd
class LogProcessingSystem:
def __init__(self):
self.producer = ConfluentKafkaProducer(producer_config)
self.consumer = ConfluentKafkaConsumer(consumer_config)
self.log_stats = defaultdict(int)
async def start_log_collector(self):
"""Start log collector"""
while True:
# Read from log file (in practice, monitor file)
log_line = self.simulate_log_line()
if log_line:
self.producer.produce_message('raw-logs', None, log_line)
await asyncio.sleep(0.1) # 100ms interval
def simulate_log_line(self):
"""Simulate log line"""
import random
levels = ['INFO', 'WARN', 'ERROR', 'DEBUG']
actions = ['login', 'logout', 'purchase', 'view', 'search']
return {
'timestamp': datetime.now().isoformat(),
'level': random.choice(levels),
'message': f"User {random.randint(1, 1000)} performed {random.choice(actions)}",
'ip': f"192.168.1.{random.randint(1, 255)}",
'user_agent': 'Mozilla/5.0...'
}
async def start_log_processor(self):
"""Start log processor"""
self.consumer.subscribe(['raw-logs'])
while True:
try:
                log_data = self.consumer.poll_message(timeout=1.0)
                if log_data:
                    await self.process_log_message(log_data)
except Exception as e:
logging.error(f"Error processing log: {e}")
async def process_log_message(self, log_data):
"""Process log message"""
try:
# Statistics by log level
level = log_data.get('level', 'UNKNOWN')
self.log_stats[level] += 1
# Special handling for error logs
if level == 'ERROR':
await self.handle_error_log(log_data)
# User behavior analysis
if 'performed' in log_data.get('message', ''):
await self.analyze_user_behavior(log_data)
# Update metrics
await self.update_metrics(log_data)
except Exception as e:
logging.error(f"Error processing log message: {e}")
async def handle_error_log(self, log_data):
"""Handle error log"""
print(f"ERROR detected: {log_data['message']}")
# Send error alert
await self.send_alert('ERROR', log_data)
async def analyze_user_behavior(self, log_data):
"""Analyze user behavior"""
message = log_data.get('message', '')
if 'login' in message:
print("User login detected")
elif 'purchase' in message:
print("Purchase activity detected")
async def send_alert(self, level, log_data):
"""Send alert"""
alert = {
'type': 'log_alert',
'level': level,
'message': log_data['message'],
'timestamp': log_data['timestamp']
}
self.producer.produce_message('alerts', level, alert)
async def update_metrics(self, log_data):
"""Update metrics"""
metrics = {
'timestamp': time.time(),
'level_counts': dict(self.log_stats),
'total_logs': sum(self.log_stats.values())
}
self.producer.produce_message('log-metrics', 'metrics', metrics)
async def start_system(self):
"""Start system"""
tasks = [
asyncio.create_task(self.start_log_collector()),
asyncio.create_task(self.start_log_processor())
]
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
print("System stopped by user")
finally:
self.producer.flush()
self.consumer.close()
# Run
if __name__ == "__main__":
system = LogProcessingSystem()
asyncio.run(system.start_system())
2. Event-driven Microservices
class EventDrivenMicroservice:
def __init__(self, service_name):
self.service_name = service_name
self.consumer = ConfluentKafkaConsumer(consumer_config)
self.producer = ConfluentKafkaProducer(producer_config)
self.running = True
async def start_service(self):
"""Start service"""
topics = ['user-events', 'order-events', 'payment-events']
self.consumer.subscribe(topics)
print(f"{self.service_name} started, listening to {topics}")
while self.running:
try:
                event_data = self.consumer.poll_message(timeout=1.0)
                if event_data:
                    await self.handle_event(event_data)
except Exception as e:
logging.error(f"Error in {self.service_name}: {e}")
async def handle_event(self, event_data):
"""Handle event"""
event_type = event_data.get('type')
if event_type == 'user_event':
await self.handle_user_event(event_data)
elif event_type == 'order_event':
await self.handle_order_event(event_data)
elif event_type == 'payment_event':
await self.handle_payment_event(event_data)
async def handle_user_event(self, event):
"""Handle user event"""
print(f"Processing user event: {event}")
# User event processing logic
await self.send_event('user-processed', event)
async def handle_order_event(self, event):
"""Handle order event"""
print(f"Processing order event: {event}")
# Order event processing logic
await self.send_event('order-processed', event)
async def handle_payment_event(self, event):
"""Handle payment event"""
print(f"Processing payment event: {event}")
# Payment event processing logic
await self.send_event('payment-processed', event)
async def send_event(self, topic, event_data):
"""Send event"""
self.producer.produce_message(topic, event_data.get('id'), event_data)
def stop(self):
"""Stop service"""
self.running = False
self.consumer.close()
self.producer.flush()
# Run service
if __name__ == "__main__":
service = EventDrivenMicroservice("event-processor")
asyncio.run(service.start_service())
📊 Performance Optimization and Monitoring
1. Performance Monitoring
import psutil
import time
from collections import defaultdict
import threading
class KafkaPerformanceMonitor:
def __init__(self):
self.metrics = defaultdict(list)
self.running = True
self.monitor_thread = None
def start_monitoring(self):
"""Start monitoring"""
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.start()
def _monitor_loop(self):
"""Monitoring loop"""
while self.running:
# Collect system metrics
cpu_percent = psutil.cpu_percent()
memory_percent = psutil.virtual_memory().percent
disk_io = psutil.disk_io_counters()
network_io = psutil.net_io_counters()
# Store metrics
timestamp = time.time()
self.metrics['cpu'].append((timestamp, cpu_percent))
self.metrics['memory'].append((timestamp, memory_percent))
self.metrics['disk_read'].append((timestamp, disk_io.read_bytes))
self.metrics['disk_write'].append((timestamp, disk_io.write_bytes))
self.metrics['network_sent'].append((timestamp, network_io.bytes_sent))
self.metrics['network_recv'].append((timestamp, network_io.bytes_recv))
# Clean old metrics (older than 1 hour)
cutoff_time = timestamp - 3600
for metric_name in self.metrics:
self.metrics[metric_name] = [
(t, v) for t, v in self.metrics[metric_name] if t > cutoff_time
]
time.sleep(10) # Collect every 10 seconds
def get_metrics_summary(self):
"""Get metrics summary"""
summary = {}
for metric_name, values in self.metrics.items():
if values:
recent_values = [v for t, v in values[-10:]] # Last 10
summary[metric_name] = {
'current': recent_values[-1] if recent_values else 0,
'average': sum(recent_values) / len(recent_values) if recent_values else 0,
'max': max(recent_values) if recent_values else 0,
'min': min(recent_values) if recent_values else 0
}
return summary
def stop_monitoring(self):
"""Stop monitoring"""
self.running = False
if self.monitor_thread:
self.monitor_thread.join()
# Usage example
monitor = KafkaPerformanceMonitor()
monitor.start_monitoring()
# Check metrics after 1 minute
time.sleep(60)
summary = monitor.get_metrics_summary()
print("Performance Summary:", summary)
monitor.stop_monitoring()
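Host-level metrics say little about what the Kafka clients themselves are doing. confluent-kafka can expose librdkafka's internal statistics as a JSON string through a stats_cb; a minimal sketch, assuming the same broker address as earlier (the two fields read below are a small subset of the full statistics payload):
import json
from confluent_kafka import Producer

def stats_callback(stats_json_str):
    # librdkafka delivers its statistics as a JSON-formatted string
    stats = json.loads(stats_json_str)
    print(f"queued messages: {stats.get('msg_cnt')}, "
          f"bytes produced: {stats.get('txmsg_bytes')}")

monitored_producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'statistics.interval.ms': 10000,   # emit statistics every 10 seconds
    'stats_cb': stats_callback,
})
The callback is served from poll() and flush(), so the producer (or consumer, which accepts the same two settings) must be polled regularly for statistics to arrive.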
2. Error Handling and Retry
import asyncio
import time
from functools import wraps
import random
def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
"""Retry decorator"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise e
                    # Exponential backoff with jitter
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)
                    print(f"Retry {attempt + 1}/{max_retries} in {delay:.2f}s: {e}")
                    await asyncio.sleep(delay + jitter)
return None
return wrapper
return decorator
class ResilientKafkaProducer:
def __init__(self, config):
self.producer = ConfluentKafkaProducer(config)
self.dead_letter_queue = []
@retry_with_backoff(max_retries=3)
async def produce_with_retry(self, topic, key, value):
"""Produce message with retry"""
try:
self.producer.produce_message(topic, key, value)
return True
except Exception as e:
print(f"Failed to produce message: {e}")
raise e
async def produce_message(self, topic, key, value):
"""Produce message (with retry)"""
try:
await self.produce_with_retry(topic, key, value)
except Exception as e:
# Add to dead letter queue on final failure
self.dead_letter_queue.append({
'topic': topic,
'key': key,
'value': value,
'error': str(e),
'timestamp': time.time()
})
print(f"Message sent to dead letter queue: {e}")
def get_dead_letter_queue(self):
"""Get dead letter queue"""
return self.dead_letter_queue
def retry_dead_letter_queue(self):
"""Retry dead letter queue"""
retry_count = 0
for item in self.dead_letter_queue[:]:
try:
self.producer.produce_message(
item['topic'],
item['key'],
item['value']
)
self.dead_letter_queue.remove(item)
retry_count += 1
except Exception as e:
print(f"Failed to retry dead letter: {e}")
print(f"Retried {retry_count} messages from dead letter queue")
📚 Learning Summary
What We Learned in This Post
- Choosing Python Kafka Libraries
  - Comparison of kafka-python, confluent-kafka, faust, and aiokafka
  - Features and use cases of each library
- Basic Usage of kafka-python
  - Producer and consumer implementation
  - Batch sending and offset management
- High-Performance Processing with confluent-kafka
  - High-performance producer/consumer implementation
  - Asynchronous delivery and callback handling
- Faust Stream Processing
  - Stream processing and state stores
  - Window aggregation and join operations
- Building a Production-Grade Streaming System
  - Real-time log processing system
  - Event-driven microservices
- Performance Optimization and Monitoring
  - Performance monitoring system
  - Error handling and retry strategies
Key Concepts Summary
| Concept | Description | Importance |
|---|---|---|
| Library Selection | Choosing the right library for project requirements | ⭐⭐⭐⭐⭐ |
| Async Processing | High-performance async programming | ⭐⭐⭐⭐⭐ |
| Stream Processing | Real-time data transformation and aggregation | ⭐⭐⭐⭐⭐ |
| Monitoring | System status tracking and performance optimization | ⭐⭐⭐⭐ |
Practical Application Considerations
- Library Selection: Consider project requirements and performance needs
- Async Processing: Use asyncio for high-performance processing
- Error Handling: Implement retry strategies and dead letter queues
- Monitoring: Real-time metrics collection and alerting systems
With this guide, you can build high-performance streaming systems using Apache Kafka with Python! 🚀