Apache Kafka Python 가이드: 실시간 스트리밍과 데이터 처리
⏱️ 25분
📊 고급
Apache Kafka Python 가이드: 실시간 스트리밍과 데이터 처리
Python을 활용한 Apache Kafka 실시간 스트리밍 개발과 데이터 처리 기법을 학습하고 실제 프로젝트에 적용해봅니다.
📋 목차
- Python Kafka 라이브러리 선택
- kafka-python 기본 사용법
- confluent-kafka 고성능 처리
- Faust 스트리밍 처리
- 실습: Python 기반 스트리밍 시스템
- 성능 최적화와 모니터링
- 학습 요약
🐍 Python Kafka 라이브러리 선택
주요 라이브러리 비교
| 라이브러리 | 특징 | 장점 | 단점 | 사용 사례 |
|---|---|---|---|---|
| kafka-python | 순수 Python 구현 | 설치 간단, 이해하기 쉬움 | 성능 제한 | 학습, 프로토타입 |
| confluent-kafka | C 라이브러리 래핑 | 고성능, 안정성 | 설치 복잡 | 프로덕션 환경 |
| faust | 스트리밍 전용 | 간단한 스트리밍 | 제한적 기능 | 스트리밍 처리 |
| aiokafka | 비동기 처리 | 비동기 지원 | 복잡성 | 고성능 비동기 |
설치 방법
# kafka-python 설치
pip install kafka-python
# confluent-kafka 설치 (권장)
pip install confluent-kafka
# faust 설치
pip install faust[rocksdb]
# aiokafka 설치
pip install aiokafka
# 추가 의존성
pip install pandas numpy asyncio
⚡ kafka-python 기본 사용법
1. 프로듀서 구현
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import time
import logging
# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class PythonKafkaProducer:
def __init__(self, bootstrap_servers='localhost:9092'):
self.producer = KafkaProducer(
bootstrap_servers=[bootstrap_servers],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
# 성능 최적화 설정
batch_size=16384, # 16KB
linger_ms=5, # 5ms 대기
compression_type='snappy', # 압축
acks='all', # 모든 복제본 확인
retries=3, # 재시도 횟수
retry_backoff_ms=100, # 재시도 간격
# 안전성 설정
enable_idempotence=True, # 멱등성 보장
max_in_flight_requests_per_connection=5,
request_timeout_ms=30000,
)
def send_message(self, topic, key, value):
"""메시지 전송"""
try:
future = self.producer.send(topic, key=key, value=value)
record_metadata = future.get(timeout=10)
logger.info(f"Message sent to {record_metadata.topic} "
f"partition {record_metadata.partition} "
f"offset {record_metadata.offset}")
return True
except KafkaError as e:
logger.error(f"Failed to send message: {e}")
return False
def send_batch(self, topic, messages):
"""배치 메시지 전송"""
futures = []
for key, value in messages:
future = self.producer.send(topic, key=key, value=value)
futures.append(future)
# 모든 메시지 전송 완료 대기
for future in futures:
try:
future.get(timeout=10)
except KafkaError as e:
logger.error(f"Batch send failed: {e}")
return False
return True
def close(self):
"""프로듀서 종료"""
self.producer.flush() # 남은 메시지 전송
self.producer.close()
# 사용 예제
if __name__ == "__main__":
producer = PythonKafkaProducer()
# 단일 메시지 전송
producer.send_message(
topic="user-events",
key="user123",
value={"action": "login", "timestamp": time.time()}
)
# 배치 메시지 전송
messages = [
("user123", {"action": "view", "page": "/home"}),
("user456", {"action": "purchase", "amount": 99.99}),
("user789", {"action": "logout", "timestamp": time.time()})
]
producer.send_batch("user-events", messages)
producer.close()
2. 컨슈머 구현
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
from typing import Dict, Any, Callable
class PythonKafkaConsumer:
def __init__(self, bootstrap_servers='localhost:9092', group_id='python-consumer'):
self.consumer = KafkaConsumer(
bootstrap_servers=[bootstrap_servers],
group_id=group_id,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
key_deserializer=lambda m: m.decode('utf-8') if m else None,
# 오프셋 관리
auto_offset_reset='earliest', # earliest, latest, none
enable_auto_commit=False, # 수동 오프셋 커밋
# 성능 설정
max_poll_records=500, # 한 번에 처리할 레코드 수
session_timeout_ms=30000, # 세션 타임아웃
heartbeat_interval_ms=3000, # 하트비트 간격
# 재시도 설정
retry_backoff_ms=100,
request_timeout_ms=30000,
)
self.message_handler = None
def subscribe(self, topics):
"""토픽 구독"""
self.consumer.subscribe(topics)
logger.info(f"Subscribed to topics: {topics}")
def set_message_handler(self, handler: Callable[[str, Any], None]):
"""메시지 처리 핸들러 설정"""
self.message_handler = handler
def start_consuming(self):
"""메시지 소비 시작"""
try:
while True:
message_batch = self.consumer.poll(timeout_ms=1000)
for topic_partition, messages in message_batch.items():
for message in messages:
self._process_message(message)
# 오프셋 커밋
self.consumer.commit()
except KeyboardInterrupt:
logger.info("Consumer stopped by user")
except Exception as e:
logger.error(f"Consumer error: {e}")
finally:
self.consumer.close()
def _process_message(self, message):
"""메시지 처리"""
try:
logger.info(f"Received message: {message.key} -> {message.value}")
if self.message_handler:
self.message_handler(message.key, message.value)
else:
# 기본 처리
self._default_handler(message.key, message.value)
except Exception as e:
logger.error(f"Error processing message: {e}")
def _default_handler(self, key, value):
"""기본 메시지 핸들러"""
print(f"Key: {key}, Value: {value}")
# 사용 예제
def custom_message_handler(key, value):
"""커스텀 메시지 핸들러"""
print(f"Processing: {key} -> {value}")
# 여기에 실제 처리 로직 구현
if __name__ == "__main__":
consumer = PythonKafkaConsumer()
consumer.subscribe(['user-events', 'order-events'])
consumer.set_message_handler(custom_message_handler)
consumer.start_consuming()
🚀 confluent-kafka 고성능 처리
1. 고성능 프로듀서
from confluent_kafka import Producer, Consumer, KafkaError
import json
import time
import threading
from collections import defaultdict
class ConfluentKafkaProducer:
def __init__(self, config):
self.producer = Producer(config)
self.delivery_callback_count = 0
self.delivery_callback_errors = 0
def delivery_callback(self, err, msg):
"""전송 결과 콜백"""
if err is not None:
self.delivery_callback_errors += 1
print(f'Message delivery failed: {err}')
else:
self.delivery_callback_count += 1
print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
def produce_message(self, topic, key, value):
"""메시지 전송"""
try:
self.producer.produce(
topic,
key=key,
value=json.dumps(value).encode('utf-8'),
callback=self.delivery_callback
)
# 비동기 전송이므로 주기적으로 flush 필요
self.producer.poll(0)
except BufferError as e:
print(f'Producer queue is full: {e}')
self.producer.flush()
raise
def flush(self):
"""남은 메시지 전송"""
self.producer.flush()
# 설정 예제
producer_config = {
'bootstrap.servers': 'localhost:9092',
'compression.type': 'snappy',
'batch.size': 16384,
'linger.ms': 5,
'acks': 'all',
'retries': 3,
'enable.idempotence': True,
'max.in.flight.requests.per.connection': 5,
}
producer = ConfluentKafkaProducer(producer_config)
2. 고성능 컨슈머
class ConfluentKafkaConsumer:
def __init__(self, config):
self.consumer = Consumer(config)
self.running = True
def subscribe(self, topics):
"""토픽 구독"""
self.consumer.subscribe(topics)
def consume_messages(self, timeout=1.0):
"""메시지 소비"""
try:
while self.running:
msg = self.consumer.poll(timeout=timeout)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# 파티션 끝에 도달
continue
else:
print(f"Consumer error: {msg.error()}")
continue
# 메시지 처리
self.process_message(msg)
except KeyboardInterrupt:
print("Consumer interrupted by user")
finally:
self.close()
def process_message(self, msg):
"""메시지 처리"""
try:
key = msg.key().decode('utf-8') if msg.key() else None
value = json.loads(msg.value().decode('utf-8'))
print(f"Received message: {key} -> {value}")
# 여기에 실제 처리 로직 구현
except Exception as e:
print(f"Error processing message: {e}")
def close(self):
"""컨슈머 종료"""
self.running = False
self.consumer.close()
# 설정 예제
consumer_config = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'python-consumer-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'max.poll.records': 500,
'session.timeout.ms': 30000,
'heartbeat.interval.ms': 3000,
}
consumer = ConfluentKafkaConsumer(consumer_config)
consumer.subscribe(['user-events'])
consumer.consume_messages()
🌊 Faust 스트리밍 처리
1. Faust 기본 설정
import faust
from faust import Record
import asyncio
from typing import Optional
# Faust 앱 설정
app = faust.App(
'kafka-streaming-app',
broker='kafka://localhost:9092',
store='rocksdb://', # 상태 저장소
value_serializer='json',
)
# 데이터 모델 정의
class UserEvent(Record):
user_id: str
action: str
timestamp: float
metadata: Optional[dict] = None
class OrderEvent(Record):
order_id: str
user_id: str
amount: float
items: list
timestamp: float
# 토픽 정의
user_events_topic = app.topic('user-events', value_type=UserEvent)
order_events_topic = app.topic('order-events', value_type=OrderEvent)
processed_events_topic = app.topic('processed-events', value_type=UserEvent)
# 테이블 정의 (상태 저장소)
user_activity_table = app.Table('user-activity', default=int)
order_summary_table = app.Table('order-summary', default=dict)
2. 스트림 처리 예제
@app.agent(user_events_topic)
async def process_user_events(events):
"""사용자 이벤트 처리"""
async for event in events:
# 이벤트 처리
print(f"Processing user event: {event.user_id} - {event.action}")
# 상태 업데이트
user_activity_table[event.user_id] += 1
# 이벤트 변환
processed_event = UserEvent(
user_id=event.user_id,
action=f"processed_{event.action}",
timestamp=event.timestamp,
metadata={"processed_by": "faust"}
)
# 결과 전송
await processed_events_topic.send(value=processed_event)
@app.agent(order_events_topic)
async def process_order_events(orders):
"""주문 이벤트 처리"""
async for order in orders:
print(f"Processing order: {order.order_id} - ${order.amount}")
# 주문 요약 업데이트
if order.user_id not in order_summary_table:
order_summary_table[order.user_id] = {
'total_orders': 0,
'total_amount': 0.0,
'last_order_time': 0.0
}
summary = order_summary_table[order.user_id]
summary['total_orders'] += 1
summary['total_amount'] += order.amount
summary['last_order_time'] = order.timestamp
# 주문 금액별 분류
if order.amount > 100:
await app.topic('high-value-orders').send(value=order)
else:
await app.topic('regular-orders').send(value=order)
# 윈도우 집계
@app.timer(interval=60.0) # 1분마다 실행
async def aggregate_metrics():
"""메트릭 집계"""
print("=== User Activity Summary ===")
for user_id, count in user_activity_table.items():
print(f"User {user_id}: {count} events")
print("=== Order Summary ===")
for user_id, summary in order_summary_table.items():
print(f"User {user_id}: {summary['total_orders']} orders, ${summary['total_amount']:.2f}")
if __name__ == '__main__':
app.main()
3. 고급 스트림 처리
from faust import Stream, Table
from faust.types import StreamT
# 스트림 조인
@app.agent(user_events_topic)
async def join_user_orders(events):
"""사용자 이벤트와 주문 조인"""
async for event in events:
# 사용자 주문 정보 조회
order_summary = order_summary_table.get(event.user_id, {})
# 조인된 데이터 처리
enriched_event = {
'user_id': event.user_id,
'action': event.action,
'timestamp': event.timestamp,
'user_orders': order_summary.get('total_orders', 0),
'user_spent': order_summary.get('total_amount', 0.0)
}
print(f"Enriched event: {enriched_event}")
# 윈도우 집계
@app.agent(user_events_topic)
async def window_aggregation(events):
"""시간 윈도우 집계"""
async for event in events:
# 5분 윈도우로 집계
window = event.timestamp // 300 # 5분 = 300초
# 윈도우별 카운트
window_key = f"window_{window}"
user_activity_table[window_key] += 1
# 윈도우가 완료되면 결과 전송
if window_key not in processed_windows:
processed_windows.add(window_key)
await app.topic('window-results').send(
value={'window': window, 'count': user_activity_table[window_key]}
)
processed_windows = set()
🛠 ️ 실습: Python 기반 스트리밍 시스템
1. 실시간 로그 처리 시스템
import asyncio
import json
import time
import logging
from datetime import datetime
from typing import Dict, Any
import pandas as pd
class LogProcessingSystem:
def __init__(self):
self.producer = ConfluentKafkaProducer(producer_config)
self.consumer = ConfluentKafkaConsumer(consumer_config)
self.log_stats = defaultdict(int)
async def start_log_collector(self):
"""로그 수집기 시작"""
while True:
# 로그 파일에서 읽기 (실제로는 파일 모니터링)
log_line = self.simulate_log_line()
if log_line:
self.producer.produce_message('raw-logs', None, log_line)
await asyncio.sleep(0.1) # 100ms 간격
def simulate_log_line(self):
"""로그 라인 시뮬레이션"""
import random
levels = ['INFO', 'WARN', 'ERROR', 'DEBUG']
actions = ['login', 'logout', 'purchase', 'view', 'search']
return {
'timestamp': datetime.now().isoformat(),
'level': random.choice(levels),
'message': f"User {random.randint(1, 1000)} performed {random.choice(actions)}",
'ip': f"192.168.1.{random.randint(1, 255)}",
'user_agent': 'Mozilla/5.0...'
}
async def start_log_processor(self):
"""로그 처리기 시작"""
self.consumer.subscribe(['raw-logs'])
while True:
try:
msg = self.consumer.consume_messages(timeout=1.0)
if msg:
await self.process_log_message(msg)
except Exception as e:
logging.error(f"Error processing log: {e}")
async def process_log_message(self, log_data):
"""로그 메시지 처리"""
try:
# 로그 레벨별 통계
level = log_data.get('level', 'UNKNOWN')
self.log_stats[level] += 1
# 에러 로그 특별 처리
if level == 'ERROR':
await self.handle_error_log(log_data)
# 사용자 행동 분석
if 'performed' in log_data.get('message', ''):
await self.analyze_user_behavior(log_data)
# 메트릭 업데이트
await self.update_metrics(log_data)
except Exception as e:
logging.error(f"Error processing log message: {e}")
async def handle_error_log(self, log_data):
"""에러 로그 처리"""
print(f"ERROR detected: {log_data['message']}")
# 에러 알림 전송
await self.send_alert('ERROR', log_data)
async def analyze_user_behavior(self, log_data):
"""사용자 행동 분석"""
message = log_data.get('message', '')
if 'login' in message:
print("User login detected")
elif 'purchase' in message:
print("Purchase activity detected")
async def send_alert(self, level, log_data):
"""알림 전송"""
alert = {
'type': 'log_alert',
'level': level,
'message': log_data['message'],
'timestamp': log_data['timestamp']
}
self.producer.produce_message('alerts', level, alert)
async def update_metrics(self, log_data):
"""메트릭 업데이트"""
metrics = {
'timestamp': time.time(),
'level_counts': dict(self.log_stats),
'total_logs': sum(self.log_stats.values())
}
self.producer.produce_message('log-metrics', 'metrics', metrics)
async def start_system(self):
"""시스템 시작"""
tasks = [
asyncio.create_task(self.start_log_collector()),
asyncio.create_task(self.start_log_processor())
]
try:
await asyncio.gather(*tasks)
except KeyboardInterrupt:
print("System stopped by user")
finally:
self.producer.flush()
self.consumer.close()
# 실행
if __name__ == "__main__":
system = LogProcessingSystem()
asyncio.run(system.start_system())
2. 이벤트 기반 마이크로서비스
class EventDrivenMicroservice:
def __init__(self, service_name):
self.service_name = service_name
self.consumer = ConfluentKafkaConsumer(consumer_config)
self.producer = ConfluentKafkaProducer(producer_config)
self.running = True
async def start_service(self):
"""서비스 시작"""
topics = ['user-events', 'order-events', 'payment-events']
self.consumer.subscribe(topics)
print(f"{self.service_name} started, listening to {topics}")
while self.running:
try:
msg = self.consumer.consume_messages(timeout=1.0)
if msg:
await self.handle_event(msg)
except Exception as e:
logging.error(f"Error in {self.service_name}: {e}")
async def handle_event(self, event_data):
"""이벤트 처리"""
event_type = event_data.get('type')
if event_type == 'user_event':
await self.handle_user_event(event_data)
elif event_type == 'order_event':
await self.handle_order_event(event_data)
elif event_type == 'payment_event':
await self.handle_payment_event(event_data)
async def handle_user_event(self, event):
"""사용자 이벤트 처리"""
print(f"Processing user event: {event}")
# 사용자 이벤트 처리 로직
await self.send_event('user-processed', event)
async def handle_order_event(self, event):
"""주문 이벤트 처리"""
print(f"Processing order event: {event}")
# 주문 이벤트 처리 로직
await self.send_event('order-processed', event)
async def handle_payment_event(self, event):
"""결제 이벤트 처리"""
print(f"Processing payment event: {event}")
# 결제 이벤트 처리 로직
await self.send_event('payment-processed', event)
async def send_event(self, topic, event_data):
"""이벤트 전송"""
self.producer.produce_message(topic, event_data.get('id'), event_data)
def stop(self):
"""서비스 중지"""
self.running = False
self.consumer.close()
self.producer.flush()
# 서비스 실행
if __name__ == "__main__":
service = EventDrivenMicroservice("event-processor")
asyncio.run(service.start_service())
📊 성능 최적화와 모니터링
1. 성능 모니터링
import psutil
import time
from collections import defaultdict
import threading
class KafkaPerformanceMonitor:
def __init__(self):
self.metrics = defaultdict(list)
self.running = True
self.monitor_thread = None
def start_monitoring(self):
"""모니터링 시작"""
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.start()
def _monitor_loop(self):
"""모니터링 루프"""
while self.running:
# 시스템 메트릭 수집
cpu_percent = psutil.cpu_percent()
memory_percent = psutil.virtual_memory().percent
disk_io = psutil.disk_io_counters()
network_io = psutil.net_io_counters()
# 메트릭 저장
timestamp = time.time()
self.metrics['cpu'].append((timestamp, cpu_percent))
self.metrics['memory'].append((timestamp, memory_percent))
self.metrics['disk_read'].append((timestamp, disk_io.read_bytes))
self.metrics['disk_write'].append((timestamp, disk_io.write_bytes))
self.metrics['network_sent'].append((timestamp, network_io.bytes_sent))
self.metrics['network_recv'].append((timestamp, network_io.bytes_recv))
# 오래된 메트릭 정리 (1시간 이상)
cutoff_time = timestamp - 3600
for metric_name in self.metrics:
self.metrics[metric_name] = [
(t, v) for t, v in self.metrics[metric_name] if t > cutoff_time
]
time.sleep(10) # 10초마다 수집
def get_metrics_summary(self):
"""메트릭 요약 반환"""
summary = {}
for metric_name, values in self.metrics.items():
if values:
recent_values = [v for t, v in values[-10:]] # 최근 10개
summary[metric_name] = {
'current': recent_values[-1] if recent_values else 0,
'average': sum(recent_values) / len(recent_values) if recent_values else 0,
'max': max(recent_values) if recent_values else 0,
'min': min(recent_values) if recent_values else 0
}
return summary
def stop_monitoring(self):
"""모니터링 중지"""
self.running = False
if self.monitor_thread:
self.monitor_thread.join()
# 사용 예제
monitor = KafkaPerformanceMonitor()
monitor.start_monitoring()
# 1분 후 메트릭 확인
time.sleep(60)
summary = monitor.get_metrics_summary()
print("Performance Summary:", summary)
monitor.stop_monitoring()
2. 에러 처리와 재시도
import asyncio
from functools import wraps
import random
def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
"""재시도 데코레이터"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise e
# 지수 백오프
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.1)
await asyncio.sleep(delay + jitter)
print(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
return None
return wrapper
return decorator
class ResilientKafkaProducer:
def __init__(self, config):
self.producer = ConfluentKafkaProducer(config)
self.dead_letter_queue = []
@retry_with_backoff(max_retries=3)
async def produce_with_retry(self, topic, key, value):
"""재시도가 있는 메시지 전송"""
try:
self.producer.produce_message(topic, key, value)
return True
except Exception as e:
print(f"Failed to produce message: {e}")
raise e
async def produce_message(self, topic, key, value):
"""메시지 전송 (재시도 포함)"""
try:
await self.produce_with_retry(topic, key, value)
except Exception as e:
# 최종 실패 시 데드 레터 큐에 추가
self.dead_letter_queue.append({
'topic': topic,
'key': key,
'value': value,
'error': str(e),
'timestamp': time.time()
})
print(f"Message sent to dead letter queue: {e}")
def get_dead_letter_queue(self):
"""데드 레터 큐 반환"""
return self.dead_letter_queue
def retry_dead_letter_queue(self):
"""데드 레터 큐 재시도"""
retry_count = 0
for item in self.dead_letter_queue[:]:
try:
self.producer.produce_message(
item['topic'],
item['key'],
item['value']
)
self.dead_letter_queue.remove(item)
retry_count += 1
except Exception as e:
print(f"Failed to retry dead letter: {e}")
print(f"Retried {retry_count} messages from dead letter queue")
📚 학습 요약
이번 포스트에서 학습한 내용
- Python Kafka 라이브러리 선택
- kafka-python, confluent-kafka, faust, aiokafka 비교
- 각 라이브러리의 특징과 사용 사례
- kafka-python 기본 사용법
- 프로듀서와 컨슈머 구현
- 배치 처리와 오프셋 관리
- confluent-kafka 고성능 처리
- 고성능 프로듀서/컨슈머 구현
- 비동기 처리와 콜백 활용
- Faust 스트리밍 처리
- 스트림 처리와 상태 저장소
- 윈도우 집계와 조인 연산
- 실무급 스트리밍 시스템 구축
- 실시간 로그 처리 시스템
- 이벤트 기반 마이크로서비스
- 성능 최적화와 모니터링
- 성능 모니터링 시스템
- 에러 처리와 재시도 전략
핵심 개념 정리
| 개념 | 설명 | 중요도 |
|---|---|---|
| 라이브러리 선택 | 프로젝트 요구사항에 맞는 라이브러리 선택 | ⭐⭐⭐⭐⭐ |
| 비동기 처리 | 고성능을 위한 비동기 프로그래밍 | ⭐⭐⭐⭐⭐ |
| 스트리밍 처리 | 실시간 데이터 변환과 집계 | ⭐⭐⭐⭐⭐ |
| 모니터링 | 시스템 상태 추적과 성능 최적화 | ⭐⭐⭐⭐ |
실무 적용 시 고려사항
- 라이브러리 선택: 프로젝트 요구사항과 성능 요구사항 고려
- 비동기 처리: asyncio를 활용한 고성능 처리
- 에러 처리: 재시도 전략과 데드 레터 큐 활용
- 모니터링: 실시간 메트릭 수집과 알림 시스템
이 가이드를 통해 Python으로 Apache Kafka를 활용한 고성능 스트리밍 시스템을 구축할 수 있습니다! 🚀