Part 3: Apache Spark 실시간 스트리밍 처리와 Kafka 연동 - 실무 프로젝트
📚 Apache spark complete guide 시리즈
Part 4
⏱️ 50분
📊 고급
Part 3: Apache Spark 실시간 스트리밍 처리와 Kafka 연동 - 실무 프로젝트
Apache Spark Streaming, Structured Streaming, Kafka 연동을 통한 실시간 데이터 처리와 분석 시스템을 구축합니다.
📋 목차
- Spark Streaming 기초
- Structured Streaming 완전 정리
- Kafka 연동과 실시간 데이터 처리
- 워터마킹과 지연 데이터 처리
- 실무 프로젝트: 실시간 로그 분석 시스템
- 실시간 대시보드 구축
- 학습 요약
🔄 Spark Streaming 기초
Spark Streaming이란?
Spark Streaming은 Spark의 확장 모듈로, 마이크로 배치(Micro-batch) 방식으로 실시간 데이터를 처리합니다.
핵심 개념
- DStream (Discretized Stream): 연속적인 데이터 스트림을 작은 배치로 나눈 것
- 배치 간격 (Batch Interval): 각 배치를 처리하는 시간 간격
- 체크포인트 (Checkpoint): 장애 복구를 위한 상태 저장
DStream 기본 연산
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# StreamingContext 생성 (5초 배치 간격)
sc = SparkContext("local[2]", "StreamingExample")
ssc = StreamingContext(sc, 5) # 5초 배치 간격
# 텍스트 스트림 생성 (소켓 연결)
lines = ssc.socketTextStream("localhost", 9999)
# 기본 변환 연산
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
word_counts = pairs.reduceByKey(lambda x, y: x + y)
# 출력
word_counts.pprint()
# 스트리밍 시작
ssc.start()
ssc.awaitTermination()
DStream 고급 연산
# 윈도우 연산
windowed_counts = word_counts.reduceByKeyAndWindow(
lambda x, y: x + y, # reduce 함수
lambda x, y: x - y, # inverse reduce 함수
30, # 윈도우 길이 (30초)
10 # 슬라이딩 간격 (10초)
)
# 상태 유지 연산
def update_function(new_values, running_count):
if running_count is None:
running_count = 0
return sum(new_values, running_count)
running_counts = word_counts.updateStateByKey(update_function)
# 조인 연산
reference_data = sc.parallelize([("spark", "framework"), ("kafka", "broker")])
reference_dstream = ssc.queueStream([reference_data])
joined_stream = word_counts.transform(lambda rdd: rdd.join(reference_data))
# 출력
windowed_counts.pprint()
running_counts.pprint()
joined_stream.pprint()
📊 Structured Streaming 완전 정리
Structured Streaming이란?
Structured Streaming은 Spark SQL 엔진을 기반으로 한 고수준 스트리밍 API입니다.
핵심 특징
- 정확히 한 번 처리 (Exactly-once processing)
- 워터마킹 (Watermarking) 지원
- 이벤트 시간 (Event Time) 처리
- 구조화된 데이터 처리
기본 구조화된 스트리밍
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# SparkSession 생성
spark = SparkSession.builder \
.appName("StructuredStreamingExample") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# 스트리밍 데이터 읽기
streaming_df = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# 데이터 변환
words_df = streaming_df.select(
explode(split(streaming_df.value, " ")).alias("word")
)
# 집계
word_counts = words_df.groupBy("word").count()
# 스트리밍 쿼리 시작
query = word_counts \
.writeStream \
.outputMode("complete") \
.format("console") \
.trigger(processingTime="10 seconds") \
.start()
query.awaitTermination()
다양한 데이터 소스
# 1. Kafka 스트림
kafka_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \
.load()
# 2. 파일 스트림
file_df = spark \
.readStream \
.format("json") \
.option("path", "/path/to/streaming/data") \
.option("maxFilesPerTrigger", 1) \
.schema(schema) \
.load()
# 3. Rate 스트림 (테스트용)
rate_df = spark \
.readStream \
.format("rate") \
.option("rowsPerSecond", 100) \
.load()
고급 스트리밍 연산
# 이벤트 시간 처리
from pyspark.sql.types import TimestampType
# 스키마 정의
schema = StructType([
StructField("timestamp", TimestampType(), True),
StructField("user_id", StringType(), True),
StructField("action", StringType(), True),
StructField("value", DoubleType(), True)
])
# 스트리밍 데이터 읽기
events_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "events") \
.load() \
.select(
from_json(col("value").cast("string"), schema).alias("data")
) \
.select("data.*")
# 이벤트 시간 기반 윈도우 집계
windowed_events = events_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window("timestamp", "5 minutes", "1 minute"),
"user_id"
) \
.agg(
count("*").alias("event_count"),
sum("value").alias("total_value"),
avg("value").alias("avg_value")
)
# 출력
query = windowed_events \
.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", False) \
.start()
🔗 Kafka 연동과 실시간 데이터 처리
Kafka 설정과 연결
# Kafka 프로듀서 설정
from kafka import KafkaProducer
import json
import time
import random
def create_kafka_producer():
return KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'),
key_serializer=lambda x: x.encode('utf-8') if x else None
)
# 실시간 데이터 생성 및 전송
def generate_user_events():
producer = create_kafka_producer()
actions = ['login', 'logout', 'purchase', 'view', 'click']
user_ids = ['user_001', 'user_002', 'user_003', 'user_004', 'user_005']
for i in range(1000):
event = {
'timestamp': int(time.time() * 1000),
'user_id': random.choice(user_ids),
'action': random.choice(actions),
'value': random.uniform(1.0, 100.0),
'session_id': f'session_{i}',
'ip_address': f'192.168.1.{random.randint(1, 254)}'
}
producer.send('user-events', key=event['user_id'], value=event)
time.sleep(0.1) # 100ms 간격
producer.close()
# 데이터 생성 실행
# generate_user_events()
Spark에서 Kafka 데이터 읽기
# Kafka 스트림 읽기
kafka_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "earliest") \
.option("failOnDataLoss", "false") \
.load()
# JSON 파싱
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
event_schema = StructType([
StructField("timestamp", LongType(), True),
StructField("user_id", StringType(), True),
StructField("action", StringType(), True),
StructField("value", DoubleType(), True),
StructField("session_id", StringType(), True),
StructField("ip_address", StringType(), True)
])
parsed_df = kafka_df.select(
col("key").cast("string").alias("kafka_key"),
from_json(col("value").cast("string"), event_schema).alias("data")
).select("kafka_key", "data.*")
# 타임스탬프 변환
events_df = parsed_df.withColumn(
"event_time",
from_unixtime(col("timestamp") / 1000).cast(TimestampType())
)
실시간 데이터 분석
# 실시간 사용자 활동 분석
user_activity = events_df \
.withWatermark("event_time", "5 minutes") \
.groupBy(
window("event_time", "1 minute", "30 seconds"),
"user_id"
) \
.agg(
count("*").alias("total_events"),
countDistinct("action").alias("unique_actions"),
sum("value").alias("total_value"),
collect_list("action").alias("actions_sequence")
)
# 액션별 실시간 집계
action_metrics = events_df \
.withWatermark("event_time", "5 minutes") \
.groupBy(
window("event_time", "2 minutes", "1 minute"),
"action"
) \
.agg(
count("*").alias("action_count"),
countDistinct("user_id").alias("unique_users"),
avg("value").alias("avg_value"),
max("value").alias("max_value"),
min("value").alias("min_value")
)
# 이상 탐지 (실시간)
anomaly_detection = events_df \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "5 minutes", "2 minutes"),
"user_id"
) \
.agg(
count("*").alias("event_count"),
sum("value").alias("total_value")
) \
.filter(col("event_count") > 50) # 5분에 50개 이상 이벤트
# 출력 설정
user_activity_query = user_activity \
.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", False) \
.start()
action_metrics_query = action_metrics \
.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", False) \
.start()
anomaly_query = anomaly_detection \
.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", False) \
.start()
💧 워터마킹과 지연 데이터 처리
워터마킹이란?
워터마킹은 지연 데이터(late data)를 처리하기 위한 메커니즘입니다.
워터마킹 개념
- 이벤트 시간 (Event Time): 데이터가 실제로 발생한 시간
- 처리 시간 (Processing Time): 데이터가 시스템에서 처리되는 시간
- 워터마크: 지연 데이터를 받을 수 있는 최대 지연 시간
워터마킹 구현
# 워터마킹을 사용한 윈도우 집계
from pyspark.sql.functions import current_timestamp
# 워터마크 설정 (이벤트 시간으로부터 10분 지연 허용)
windowed_events_with_watermark = events_df \
.withWatermark("event_time", "10 minutes") \
.groupBy(
window("event_time", "5 minutes"),
"action"
) \
.agg(
count("*").alias("count"),
sum("value").alias("total_value")
)
# 지연 데이터 테스트
def test_late_data():
"""지연 데이터를 시뮬레이션하는 함수"""
import time
from datetime import datetime, timedelta
producer = create_kafka_producer()
# 정상 데이터
normal_event = {
'timestamp': int(time.time() * 1000),
'user_id': 'user_001',
'action': 'click',
'value': 10.0
}
# 지연 데이터 (5분 전 데이터)
late_event = {
'timestamp': int((time.time() - 300) * 1000), # 5분 전
'user_id': 'user_002',
'action': 'click',
'value': 20.0
}
# 매우 지연된 데이터 (15분 전 데이터)
very_late_event = {
'timestamp': int((time.time() - 900) * 1000), # 15분 전
'user_id': 'user_003',
'action': 'click',
'value': 30.0
}
# 데이터 전송
producer.send('user-events', key=normal_event['user_id'], value=normal_event)
producer.send('user-events', key=late_event['user_id'], value=late_event)
producer.send('user-events', key=very_late_event['user_id'], value=very_late_event)
producer.close()
# 워터마킹 쿼리 실행
watermark_query = windowed_events_with_watermark \
.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", False) \
.start()
지연 데이터 처리 전략
# 1. 적응형 워터마크
adaptive_watermark = events_df \
.withWatermark("event_time", "5 minutes") \
.groupBy(
window("event_time", "2 minutes"),
"user_id"
) \
.agg(
count("*").alias("event_count"),
max("event_time").alias("latest_event_time")
)
# 2. 지연 데이터 별도 처리
# 정상 데이터
normal_data = events_df \
.withWatermark("event_time", "5 minutes") \
.filter(col("event_time") >= current_timestamp() - expr("INTERVAL 10 MINUTES"))
# 지연 데이터
late_data = events_df \
.withWatermark("event_time", "5 minutes") \
.filter(col("event_time") < current_timestamp() - expr("INTERVAL 10 MINUTES"))
# 3. 지연 데이터 알림
late_data_alert = late_data \
.groupBy("user_id") \
.agg(
count("*").alias("late_event_count"),
min("event_time").alias("earliest_late_event")
) \
.filter(col("late_event_count") > 5)
# 지연 데이터 알림 쿼리
late_alert_query = late_data_alert \
.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", False) \
.start()
🛠 ️ 실무 프로젝트: 실시간 로그 분석 시스템
프로젝트 구조
real-time-log-analysis/
├── src/
│ ├── log_processor.py
│ ├── anomaly_detector.py
│ ├── metrics_calculator.py
│ └── main.py
├── config/
│ ├── kafka_config.py
│ └── streaming_config.yaml
├── docker/
│ ├── docker-compose.yml
│ └── kafka-setup.sh
├── monitoring/
│ ├── grafana-dashboard.json
│ └── prometheus-config.yml
└── README.md
1. 로그 프로세서
# src/log_processor.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re
import json
class LogProcessor:
def __init__(self, spark_session):
self.spark = spark_session
# 로그 패턴 정의
self.log_patterns = {
'apache': r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+)',
'nginx': r'^(\S+) - (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"',
'json': r'^{.*}$'
}
def parse_apache_log(self, log_line):
"""Apache 로그 파싱"""
match = re.match(self.log_patterns['apache'], log_line)
if match:
return {
'ip': match.group(1),
'identity': match.group(2),
'user': match.group(3),
'timestamp': match.group(4),
'method': match.group(5),
'url': match.group(6),
'protocol': match.group(7),
'status': int(match.group(8)),
'size': match.group(9)
}
return None
def parse_json_log(self, log_line):
"""JSON 로그 파싱"""
try:
return json.loads(log_line)
except:
return None
def create_log_schema(self):
"""로그 스키마 정의"""
return StructType([
StructField("timestamp", TimestampType(), True),
StructField("level", StringType(), True),
StructField("service", StringType(), True),
StructField("message", StringType(), True),
StructField("ip_address", StringType(), True),
StructField("user_id", StringType(), True),
StructField("request_id", StringType(), True),
StructField("response_time", DoubleType(), True),
StructField("status_code", IntegerType(), True),
StructField("error_code", StringType(), True)
])
def process_log_stream(self, kafka_df):
"""로그 스트림 처리"""
# JSON 파싱
parsed_df = kafka_df.select(
from_json(col("value").cast("string"), self.create_log_schema()).alias("data")
).select("data.*")
# 데이터 정제
cleaned_df = parsed_df \
.withColumn("timestamp", to_timestamp(col("timestamp"))) \
.withColumn("response_time", col("response_time").cast(DoubleType())) \
.withColumn("status_code", col("status_code").cast(IntegerType())) \
.filter(col("timestamp").isNotNull())
return cleaned_df
def extract_metrics(self, logs_df):
"""로그에서 메트릭 추출"""
# 응답 시간 분포
response_time_metrics = logs_df \
.withWatermark("timestamp", "5 minutes") \
.groupBy(
window("timestamp", "1 minute"),
"service"
) \
.agg(
count("*").alias("request_count"),
avg("response_time").alias("avg_response_time"),
max("response_time").alias("max_response_time"),
min("response_time").alias("min_response_time"),
stddev("response_time").alias("stddev_response_time")
)
# 에러율 계산
error_rate = logs_df \
.withWatermark("timestamp", "5 minutes") \
.groupBy(
window("timestamp", "1 minute"),
"service"
) \
.agg(
count("*").alias("total_requests"),
sum(when(col("status_code") >= 400, 1).otherwise(0)).alias("error_count")
) \
.withColumn("error_rate", col("error_count") / col("total_requests") * 100)
# IP별 요청 패턴
ip_patterns = logs_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window("timestamp", "5 minutes"),
"ip_address"
) \
.agg(
count("*").alias("request_count"),
countDistinct("user_id").alias("unique_users"),
collect_set("service").alias("services_used")
)
return response_time_metrics, error_rate, ip_patterns
2. 이상 탐지기
# src/anomaly_detector.py
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import numpy as np
class AnomalyDetector:
def __init__(self, spark_session):
self.spark = spark_session
def detect_response_time_anomalies(self, metrics_df):
"""응답 시간 이상 탐지"""
# 이동 평균과 표준편차 계산
window_spec = Window.partitionBy("service").orderBy("window")
anomaly_df = metrics_df \
.withColumn("avg_avg_response_time", avg("avg_response_time").over(
window_spec.rowsBetween(-10, -1)
)) \
.withColumn("stddev_avg_response_time", stddev("avg_response_time").over(
window_spec.rowsBetween(-10, -1)
)) \
.withColumn("z_score",
(col("avg_response_time") - col("avg_avg_response_time")) /
col("stddev_avg_response_time")
) \
.filter(
(col("z_score") > 2) | (col("z_score") < -2)
)
return anomaly_df
def detect_error_rate_spikes(self, error_rate_df):
"""에러율 급증 탐지"""
window_spec = Window.partitionBy("service").orderBy("window")
spike_df = error_rate_df \
.withColumn("prev_error_rate", lag("error_rate", 1).over(window_spec)) \
.withColumn("error_rate_change",
col("error_rate") - col("prev_error_rate")
) \
.filter(col("error_rate_change") > 10) # 10% 이상 증가
return spike_df
def detect_suspicious_ips(self, ip_patterns_df):
"""의심스러운 IP 탐지"""
# 높은 요청 빈도
high_frequency = ip_patterns_df.filter(col("request_count") > 1000)
# 많은 서비스 사용
multi_service = ip_patterns_df.filter(size(col("services_used")) > 5)
# 의심스러운 패턴 결합
suspicious_ips = high_frequency.intersect(multi_service)
return suspicious_ips
def detect_ddos_attacks(self, logs_df):
"""DDoS 공격 탐지"""
ddos_df = logs_df \
.withWatermark("timestamp", "5 minutes") \
.groupBy(
window("timestamp", "1 minute"),
"ip_address"
) \
.agg(
count("*").alias("request_count"),
countDistinct("user_id").alias("unique_users")
) \
.filter(
(col("request_count") > 100) & (col("unique_users") < 5)
)
return ddos_df
3. 메트릭 계산기
# src/metrics_calculator.py
from pyspark.sql.functions import *
from pyspark.sql.window import Window
class MetricsCalculator:
def __init__(self, spark_session):
self.spark = spark_session
def calculate_sla_metrics(self, logs_df):
"""SLA 메트릭 계산"""
sla_df = logs_df \
.withWatermark("timestamp", "5 minutes") \
.groupBy(
window("timestamp", "1 minute"),
"service"
) \
.agg(
count("*").alias("total_requests"),
sum(when(col("response_time") <= 1.0, 1).otherwise(0)).alias("fast_requests"),
sum(when(col("status_code") == 200, 1).otherwise(0)).alias("successful_requests")
) \
.withColumn("availability", col("successful_requests") / col("total_requests") * 100) \
.withColumn("performance", col("fast_requests") / col("total_requests") * 100)
return sla_df
def calculate_business_metrics(self, logs_df):
"""비즈니스 메트릭 계산"""
# 사용자별 활동
user_activity = logs_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window("timestamp", "5 minutes"),
"user_id"
) \
.agg(
count("*").alias("activity_count"),
countDistinct("service").alias("services_used"),
avg("response_time").alias("avg_response_time")
)
# 서비스별 인기도
service_popularity = logs_df \
.withWatermark("timestamp", "5 minutes") \
.groupBy(
window("timestamp", "1 minute"),
"service"
) \
.agg(
count("*").alias("request_count"),
countDistinct("user_id").alias("unique_users")
)
return user_activity, service_popularity
def calculate_system_health(self, logs_df):
"""시스템 건강도 계산"""
health_df = logs_df \
.withWatermark("timestamp", "5 minutes") \
.groupBy(
window("timestamp", "1 minute")
) \
.agg(
count("*").alias("total_requests"),
countDistinct("service").alias("active_services"),
countDistinct("user_id").alias("active_users"),
avg("response_time").alias("avg_response_time"),
sum(when(col("status_code") >= 400, 1).otherwise(0)).alias("error_count")
) \
.withColumn("error_rate", col("error_count") / col("total_requests") * 100) \
.withColumn("health_score",
when(col("error_rate") < 1, 100)
.when(col("error_rate") < 5, 80)
.when(col("error_rate") < 10, 60)
.otherwise(40)
)
return health_df
4. 메인 애플리케이션
# src/main.py
import os
import logging
from pyspark.sql import SparkSession
from log_processor import LogProcessor
from anomaly_detector import AnomalyDetector
from metrics_calculator import MetricsCalculator
def setup_logging():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
def main():
setup_logging()
logger = logging.getLogger(__name__)
try:
# Spark 세션 생성
spark = SparkSession.builder \
.appName("RealTimeLogAnalysis") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.streaming.checkpointLocation", "/tmp/checkpoint") \
.getOrCreate()
logger.info("Spark session created successfully")
# 컴포넌트 초기화
log_processor = LogProcessor(spark)
anomaly_detector = AnomalyDetector(spark)
metrics_calculator = MetricsCalculator(spark)
# Kafka 스트림 읽기
kafka_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "logs") \
.option("startingOffsets", "latest") \
.load()
# 로그 처리
processed_logs = log_processor.process_log_stream(kafka_df)
# 메트릭 계산
response_metrics, error_rate, ip_patterns = log_processor.extract_metrics(processed_logs)
sla_metrics = metrics_calculator.calculate_sla_metrics(processed_logs)
user_activity, service_popularity = metrics_calculator.calculate_business_metrics(processed_logs)
system_health = metrics_calculator.calculate_system_health(processed_logs)
# 이상 탐지
response_anomalies = anomaly_detector.detect_response_time_anomalies(response_metrics)
error_spikes = anomaly_detector.detect_error_rate_spikes(error_rate)
suspicious_ips = anomaly_detector.detect_suspicious_ips(ip_patterns)
ddos_attacks = anomaly_detector.detect_ddos_attacks(processed_logs)
# 쿼리 시작
queries = []
# 메트릭 출력
queries.append(
response_metrics.writeStream
.outputMode("append")
.format("console")
.option("truncate", False)
.start()
)
queries.append(
error_rate.writeStream
.outputMode("append")
.format("console")
.option("truncate", False)
.start()
)
queries.append(
system_health.writeStream
.outputMode("append")
.format("console")
.option("truncate", False)
.start()
)
# 이상 탐지 알림
queries.append(
response_anomalies.writeStream
.outputMode("append")
.format("console")
.option("truncate", False)
.start()
)
queries.append(
error_spikes.writeStream
.outputMode("append")
.format("console")
.option("truncate", False)
.start()
)
queries.append(
suspicious_ips.writeStream
.outputMode("append")
.format("console")
.option("truncate", False)
.start()
)
# 모든 쿼리 대기
for query in queries:
query.awaitTermination()
except Exception as e:
logger.error(f"Application failed with error: {str(e)}")
raise
finally:
if 'spark' in locals():
spark.stop()
if __name__ == "__main__":
main()
📊 실시간 대시보드 구축
Grafana 대시보드 설정
{
"dashboard": {
"title": "Real-time Log Analysis Dashboard",
"panels": [
{
"title": "System Health Score",
"type": "stat",
"targets": [
{
"expr": "avg(health_score)",
"legendFormat": "Health Score"
}
]
},
{
"title": "Response Time Trends",
"type": "graph",
"targets": [
{
"expr": "avg(avg_response_time) by (service)",
"legendFormat": ""
}
]
},
{
"title": "Error Rate",
"type": "graph",
"targets": [
{
"expr": "avg(error_rate) by (service)",
"legendFormat": ""
}
]
},
{
"title": "Request Volume",
"type": "graph",
"targets": [
{
"expr": "sum(request_count)",
"legendFormat": "Total Requests"
}
]
},
{
"title": "Anomaly Alerts",
"type": "table",
"targets": [
{
"expr": "anomaly_count",
"legendFormat": "Anomalies"
}
]
}
]
}
}
Prometheus 메트릭 내보내기
# 메트릭 내보내기 함수
def export_metrics_to_prometheus(metrics_df):
"""Prometheus로 메트릭 내보내기"""
import requests
import time
while True:
# 최신 메트릭 수집
latest_metrics = metrics_df.collect()
for metric in latest_metrics:
# Prometheus 메트릭 형식으로 변환
prometheus_metric = {
'metric_name': 'spark_streaming_metric',
'labels': {
'service': metric['service'],
'window': str(metric['window'])
},
'value': metric['avg_response_time'],
'timestamp': int(time.time() * 1000)
}
# Prometheus Pushgateway로 전송
requests.post(
'http://localhost:9091/metrics/job/spark_streaming',
data=prometheus_metric
)
time.sleep(10) # 10초마다 전송
⚡ 실시간 분석 지연시간 최적화
지연시간 분석 도구
# 실시간 지연시간 분석 및 최적화 도구
class StreamingLatencyOptimizer:
def __init__(self, spark_session):
self.spark = spark_session
def analyze_processing_latency(self, streaming_df, latency_threshold_ms=1000):
"""처리 지연시간 분석"""
from pyspark.sql.functions import col, current_timestamp, unix_timestamp, when
# 이벤트 시간과 처리 시간 사이의 지연 계산
latency_df = streaming_df.withColumn(
"processing_latency_ms",
(unix_timestamp(current_timestamp()) - unix_timestamp("timestamp")) * 1000
)
# 지연시간 통계 계산
latency_stats = latency_df.select(
col("processing_latency_ms"),
when(col("processing_latency_ms") > latency_threshold_ms, 1).otherwise(0).alias("is_slow")
).agg({
"processing_latency_ms": "avg",
"processing_latency_ms": "max",
"processing_latency_ms": "min",
"is_slow": "sum"
}).collect()[0]
total_records = latency_df.count()
return {
'avg_latency_ms': latency_stats[0],
'max_latency_ms': latency_stats[1],
'min_latency_ms': latency_stats[2],
'slow_records_count': latency_stats[3],
'slow_records_ratio': latency_stats[3] / total_records if total_records > 0 else 0,
'total_records': total_records,
'latency_threshold_ms': latency_threshold_ms
}
def optimize_streaming_configuration(self, current_config):
"""스트리밍 설정 최적화"""
optimized_config = current_config.copy()
# 배치 간격 최적화 (지연시간에 따라 조정)
if current_config.get('batch_interval_seconds', 10) > 5:
optimized_config['batch_interval_seconds'] = 1 # 1초로 단축
# 백프레셔 설정 최적화
optimized_config.update({
'spark.sql.streaming.rateLimit.enabled': 'true',
'spark.sql.streaming.rateLimit.maxOffsetsPerTrigger': '10000',
'spark.sql.streaming.rateLimit.maxRecordsPerSecond': '5000'
})
# 체크포인트 최적화
optimized_config.update({
'spark.sql.streaming.checkpointLocation': '/tmp/optimized_checkpoint',
'spark.sql.streaming.minBatchesToRetain': '1', # 최소 배치 유지
'spark.sql.streaming.stateStore.providerClass': 'org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider'
})
# 메모리 최적화
optimized_config.update({
'spark.sql.streaming.stateStore.maintenanceInterval': '10s', # 상태 정리 간격 단축
'spark.sql.streaming.stateStore.minDeltasForSnapshot': '10' # 스냅샷 생성 빈도 증가
})
return optimized_config
def implement_adaptive_batching(self, streaming_df, target_latency_ms=500):
"""적응형 배칭 구현"""
from pyspark.sql.functions import col, window, count, max as spark_max
# 윈도우 크기를 동적으로 조정하는 함수
def create_adaptive_window(df, base_window_size="10 seconds"):
# 현재 시스템 부하에 따라 윈도우 크기 조정
current_load = self._get_system_load()
if current_load > 0.8: # 높은 부하
window_size = "30 seconds"
elif current_load > 0.5: # 중간 부하
window_size = "20 seconds"
else: # 낮은 부하
window_size = "10 seconds"
return df.withWatermark("timestamp", window_size)
return create_adaptive_window(streaming_df)
def _get_system_load(self):
"""시스템 부하 측정"""
status_tracker = self.spark.sparkContext.statusTracker()
executor_infos = status_tracker.getExecutorInfos()
if not executor_infos:
return 0.0
total_memory = sum(info.maxMemory for info in executor_infos)
used_memory = sum(info.memoryUsed for info in executor_infos)
return used_memory / total_memory if total_memory > 0 else 0.0
def optimize_watermark_strategy(self, streaming_df, data_lateness_hours=2):
"""워터마크 전략 최적화"""
from pyspark.sql.functions import col, window, count, max as spark_max
# 데이터 지연 패턴에 따른 적응형 워터마크
watermark_delay = f"{data_lateness_hours} hours"
# 지연시간이 긴 데이터를 위한 추가 처리
optimized_df = streaming_df.withWatermark("timestamp", watermark_delay)
return optimized_df
def implement_latency_monitoring(self, streaming_df):
"""지연시간 모니터링 구현"""
from pyspark.sql.functions import col, current_timestamp, unix_timestamp, window, count, avg
# 실시간 지연시간 메트릭 생성
latency_metrics = streaming_df.withColumn(
"processing_latency_ms",
(unix_timestamp(current_timestamp()) - unix_timestamp("timestamp")) * 1000
).withWatermark("timestamp", "10 minutes").groupBy(
window("timestamp", "1 minute"),
"service"
).agg(
count("*").alias("record_count"),
avg("processing_latency_ms").alias("avg_latency_ms")
)
return latency_metrics
# 지연시간 최적화 예제
def streaming_latency_optimization_example():
spark = SparkSession.builder.appName("StreamingLatencyOptimization").getOrCreate()
optimizer = StreamingLatencyOptimizer(spark)
# 스트리밍 데이터 생성 (Kafka에서 읽는 것으로 가정)
streaming_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "logs") \
.load()
# JSON 파싱
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
schema = StructType([
StructField("timestamp", TimestampType(), True),
StructField("service", StringType(), True),
StructField("message", StringType(), True)
])
parsed_df = streaming_df.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# 지연시간 분석
latency_analysis = optimizer.analyze_processing_latency(parsed_df)
print("=== Streaming Latency Analysis ===")
print(f"Average Latency: {latency_analysis['avg_latency_ms']:.2f}ms")
print(f"Max Latency: {latency_analysis['max_latency_ms']:.2f}ms")
print(f"Slow Records Ratio: {latency_analysis['slow_records_ratio']:.2%}")
# 설정 최적화
current_config = {
'batch_interval_seconds': 10,
'checkpoint_location': '/tmp/checkpoint'
}
optimized_config = optimizer.optimize_streaming_configuration(current_config)
print("\n=== Optimized Configuration ===")
for key, value in optimized_config.items():
print(f"{key}: {value}")
# 지연시간 모니터링 설정
latency_metrics = optimizer.implement_latency_monitoring(parsed_df)
# 스트리밍 쿼리 시작
query = latency_metrics.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", False) \
.start()
return query
고급 지연시간 최적화 기법
# 고급 지연시간 최적화 클래스
class AdvancedLatencyOptimizer:
def __init__(self, spark_session):
self.spark = spark_session
def implement_parallel_processing(self, streaming_df, parallelism_factor=2):
"""병렬 처리 구현"""
# 파티션 수를 증가시켜 병렬 처리 향상
current_partitions = streaming_df.rdd.getNumPartitions()
optimized_partitions = current_partitions * parallelism_factor
return streaming_df.repartition(optimized_partitions)
def optimize_memory_usage_for_low_latency(self, spark_session):
"""저지연을 위한 메모리 최적화"""
# 저지연 처리를 위한 메모리 설정
memory_configs = {
'spark.sql.streaming.stateStore.maintenanceInterval': '5s',
'spark.sql.streaming.stateStore.minDeltasForSnapshot': '5',
'spark.sql.streaming.stateStore.compression.enabled': 'true',
'spark.sql.streaming.stateStore.compression.codec': 'lz4',
'spark.sql.streaming.stateStore.rocksdb.compression': 'true',
'spark.sql.streaming.stateStore.rocksdb.blockSizeKB': '64',
'spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB': '128'
}
for key, value in memory_configs.items():
spark_session.conf.set(key, value)
return memory_configs
def implement_backpressure_control(self, streaming_df):
"""백프레셔 제어 구현"""
from pyspark.sql.functions import col, lag, when
# 데이터 처리 속도를 모니터링하고 조절
backpressure_df = streaming_df.withColumn(
"processing_rate",
col("timestamp").cast("long") - lag(col("timestamp").cast("long"), 1).over(
window(col("timestamp"), "1 minute")
)
).withColumn(
"should_throttle",
when(col("processing_rate") > 1000, True).otherwise(False) # 1초당 1000개 이상이면 스로틀링
)
return backpressure_df
def optimize_serialization_for_low_latency(self, spark_session):
"""저지연을 위한 직렬화 최적화"""
serialization_configs = {
'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
'spark.sql.execution.arrow.pyspark.enabled': 'true',
'spark.sql.execution.arrow.maxRecordsPerBatch': '10000',
'spark.sql.execution.arrow.pyspark.fallback.enabled': 'true'
}
for key, value in serialization_configs.items():
spark_session.conf.set(key, value)
return serialization_configs
def implement_caching_for_streaming(self, streaming_df):
"""스트리밍을 위한 캐싱 전략"""
from pyspark import StorageLevel
# 자주 사용되는 데이터를 메모리에 캐싱
cached_df = streaming_df.persist(StorageLevel.MEMORY_AND_DISK_SER)
return cached_df
# 고급 지연시간 최적화 예제
def advanced_latency_optimization_example():
spark = SparkSession.builder.appName("AdvancedLatencyOptimization").getOrCreate()
optimizer = AdvancedLatencyOptimizer(spark)
# 스트리밍 데이터 생성
data = [(f"service_{i % 5}", f"message_{i}", time.time()) for i in range(1000)]
df = spark.createDataFrame(data, ["service", "message", "timestamp"])
# 병렬 처리 최적화
parallel_df = optimizer.implement_parallel_processing(df)
print(f"Parallel processing partitions: {parallel_df.rdd.getNumPartitions()}")
# 메모리 최적화
memory_configs = optimizer.optimize_memory_usage_for_low_latency(spark)
print("\n=== Memory Optimization Configs ===")
for key, value in memory_configs.items():
print(f"{key}: {value}")
# 직렬화 최적화
serialization_configs = optimizer.optimize_serialization_for_low_latency(spark)
print("\n=== Serialization Optimization Configs ===")
for key, value in serialization_configs.items():
print(f"{key}: {value}")
# 캐싱 전략 적용
cached_df = optimizer.implement_caching_for_streaming(df)
return cached_df
실시간 지연시간 모니터링 대시보드
# 실시간 지연시간 모니터링 대시보드
class RealTimeLatencyDashboard:
def __init__(self, spark_session):
self.spark = spark_session
def create_latency_monitoring_dashboard(self, streaming_df):
"""지연시간 모니터링 대시보드 생성"""
from pyspark.sql.functions import col, current_timestamp, unix_timestamp, window, count, avg, max as spark_max, min as spark_min
# 실시간 지연시간 메트릭 계산
latency_metrics = streaming_df.withColumn(
"processing_latency_ms",
(unix_timestamp(current_timestamp()) - unix_timestamp("timestamp")) * 1000
).withWatermark("timestamp", "5 minutes").groupBy(
window("timestamp", "30 seconds"),
"service"
).agg(
count("*").alias("record_count"),
avg("processing_latency_ms").alias("avg_latency_ms"),
spark_max("processing_latency_ms").alias("max_latency_ms"),
spark_min("processing_latency_ms").alias("min_latency_ms")
)
# 지연시간 알림 생성
alerts = latency_metrics.withColumn(
"alert_level",
when(col("avg_latency_ms") > 2000, "CRITICAL")
.when(col("avg_latency_ms") > 1000, "WARNING")
.when(col("avg_latency_ms") > 500, "INFO")
.otherwise("OK")
)
return {
'metrics': latency_metrics,
'alerts': alerts
}
def export_latency_metrics_to_grafana(self, metrics_df):
"""Grafana용 지연시간 메트릭 내보내기"""
import requests
import json
import time
def export_to_grafana():
while True:
try:
# 최신 메트릭 수집
latest_metrics = metrics_df.collect()
for metric in latest_metrics:
# Grafana 데이터 포인트 형식
grafana_data = {
"time": int(time.time() * 1000),
"value": metric['avg_latency_ms'],
"tags": {
"service": metric['service'],
"window": str(metric['window'])
}
}
# Grafana API로 전송
response = requests.post(
'http://localhost:3000/api/datasources/proxy/1/write',
json=grafana_data,
headers={'Content-Type': 'application/json'}
)
if response.status_code == 200:
print(f"Latency metric exported: {metric['service']} - {metric['avg_latency_ms']:.2f}ms")
else:
print(f"Failed to export metric: {response.status_code}")
time.sleep(30) # 30초마다 전송
except Exception as e:
print(f"Error exporting metrics: {e}")
time.sleep(60) # 에러 시 1분 대기
return export_to_grafana
def create_latency_alerting_system(self, alerts_df):
"""지연시간 알림 시스템 생성"""
def process_alerts():
while True:
try:
# 최신 알림 수집
latest_alerts = alerts_df.collect()
for alert in latest_alerts:
if alert['alert_level'] in ['WARNING', 'CRITICAL']:
# 알림 메시지 생성
alert_message = {
'level': alert['alert_level'],
'service': alert['service'],
'avg_latency_ms': alert['avg_latency_ms'],
'max_latency_ms': alert['max_latency_ms'],
'timestamp': str(alert['window']),
'message': f"Service {alert['service']} has high latency: {alert['avg_latency_ms']:.2f}ms"
}
# Slack, 이메일, SMS 등으로 알림 전송
self._send_alert(alert_message)
time.sleep(60) # 1분마다 알림 체크
except Exception as e:
print(f"Error processing alerts: {e}")
time.sleep(120) # 에러 시 2분 대기
return process_alerts
def _send_alert(self, alert_message):
"""알림 전송 (Slack 예시)"""
import requests
slack_webhook_url = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
slack_message = {
"text": f"🚨 Spark Streaming Latency Alert",
"attachments": [
{
"color": "danger" if alert_message['level'] == 'CRITICAL' else "warning",
"fields": [
{"title": "Service", "value": alert_message['service'], "short": True},
{"title": "Average Latency", "value": f"{alert_message['avg_latency_ms']:.2f}ms", "short": True},
{"title": "Max Latency", "value": f"{alert_message['max_latency_ms']:.2f}ms", "short": True},
{"title": "Alert Level", "value": alert_message['level'], "short": True}
]
}
]
}
try:
response = requests.post(slack_webhook_url, json=slack_message)
if response.status_code == 200:
print(f"Alert sent successfully: {alert_message['message']}")
else:
print(f"Failed to send alert: {response.status_code}")
except Exception as e:
print(f"Error sending alert: {e}")
# 실시간 지연시간 모니터링 예제
def real_time_latency_monitoring_example():
spark = SparkSession.builder.appName("RealTimeLatencyMonitoring").getOrCreate()
dashboard = RealTimeLatencyDashboard(spark)
# 스트리밍 데이터 생성
data = [(f"service_{i % 3}", f"message_{i}", time.time()) for i in range(1000)]
df = spark.createDataFrame(data, ["service", "message", "timestamp"])
# 지연시간 모니터링 대시보드 생성
dashboard_data = dashboard.create_latency_monitoring_dashboard(df)
# 메트릭 내보내기 함수
export_function = dashboard.export_latency_metrics_to_grafana(dashboard_data['metrics'])
# 알림 처리 함수
alert_function = dashboard.create_latency_alerting_system(dashboard_data['alerts'])
print("=== Real-time Latency Monitoring Dashboard Created ===")
print("Metrics and alerts are being processed in real-time")
# 실제 환경에서는 별도 스레드에서 실행
# import threading
# threading.Thread(target=export_function, daemon=True).start()
# threading.Thread(target=alert_function, daemon=True).start()
return dashboard_data
📚 학습 요약
이번 파트에서 학습한 내용
- Spark Streaming 기초
- DStream과 마이크로 배치 처리
- 기본 변환 연산과 윈도우 연산
- 상태 유지 연산
- Structured Streaming
- 고수준 스트리밍 API
- 이벤트 시간 처리
- 다양한 데이터 소스
- Kafka 연동
- Kafka 프로듀서/컨슈머 설정
- 실시간 데이터 생성과 처리
- JSON 파싱과 스키마 처리
- 워터마킹과 지연 데이터
- 워터마크 메커니즘 이해
- 지연 데이터 처리 전략
- 적응형 워터마크
- 실무 프로젝트
- 실시간 로그 분석 시스템
- 이상 탐지와 알림
- 메트릭 계산과 모니터링
- 실시간 대시보드
- Grafana 대시보드 구축
- Prometheus 메트릭 내보내기
- 실시간 모니터링
- 실시간 분석 지연시간 최적화
- 지연시간 분석 도구
- 고급 지연시간 최적화 기법
- 실시간 지연시간 모니터링 대시보드
핵심 기술 스택
기술 | 용도 | 중요도 |
---|---|---|
Spark Streaming | 마이크로 배치 처리 | ⭐⭐⭐⭐ |
Structured Streaming | 고수준 스트리밍 | ⭐⭐⭐⭐⭐ |
Kafka | 메시지 브로커 | ⭐⭐⭐⭐⭐ |
워터마킹 | 지연 데이터 처리 | ⭐⭐⭐⭐ |
Grafana | 실시간 대시보드 | ⭐⭐⭐⭐ |
다음 파트 미리보기
Part 4: 모니터링과 성능 튜닝에서는 다음 내용을 다룹니다:
- Spark UI와 메트릭 분석
- 성능 모니터링과 프로파일링
- 메모리 최적화와 캐싱 전략
- 클러스터 튜닝과 확장성
다음 파트: Part 4: 모니터링과 성능 튜닝
이제 실시간 스트리밍 처리까지 마스터했습니다! 마지막 파트에서는 성능 튜닝과 모니터링으로 완성도를 높이겠습니다. 🚀