Apache Kafka Python Guide: Real-time Streaming and Data Processing

Learn real-time streaming and data processing techniques with Apache Kafka and Python, and apply them to real projects.

📖 Table of Contents

  1. Choosing Python Kafka Libraries
  2. Basic Usage of kafka-python
  3. High-Performance Processing with confluent-kafka
  4. Faust Stream Processing
  5. Hands-on: Python-based Streaming System
  6. Performance Optimization and Monitoring
  7. Learning Summary

🐍 Choosing Python Kafka Libraries

Major Library Comparison

| Library | Features | Pros | Cons | Use Cases |
| --- | --- | --- | --- | --- |
| kafka-python | Pure Python implementation | Easy installation, simple to understand | Performance limitations | Learning, prototyping |
| confluent-kafka | C library wrapper | High performance, stability | Complex installation | Production environments |
| faust | Streaming-focused | Simple streaming | Limited features | Stream processing |
| aiokafka | Async processing | Async support | Complexity | High-performance async |

Installation Methods

# Install kafka-python
pip install kafka-python

# Install confluent-kafka (recommended)
pip install confluent-kafka

# Install faust (the original project is no longer maintained; the actively maintained fork is published as faust-streaming)
pip install faust[rocksdb]

# Install aiokafka
pip install aiokafka

# Additional dependencies (asyncio ships with Python 3 and should not be installed from pip)
pip install pandas numpy

# Optional: snappy bindings, needed by kafka-python for compression_type='snappy'
pip install python-snappy
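
aiokafka appears in the comparison above but is not covered further in this guide. Here is a minimal sketch of its asyncio-based API (the topic name and payload are illustrative):

import asyncio
import json
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

async def aiokafka_demo():
    # Produce one JSON message
    producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    await producer.start()
    try:
        await producer.send_and_wait('user-events', {'action': 'login'}, key=b'user123')
    finally:
        await producer.stop()

    # Consume messages asynchronously
    consumer = AIOKafkaConsumer(
        'user-events',
        bootstrap_servers='localhost:9092',
        group_id='aiokafka-demo',
        auto_offset_reset='earliest',
    )
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.key, json.loads(msg.value.decode('utf-8')))
            break  # stop after the first message in this demo
    finally:
        await consumer.stop()

asyncio.run(aiokafka_demo())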

⚡ Basic Usage of kafka-python

1. Producer Implementation

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import time
import logging

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class PythonKafkaProducer:
    def __init__(self, bootstrap_servers='localhost:9092'):
        self.producer = KafkaProducer(
            bootstrap_servers=[bootstrap_servers],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            # Performance optimization settings
            batch_size=16384,  # 16KB
            linger_ms=5,       # 5ms wait
            compression_type='snappy',  # Compression (requires the python-snappy package)
            acks='all',        # All replicas confirmation
            retries=3,         # Retry count
            retry_backoff_ms=100,  # Retry interval
            # Safety settings
            enable_idempotence=True,  # Idempotence guarantee (needs a recent kafka-python release; older versions reject unknown configs)
            max_in_flight_requests_per_connection=5,
            request_timeout_ms=30000,
        )
    
    def send_message(self, topic, key, value):
        """Send message"""
        try:
            future = self.producer.send(topic, key=key, value=value)
            record_metadata = future.get(timeout=10)
            logger.info(f"Message sent to {record_metadata.topic} "
                       f"partition {record_metadata.partition} "
                       f"offset {record_metadata.offset}")
            return True
        except KafkaError as e:
            logger.error(f"Failed to send message: {e}")
            return False
    
    def send_batch(self, topic, messages):
        """Send batch messages"""
        futures = []
        for key, value in messages:
            future = self.producer.send(topic, key=key, value=value)
            futures.append(future)
        
        # Wait for all messages to be sent
        for future in futures:
            try:
                future.get(timeout=10)
            except KafkaError as e:
                logger.error(f"Batch send failed: {e}")
                return False
        return True
    
    def close(self):
        """Close producer"""
        self.producer.flush()  # Send remaining messages
        self.producer.close()

# Usage example
if __name__ == "__main__":
    producer = PythonKafkaProducer()
    
    # Send single message
    producer.send_message(
        topic="user-events",
        key="user123",
        value={"action": "login", "timestamp": time.time()}
    )
    
    # Send batch messages
    messages = [
        ("user123", {"action": "view", "page": "/home"}),
        ("user456", {"action": "purchase", "amount": 99.99}),
        ("user789", {"action": "logout", "timestamp": time.time()})
    ]
    producer.send_batch("user-events", messages)
    
    producer.close()

2. Consumer Implementation

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
from typing import Dict, Any, Callable

logger = logging.getLogger(__name__)

class PythonKafkaConsumer:
    def __init__(self, bootstrap_servers='localhost:9092', group_id='python-consumer'):
        self.consumer = KafkaConsumer(
            bootstrap_servers=[bootstrap_servers],
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            # Offset management
            auto_offset_reset='earliest',  # earliest, latest, none
            enable_auto_commit=False,      # Manual offset commit
            # Performance settings
            max_poll_records=500,          # Records to process at once
            session_timeout_ms=30000,      # Session timeout
            heartbeat_interval_ms=3000,    # Heartbeat interval
            # Retry settings
            retry_backoff_ms=100,
            request_timeout_ms=40000,  # must be larger than session_timeout_ms
        )
        self.message_handler = None
    
    def subscribe(self, topics):
        """Subscribe to topics"""
        self.consumer.subscribe(topics)
        logger.info(f"Subscribed to topics: {topics}")
    
    def set_message_handler(self, handler: Callable[[str, Any], None]):
        """Set message handler"""
        self.message_handler = handler
    
    def start_consuming(self):
        """Start consuming messages"""
        try:
            while True:
                message_batch = self.consumer.poll(timeout_ms=1000)
                if not message_batch:
                    continue
                
                for topic_partition, messages in message_batch.items():
                    for message in messages:
                        self._process_message(message)
                
                # Commit offsets after the whole batch is processed (enable_auto_commit=False)
                self.consumer.commit()
                
        except KeyboardInterrupt:
            logger.info("Consumer stopped by user")
        except Exception as e:
            logger.error(f"Consumer error: {e}")
        finally:
            self.consumer.close()
    
    def _process_message(self, message):
        """Process message"""
        try:
            logger.info(f"Received message: {message.key} -> {message.value}")
            
            if self.message_handler:
                self.message_handler(message.key, message.value)
            else:
                # Default processing
                self._default_handler(message.key, message.value)
                
        except Exception as e:
            logger.error(f"Error processing message: {e}")
    
    def _default_handler(self, key, value):
        """Default message handler"""
        print(f"Key: {key}, Value: {value}")

# Usage example
def custom_message_handler(key, value):
    """Custom message handler"""
    print(f"Processing: {key} -> {value}")
    # Implement actual processing logic here

if __name__ == "__main__":
    consumer = PythonKafkaConsumer()
    consumer.subscribe(['user-events', 'order-events'])
    consumer.set_message_handler(custom_message_handler)
    consumer.start_consuming()

🚀 High-Performance Processing with confluent-kafka

1. High-Performance Producer

from confluent_kafka import Producer, Consumer, KafkaError
import json
import time
import threading
from collections import defaultdict

class ConfluentKafkaProducer:
    def __init__(self, config):
        self.producer = Producer(config)
        self.delivery_callback_count = 0
        self.delivery_callback_errors = 0
    
    def delivery_callback(self, err, msg):
        """Delivery result callback"""
        if err is not None:
            self.delivery_callback_errors += 1
            print(f'Message delivery failed: {err}')
        else:
            self.delivery_callback_count += 1
            print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
    
    def produce_message(self, topic, key, value):
        """Produce message"""
        try:
            self.producer.produce(
                topic,
                key=key,
                value=json.dumps(value).encode('utf-8'),
                callback=self.delivery_callback
            )
            # Poll for async delivery
            self.producer.poll(0)
        except BufferError as e:
            print(f'Producer queue is full: {e}')
            self.producer.flush()
            raise
    
    def flush(self):
        """Flush remaining messages"""
        self.producer.flush()

# Configuration example
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'compression.type': 'snappy',
    'batch.size': 16384,
    'linger.ms': 5,
    'acks': 'all',
    'retries': 3,
    'enable.idempotence': True,
    'max.in.flight.requests.per.connection': 5,
}

producer = ConfluentKafkaProducer(producer_config)
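
A short usage sketch for the class above (topic, keys, and payloads are illustrative):

producer.produce_message('user-events', key='user123', value={'action': 'login', 'ts': time.time()})
producer.produce_message('user-events', key='user456', value={'action': 'view', 'page': '/home'})
producer.flush()  # block until all queued messages are delivered and callbacks have fired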

2. High-Performance Consumer

class ConfluentKafkaConsumer:
    def __init__(self, config):
        self.consumer = Consumer(config)
        self.running = True
    
    def subscribe(self, topics):
        """Subscribe to topics"""
        self.consumer.subscribe(topics)
    
    def consume_messages(self, timeout=1.0):
        """Consume messages"""
        try:
            while self.running:
                msg = self.consumer.poll(timeout=timeout)
                
                if msg is None:
                    continue
                
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # Reached end of partition
                        continue
                    else:
                        print(f"Consumer error: {msg.error()}")
                        continue
                
                # Process message, then commit the offset manually (enable.auto.commit is False)
                self.process_message(msg)
                self.consumer.commit(message=msg, asynchronous=True)
                
        except KeyboardInterrupt:
            print("Consumer interrupted by user")
        finally:
            self.close()
    
    def process_message(self, msg):
        """Process message"""
        try:
            key = msg.key().decode('utf-8') if msg.key() else None
            value = json.loads(msg.value().decode('utf-8'))
            
            print(f"Received message: {key} -> {value}")
            
            # Implement actual processing logic here
            
        except Exception as e:
            print(f"Error processing message: {e}")
    
    def close(self):
        """Close consumer"""
        self.running = False
        self.consumer.close()

# Configuration example
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'python-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    # Note: 'max.poll.records' is a Java-consumer setting; librdkafka/confluent-kafka rejects it
    'session.timeout.ms': 30000,
    'heartbeat.interval.ms': 3000,
}

consumer = ConfluentKafkaConsumer(consumer_config)
consumer.subscribe(['user-events'])
consumer.consume_messages()

🌊 Faust Stream Processing

1. Faust Basic Configuration

import faust
from faust import Record
import asyncio
from typing import Optional

# Faust app configuration
app = faust.App(
    'kafka-streaming-app',
    broker='kafka://localhost:9092',
    store='rocksdb://',  # State store
    value_serializer='json',
)

# Data model definition
class UserEvent(Record):
    user_id: str
    action: str
    timestamp: float
    metadata: Optional[dict] = None

class OrderEvent(Record):
    order_id: str
    user_id: str
    amount: float
    items: list
    timestamp: float

# Topic definition
user_events_topic = app.topic('user-events', value_type=UserEvent)
order_events_topic = app.topic('order-events', value_type=OrderEvent)
processed_events_topic = app.topic('processed-events', value_type=UserEvent)

# Table definition (state store)
user_activity_table = app.Table('user-activity', default=int)
order_summary_table = app.Table('order-summary', default=dict)

2. Stream Processing Example

@app.agent(user_events_topic)
async def process_user_events(events):
    """Process user events"""
    async for event in events:
        # Process event
        print(f"Processing user event: {event.user_id} - {event.action}")
        
        # Update state
        user_activity_table[event.user_id] += 1
        
        # Transform event
        processed_event = UserEvent(
            user_id=event.user_id,
            action=f"processed_{event.action}",
            timestamp=event.timestamp,
            metadata={"processed_by": "faust"}
        )
        
        # Send result
        await processed_events_topic.send(value=processed_event)

@app.agent(order_events_topic)
async def process_order_events(orders):
    """Process order events"""
    async for order in orders:
        print(f"Processing order: {order.order_id} - ${order.amount}")
        
        # Update order summary
        if order.user_id not in order_summary_table:
            order_summary_table[order.user_id] = {
                'total_orders': 0,
                'total_amount': 0.0,
                'last_order_time': 0.0
            }
        
        summary = order_summary_table[order.user_id]
        summary['total_orders'] += 1
        summary['total_amount'] += order.amount
        summary['last_order_time'] = order.timestamp
        # Reassign so the update is written to the changelog; in-place mutation of table values is not persisted
        order_summary_table[order.user_id] = summary
        
        # Classify by order amount
        if order.amount > 100:
            await app.topic('high-value-orders').send(value=order)
        else:
            await app.topic('regular-orders').send(value=order)

# Window aggregation
@app.timer(interval=60.0)  # Run every 1 minute
async def aggregate_metrics():
    """Aggregate metrics"""
    print("=== User Activity Summary ===")
    for user_id, count in user_activity_table.items():
        print(f"User {user_id}: {count} events")
    
    print("=== Order Summary ===")
    for user_id, summary in order_summary_table.items():
        print(f"User {user_id}: {summary['total_orders']} orders, ${summary['total_amount']:.2f}")

if __name__ == '__main__':
    app.main()
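
Because the script ends with app.main(), the app is started as a worker from the command line, for example python stream_app.py worker -l info (assuming the file is saved as stream_app.py); the rocksdb:// store relies on the RocksDB bindings pulled in by faust[rocksdb].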

3. Advanced Stream Processing

from faust import Stream, Table
from faust.types import StreamT

# Stream join
@app.agent(user_events_topic)
async def join_user_orders(events):
    """Join user events with orders"""
    async for event in events:
        # Get user order information
        order_summary = order_summary_table.get(event.user_id, {})
        
        # Process joined data
        enriched_event = {
            'user_id': event.user_id,
            'action': event.action,
            'timestamp': event.timestamp,
            'user_orders': order_summary.get('total_orders', 0),
            'user_spent': order_summary.get('total_amount', 0.0)
        }
        
        print(f"Enriched event: {enriched_event}")

# Manual window aggregation (a sketch using Faust's built-in tumbling windows follows below)
window_results_topic = app.topic('window-results')
current_window = None  # window currently being accumulated

@app.agent(user_events_topic)
async def window_aggregation(events):
    """Time window aggregation"""
    global current_window
    async for event in events:
        # Assign the event to a 5-minute window
        window = int(event.timestamp // 300)  # 5 minutes = 300 seconds
        user_activity_table[f"window_{window}"] += 1
        
        # When a new window starts, emit the finished count of the previous window
        if current_window is not None and window != current_window:
            await window_results_topic.send(
                value={
                    'window': current_window,
                    'count': user_activity_table[f"window_{current_window}"],
                }
            )
        current_window = window
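
Faust also provides windowed tables that handle this bucketing for you. A minimal sketch, assuming the same user_events_topic (the table name 'events-per-5min' is illustrative):

from datetime import timedelta

# Tumbling 5-minute windows; closed windows are kept for 1 hour before expiring
events_per_window = app.Table(
    'events-per-5min', default=int,
).tumbling(timedelta(minutes=5), expires=timedelta(hours=1))

@app.agent(user_events_topic)
async def count_in_windows(events):
    async for event in events:
        events_per_window[event.user_id] += 1
        # .current() reads the value for the window of the event being processed
        print(event.user_id, events_per_window[event.user_id].current())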

🛠️ Hands-on: Python-based Streaming System

1. Real-time Log Processing System

import asyncio
import json
import time
import logging
from collections import defaultdict
from datetime import datetime
from typing import Dict, Any
import pandas as pd

class LogProcessingSystem:
    def __init__(self):
        self.producer = ConfluentKafkaProducer(producer_config)
        self.consumer = ConfluentKafkaConsumer(consumer_config)
        self.log_stats = defaultdict(int)
    
    async def start_log_collector(self):
        """Start log collector"""
        while True:
            # Read from log file (in practice, monitor file)
            log_line = self.simulate_log_line()
            if log_line:
                self.producer.produce_message('raw-logs', None, log_line)
            
            await asyncio.sleep(0.1)  # 100ms interval
    
    def simulate_log_line(self):
        """Simulate log line"""
        import random
        levels = ['INFO', 'WARN', 'ERROR', 'DEBUG']
        actions = ['login', 'logout', 'purchase', 'view', 'search']
        
        return {
            'timestamp': datetime.now().isoformat(),
            'level': random.choice(levels),
            'message': f"User {random.randint(1, 1000)} performed {random.choice(actions)}",
            'ip': f"192.168.1.{random.randint(1, 255)}",
            'user_agent': 'Mozilla/5.0...'
        }
    
    async def start_log_processor(self):
        """Start log processor"""
        self.consumer.subscribe(['raw-logs'])
        
        while True:
            try:
                # Poll the underlying confluent consumer directly;
                # consume_messages() runs its own blocking loop and never returns a message
                msg = self.consumer.consumer.poll(timeout=0.1)
                if msg is None or msg.error():
                    await asyncio.sleep(0.1)  # yield control to the collector task
                    continue
                log_data = json.loads(msg.value().decode('utf-8'))
                await self.process_log_message(log_data)
            except Exception as e:
                logging.error(f"Error processing log: {e}")
    
    async def process_log_message(self, log_data):
        """Process log message"""
        try:
            # Statistics by log level
            level = log_data.get('level', 'UNKNOWN')
            self.log_stats[level] += 1
            
            # Special handling for error logs
            if level == 'ERROR':
                await self.handle_error_log(log_data)
            
            # User behavior analysis
            if 'performed' in log_data.get('message', ''):
                await self.analyze_user_behavior(log_data)
            
            # Update metrics
            await self.update_metrics(log_data)
            
        except Exception as e:
            logging.error(f"Error processing log message: {e}")
    
    async def handle_error_log(self, log_data):
        """Handle error log"""
        print(f"ERROR detected: {log_data['message']}")
        # Send error alert
        await self.send_alert('ERROR', log_data)
    
    async def analyze_user_behavior(self, log_data):
        """Analyze user behavior"""
        message = log_data.get('message', '')
        if 'login' in message:
            print("User login detected")
        elif 'purchase' in message:
            print("Purchase activity detected")
    
    async def send_alert(self, level, log_data):
        """Send alert"""
        alert = {
            'type': 'log_alert',
            'level': level,
            'message': log_data['message'],
            'timestamp': log_data['timestamp']
        }
        self.producer.produce_message('alerts', level, alert)
    
    async def update_metrics(self, log_data):
        """Update metrics"""
        metrics = {
            'timestamp': time.time(),
            'level_counts': dict(self.log_stats),
            'total_logs': sum(self.log_stats.values())
        }
        self.producer.produce_message('log-metrics', 'metrics', metrics)
    
    async def start_system(self):
        """Start system"""
        tasks = [
            asyncio.create_task(self.start_log_collector()),
            asyncio.create_task(self.start_log_processor())
        ]
        
        try:
            await asyncio.gather(*tasks)
        except KeyboardInterrupt:
            print("System stopped by user")
        finally:
            self.producer.flush()
            self.consumer.close()

# Run
if __name__ == "__main__":
    system = LogProcessingSystem()
    asyncio.run(system.start_system())
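
One caveat with the sketch above: confluent-kafka's poll() is a blocking call, so it stalls the asyncio event loop for up to its timeout on every iteration. One possible workaround is to push the poll into the default thread pool (poll_async is a helper name introduced here):

import asyncio
from functools import partial

async def poll_async(consumer, timeout=1.0):
    """Run the blocking confluent-kafka poll() in a worker thread."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, partial(consumer.poll, timeout))

In start_log_processor, msg = await poll_async(self.consumer.consumer, 0.1) could then replace the direct poll call.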

2. Event-driven Microservices

class EventDrivenMicroservice:
    def __init__(self, service_name):
        self.service_name = service_name
        self.consumer = ConfluentKafkaConsumer(consumer_config)
        self.producer = ConfluentKafkaProducer(producer_config)
        self.running = True
    
    async def start_service(self):
        """Start service"""
        topics = ['user-events', 'order-events', 'payment-events']
        self.consumer.subscribe(topics)
        
        print(f"{self.service_name} started, listening to {topics}")
        
        while self.running:
            try:
                # Poll the underlying confluent consumer; consume_messages() blocks and returns nothing
                msg = self.consumer.consumer.poll(timeout=0.1)
                if msg is None or msg.error():
                    await asyncio.sleep(0.1)
                    continue
                event_data = json.loads(msg.value().decode('utf-8'))
                await self.handle_event(event_data)
            except Exception as e:
                logging.error(f"Error in {self.service_name}: {e}")
    
    async def handle_event(self, event_data):
        """Handle event"""
        event_type = event_data.get('type')
        
        if event_type == 'user_event':
            await self.handle_user_event(event_data)
        elif event_type == 'order_event':
            await self.handle_order_event(event_data)
        elif event_type == 'payment_event':
            await self.handle_payment_event(event_data)
    
    async def handle_user_event(self, event):
        """Handle user event"""
        print(f"Processing user event: {event}")
        # User event processing logic
        await self.send_event('user-processed', event)
    
    async def handle_order_event(self, event):
        """Handle order event"""
        print(f"Processing order event: {event}")
        # Order event processing logic
        await self.send_event('order-processed', event)
    
    async def handle_payment_event(self, event):
        """Handle payment event"""
        print(f"Processing payment event: {event}")
        # Payment event processing logic
        await self.send_event('payment-processed', event)
    
    async def send_event(self, topic, event_data):
        """Send event"""
        self.producer.produce_message(topic, event_data.get('id'), event_data)
    
    def stop(self):
        """Stop service"""
        self.running = False
        self.consumer.close()
        self.producer.flush()

# Run service
if __name__ == "__main__":
    service = EventDrivenMicroservice("event-processor")
    asyncio.run(service.start_service())

📊 Performance Optimization and Monitoring

1. Performance Monitoring

import psutil
import time
from collections import defaultdict
import threading

class KafkaPerformanceMonitor:
    def __init__(self):
        self.metrics = defaultdict(list)
        self.running = True
        self.monitor_thread = None
    
    def start_monitoring(self):
        """Start monitoring"""
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.start()
    
    def _monitor_loop(self):
        """Monitoring loop"""
        while self.running:
            # Collect system metrics
            cpu_percent = psutil.cpu_percent()
            memory_percent = psutil.virtual_memory().percent
            disk_io = psutil.disk_io_counters()
            network_io = psutil.net_io_counters()
            
            # Store metrics
            timestamp = time.time()
            self.metrics['cpu'].append((timestamp, cpu_percent))
            self.metrics['memory'].append((timestamp, memory_percent))
            self.metrics['disk_read'].append((timestamp, disk_io.read_bytes))
            self.metrics['disk_write'].append((timestamp, disk_io.write_bytes))
            self.metrics['network_sent'].append((timestamp, network_io.bytes_sent))
            self.metrics['network_recv'].append((timestamp, network_io.bytes_recv))
            
            # Clean old metrics (older than 1 hour)
            cutoff_time = timestamp - 3600
            for metric_name in self.metrics:
                self.metrics[metric_name] = [
                    (t, v) for t, v in self.metrics[metric_name] if t > cutoff_time
                ]
            
            time.sleep(10)  # Collect every 10 seconds
    
    def get_metrics_summary(self):
        """Get metrics summary"""
        summary = {}
        for metric_name, values in self.metrics.items():
            if values:
                recent_values = [v for t, v in values[-10:]]  # Last 10
                summary[metric_name] = {
                    'current': recent_values[-1] if recent_values else 0,
                    'average': sum(recent_values) / len(recent_values) if recent_values else 0,
                    'max': max(recent_values) if recent_values else 0,
                    'min': min(recent_values) if recent_values else 0
                }
        return summary
    
    def stop_monitoring(self):
        """Stop monitoring"""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()

# Usage example
monitor = KafkaPerformanceMonitor()
monitor.start_monitoring()

# Check metrics after 1 minute
time.sleep(60)
summary = monitor.get_metrics_summary()
print("Performance Summary:", summary)

monitor.stop_monitoring()
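
The monitor above only tracks host-level metrics. confluent-kafka clients can additionally report their own internal statistics (queue depth, throughput, per-broker latency) as a JSON payload through the statistics.interval.ms setting and a stats_cb callback; a minimal sketch (the on_stats handler and the fields it prints are illustrative):

import json
from confluent_kafka import Producer

def on_stats(stats_json_str):
    """Called by the client with librdkafka's statistics JSON."""
    stats = json.loads(stats_json_str)
    # msg_cnt is the number of messages currently sitting in the producer queue
    print(f"client={stats.get('name')} queued_msgs={stats.get('msg_cnt')}")

stats_producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'statistics.interval.ms': 5000,  # emit statistics every 5 seconds
    'stats_cb': on_stats,            # the callback fires from poll()/flush() calls
})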

2. Error Handling and Retry

import asyncio
import random
import time
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """Retry decorator"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise e
                    
                    # Exponential backoff with jitter
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)
                    print(f"Retry {attempt + 1}/{max_retries} in {delay:.2f}s: {e}")
                    await asyncio.sleep(delay + jitter)
            
            return None
        return wrapper
    return decorator

class ResilientKafkaProducer:
    def __init__(self, config):
        self.producer = ConfluentKafkaProducer(config)
        self.dead_letter_queue = []
    
    @retry_with_backoff(max_retries=3)
    async def produce_with_retry(self, topic, key, value):
        """Produce message with retry"""
        try:
            self.producer.produce_message(topic, key, value)
            return True
        except Exception as e:
            print(f"Failed to produce message: {e}")
            raise e
    
    async def produce_message(self, topic, key, value):
        """Produce message (with retry)"""
        try:
            await self.produce_with_retry(topic, key, value)
        except Exception as e:
            # Add to dead letter queue on final failure
            self.dead_letter_queue.append({
                'topic': topic,
                'key': key,
                'value': value,
                'error': str(e),
                'timestamp': time.time()
            })
            print(f"Message sent to dead letter queue: {e}")
    
    def get_dead_letter_queue(self):
        """Get dead letter queue"""
        return self.dead_letter_queue
    
    def retry_dead_letter_queue(self):
        """Retry dead letter queue"""
        retry_count = 0
        for item in self.dead_letter_queue[:]:
            try:
                self.producer.produce_message(
                    item['topic'], 
                    item['key'], 
                    item['value']
                )
                self.dead_letter_queue.remove(item)
                retry_count += 1
            except Exception as e:
                print(f"Failed to retry dead letter: {e}")
        
        print(f"Retried {retry_count} messages from dead letter queue")

📚 Learning Summary

What We Learned in This Post

  1. Choosing Python Kafka Libraries
    • Comparison of kafka-python, confluent-kafka, faust, aiokafka
    • Features and use cases of each library
  2. Basic Usage of kafka-python
    • Producer and consumer implementation
    • Batch processing and offset management
  3. High-Performance Processing with confluent-kafka
    • High-performance producer/consumer implementation
    • Async processing and callback utilization
  4. Faust Stream Processing
    • Stream processing and state stores
    • Window aggregation and join operations
  5. Hands-on: Python-based Streaming System
    • Real-time log processing system
    • Event-driven microservices
  6. Performance Optimization and Monitoring
    • Performance monitoring system
    • Error handling and retry strategies

Key Concepts Summary

| Concept | Description | Importance |
| --- | --- | --- |
| Library Selection | Choosing the right library for project requirements | ⭐⭐⭐⭐⭐ |
| Async Processing | High-performance async programming | ⭐⭐⭐⭐⭐ |
| Stream Processing | Real-time data transformation and aggregation | ⭐⭐⭐⭐⭐ |
| Monitoring | System status tracking and performance optimization | ⭐⭐⭐⭐ |

Practical Application Considerations

  1. Library Selection: Consider project requirements and performance needs
  2. Async Processing: Use asyncio for high-performance processing
  3. Error Handling: Implement retry strategies and dead letter queues
  4. Monitoring: Real-time metrics collection and alerting systems

With this guide, you can build high-performance streaming systems using Apache Kafka with Python! 🚀