Part 4: Apache Spark 모니터링과 성능 튜닝 - 프로덕션 환경 완성
📚 Apache Spark Complete Guide 시리즈
Part 4
⏱️ 55분
📊 전문가
Apache Spark의 성능 모니터링, 프로파일링, 메모리 최적화, 클러스터 튜닝을 통한 프로덕션 환경 구축을 완성합니다.
📋 목차
- 🖥️ Spark UI와 메트릭 분석
- 📊 성능 모니터링과 프로파일링
- 💾 메모리 최적화와 캐싱 전략
- 🔍 고급 성능 분석 도구
- 🚨 실무 성능 문제 진단 및 해결
- 🤖 모니터링 자동화
- 📚 학습 요약
🖥️ Spark UI와 메트릭 분석
Spark UI 개요
Spark UI는 Spark 애플리케이션의 성능을 모니터링하고 분석하는 웹 기반 인터페이스입니다.
핵심 탭과 기능
- Jobs 탭
- 작업 실행 상태와 시간
- 스테이지별 실행 정보
- 작업 실패 원인 분석
- Stages 탭
- 각 스테이지의 상세 정보
- 태스크 실행 분포
- 데이터 스큐(Data Skew) 분석
- Storage 탭
- 캐시된 데이터 정보
- 메모리 사용량
- 디스크 저장 상태
- Environment 탭
- Spark 설정 정보
- 시스템 환경 변수
- JVM 설정
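Spark UI는 기본적으로 드라이버의 4040 포트에서 제공되며, 애플리케이션이 종료된 뒤에도 보려면 이벤트 로그를 남겨 두어야 합니다. 아래는 이를 위한 최소한의 설정 스케치이며, 이벤트 로그 경로(/tmp/spark-events)는 예시이므로 환경에 맞게 바꿔야 합니다.
# Spark UI / 이벤트 로그 기본 설정 예시
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("UIConfigExample")
    .config("spark.ui.port", "4040")              # 기본 포트, 이미 사용 중이면 4041, 4042...로 증가
    .config("spark.eventLog.enabled", "true")     # 이벤트 로그 기록 (History Server 분석용)
    .config("spark.eventLog.dir", "/tmp/spark-events")  # 예시 경로
    .getOrCreate()
)
print(f"Spark UI 주소: {spark.sparkContext.uiWebUrl}")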
메트릭 분석 도구
# Spark 메트릭 수집 및 분석
from pyspark.sql import SparkSession
import json
import time
class SparkMetricsCollector:
def __init__(self, spark_session):
self.spark = spark_session
self.sc = spark_session.sparkContext
def collect_job_metrics(self):
"""작업 메트릭 수집"""
status_tracker = self.sc.statusTracker()
# 실행 중인 작업 정보
        active_jobs = status_tracker.getActiveJobsIds()
# 작업별 메트릭 수집
job_metrics = {}
        for job_id in active_jobs:
            job_info = status_tracker.getJobInfo(job_id)
            if job_info is None:  # 이미 정리된 작업은 건너뜀
                continue
            # SparkJobInfo는 jobId, stageIds, status만 제공한다
            # (태스크 단위 숫자는 아래 collect_stage_metrics()에서 스테이지별로 수집)
            job_metrics[job_id] = {
                'status': job_info.status,
                'stage_ids': list(job_info.stageIds)
            }
return job_metrics
def collect_stage_metrics(self):
"""스테이지 메트릭 수집"""
status_tracker = self.sc.statusTracker()
# 활성 스테이지 정보
active_stages = status_tracker.getActiveStageIds()
stage_metrics = {}
        for stage_id in active_stages:
            stage_info = status_tracker.getStageInfo(stage_id)
            if stage_info is None:
                continue
            # SparkStageInfo가 제공하는 필드만 사용한다
            # (executorRunTime, 입출력/셔플 바이트 등 상세 메트릭은 /api/v1 REST API에서 조회)
            stage_metrics[stage_id] = {
                'name': stage_info.name,
                'current_attempt_id': stage_info.currentAttemptId,
                'num_tasks': stage_info.numTasks,
                'num_active_tasks': stage_info.numActiveTasks,
                'num_completed_tasks': stage_info.numCompletedTasks,
                'num_failed_tasks': stage_info.numFailedTasks
            }
return stage_metrics
def collect_executor_metrics(self):
"""실행자 메트릭 수집"""
status_tracker = self.sc.statusTracker()
executor_infos = status_tracker.getExecutorInfos()
executor_metrics = {}
for executor_info in executor_infos:
executor_metrics[executor_info.executorId] = {
'host': executor_info.host,
'total_cores': executor_info.totalCores,
'max_memory': executor_info.maxMemory,
'memory_used': executor_info.memoryUsed,
'disk_used': executor_info.diskUsed,
'active_tasks': executor_info.activeTasks,
'completed_tasks': executor_info.completedTasks,
'failed_tasks': executor_info.failedTasks,
'total_duration': executor_info.totalDuration,
'total_gc_time': executor_info.totalGCTime
}
return executor_metrics
# 사용 예제
def monitor_spark_application():
spark = SparkSession.builder.appName("MetricsExample").getOrCreate()
collector = SparkMetricsCollector(spark)
# 주기적으로 메트릭 수집
while True:
job_metrics = collector.collect_job_metrics()
stage_metrics = collector.collect_stage_metrics()
executor_metrics = collector.collect_executor_metrics()
print("=== Job Metrics ===")
print(json.dumps(job_metrics, indent=2))
print("=== Stage Metrics ===")
print(json.dumps(stage_metrics, indent=2))
print("=== Executor Metrics ===")
print(json.dumps(executor_metrics, indent=2))
time.sleep(10) # 10초마다 수집
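위의 monitor_spark_application() 루프는 statusTracker가 "현재 실행 중인" 작업만 보여 주기 때문에, 실제로는 모니터링 대상 작업을 실행하는 스레드와 분리해서 돌려야 의미 있는 값이 나옵니다. 아래는 이를 가정한 간단한 스케치이며, run_with_monitoring이라는 함수 이름과 수집 주기는 예시입니다.
# 작업을 실행하면서 백그라운드 스레드로 메트릭을 수집하는 스케치
import threading
def run_with_monitoring(spark, work_fn, interval_sec=5):
    collector = SparkMetricsCollector(spark)
    stop_event = threading.Event()
    def sampler():
        while not stop_event.is_set():
            print(json.dumps(collector.collect_job_metrics(), indent=2))
            print(json.dumps(collector.collect_stage_metrics(), indent=2))
            stop_event.wait(interval_sec)  # interval_sec마다 수집
    sampler_thread = threading.Thread(target=sampler, daemon=True)
    sampler_thread.start()
    try:
        return work_fn()  # 모니터링 대상 작업 실행
    finally:
        stop_event.set()
        sampler_thread.join()
# 사용 예: 집계 작업을 모니터링하면서 실행
# result = run_with_monitoring(spark, lambda: df.groupBy("name").count().collect())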
📊 성능 모니터링과 프로파일링
성능 프로파일링 도구
# 성능 프로파일링 클래스
class SparkProfiler:
def __init__(self, spark_session):
self.spark = spark_session
self.sc = spark_session.sparkContext
def profile_query_execution(self, query_name, df):
"""쿼리 실행 프로파일링"""
import time
start_time = time.time()
        # 실행 계획 출력 (explain()은 계획을 콘솔에 출력하고 None을 반환한다)
        df.explain(True)
        # 쿼리 실행
        result = df.collect()
        end_time = time.time()
        execution_time = end_time - start_time
        # 실행자 메트릭 수집 (Spark UI REST API 사용)
        import requests
        base_url = f"{self.sc.uiWebUrl}/api/v1/applications/{self.sc.applicationId}"
        executor_infos = requests.get(f"{base_url}/executors").json()
        total_memory_used = sum(info['memoryUsed'] for info in executor_infos)
        total_gc_time = sum(info['totalGCTime'] for info in executor_infos)
        profile_result = {
            'query_name': query_name,
            'execution_time': execution_time,
            'total_memory_used': total_memory_used,
            'total_gc_time': total_gc_time,
            'result_count': len(result)
}
return profile_result
def analyze_data_skew(self, df, key_columns):
"""데이터 스큐 분석"""
# 키별 데이터 분포 분석
key_counts = df.groupBy(*key_columns).count()
# 통계 정보 수집
stats = key_counts.select(
count("*").alias("total_keys"),
min("count").alias("min_count"),
max("count").alias("max_count"),
avg("count").alias("avg_count"),
stddev("count").alias("stddev_count")
).collect()[0]
# 스큐 비율 계산
skew_ratio = stats['max_count'] / stats['avg_count'] if stats['avg_count'] > 0 else 0
skew_analysis = {
'total_keys': stats['total_keys'],
'min_count': stats['min_count'],
'max_count': stats['max_count'],
'avg_count': stats['avg_count'],
'stddev_count': stats['stddev_count'],
'skew_ratio': skew_ratio,
'is_skewed': skew_ratio > 2.0 # 스큐 임계값
}
return skew_analysis
def monitor_memory_usage(self):
"""메모리 사용량 모니터링"""
status_tracker = self.sc.statusTracker()
executor_infos = status_tracker.getExecutorInfos()
memory_stats = {
'total_executors': len(executor_infos),
'total_memory_used': sum(info.memoryUsed for info in executor_infos),
'total_max_memory': sum(info.maxMemory for info in executor_infos),
'memory_utilization': 0,
'executor_details': []
}
for info in executor_infos:
executor_detail = {
'executor_id': info.executorId,
'host': info.host,
'memory_used': info.memoryUsed,
'max_memory': info.maxMemory,
'utilization': info.memoryUsed / info.maxMemory if info.maxMemory > 0 else 0
}
memory_stats['executor_details'].append(executor_detail)
if memory_stats['total_max_memory'] > 0:
memory_stats['memory_utilization'] = (
memory_stats['total_memory_used'] / memory_stats['total_max_memory']
)
return memory_stats
# 성능 분석 도구 사용 예제
def performance_analysis_example():
spark = SparkSession.builder.appName("PerformanceAnalysis").getOrCreate()
profiler = SparkProfiler(spark)
# 샘플 데이터 생성
data = [(i, f"user_{i}", i * 10) for i in range(10000)]
df = spark.createDataFrame(data, ["id", "name", "value"])
    # 쿼리 프로파일링 (pyspark.sql.functions.sum을 별칭으로 임포트해 내장 sum과 구분)
    from pyspark.sql.functions import sum as spark_sum
    profile_result = profiler.profile_query_execution(
        "sample_aggregation",
        df.groupBy("name").agg(spark_sum("value").alias("total_value"))
    )
print("Query Profile Result:")
print(json.dumps(profile_result, indent=2))
# 데이터 스큐 분석
skew_analysis = profiler.analyze_data_skew(df, ["name"])
print("\nData Skew Analysis:")
print(json.dumps(skew_analysis, indent=2))
# 메모리 사용량 모니터링
memory_stats = profiler.monitor_memory_usage()
print("\nMemory Usage Stats:")
print(json.dumps(memory_stats, indent=2))
성능 최적화 가이드라인
# 성능 최적화 체크리스트
class PerformanceOptimizer:
def __init__(self, spark_session):
self.spark = spark_session
def check_partitioning(self, df):
"""파티셔닝 최적화 체크"""
num_partitions = df.rdd.getNumPartitions()
        # 파티션별 행 수 확인 (바이트 크기가 아니라 행 수 기준의 근사치)
partition_sizes = df.rdd.mapPartitions(lambda x: [len(list(x))]).collect()
avg_size = sum(partition_sizes) / len(partition_sizes)
max_size = max(partition_sizes)
min_size = min(partition_sizes)
# 파티션 불균형 확인
imbalance_ratio = max_size / avg_size if avg_size > 0 else 0
recommendations = []
if num_partitions < 2:
recommendations.append("파티션 수가 너무 적습니다. 최소 2개 이상 권장")
if imbalance_ratio > 2.0:
recommendations.append(f"파티션 불균형이 심합니다 (비율: {imbalance_ratio:.2f}). repartition() 고려")
        if avg_size > 1_000_000:  # 파티션당 평균 행 수 기준 (위 값은 행 수이므로 바이트 크기와 혼동하지 말 것)
            recommendations.append("파티션당 행 수가 너무 많습니다. 더 작은 파티션으로 분할 권장")
return {
'num_partitions': num_partitions,
'avg_size': avg_size,
'max_size': max_size,
'min_size': min_size,
'imbalance_ratio': imbalance_ratio,
'recommendations': recommendations
}
def check_caching_strategy(self, df, usage_count=1):
"""캐싱 전략 체크"""
storage_level = df.storageLevel
recommendations = []
        if usage_count > 1 and not df.is_cached:
            recommendations.append("데이터를 여러 번 사용하므로 캐싱을 고려하세요")
if storage_level.useDisk and usage_count < 3:
recommendations.append("디스크 캐싱은 메모리 캐싱보다 느립니다. 사용 빈도를 고려하세요")
return {
'storage_level': str(storage_level),
'usage_count': usage_count,
'recommendations': recommendations
}
def analyze_query_plan(self, df):
"""쿼리 계획 분석"""
plan = df.explain(True)
recommendations = []
# 브로드캐스트 조인 확인
if "BroadcastHashJoin" in plan:
recommendations.append("브로드캐스트 조인이 사용되었습니다. 작은 테이블 크기를 확인하세요")
# 셔플 확인
if "Exchange" in plan:
recommendations.append("셔플이 발생합니다. 파티셔닝 키 최적화를 고려하세요")
# 스캔 확인
if "FileScan" in plan:
recommendations.append("파일 스캔이 발생합니다. 파티셔닝이나 인덱싱을 고려하세요")
return {
'execution_plan': plan,
'recommendations': recommendations
}
# 성능 최적화 예제
def optimization_example():
spark = SparkSession.builder.appName("OptimizationExample").getOrCreate()
optimizer = PerformanceOptimizer(spark)
# 샘플 데이터
data = [(i, f"category_{i % 10}", i * 100) for i in range(100000)]
df = spark.createDataFrame(data, ["id", "category", "value"])
# 파티셔닝 체크
partition_analysis = optimizer.check_partitioning(df)
print("Partitioning Analysis:")
print(json.dumps(partition_analysis, indent=2))
# 캐싱 전략 체크
caching_analysis = optimizer.check_caching_strategy(df, usage_count=3)
print("\nCaching Strategy Analysis:")
print(json.dumps(caching_analysis, indent=2))
    # 쿼리 계획 분석
    from pyspark.sql.functions import sum as spark_sum
    query_analysis = optimizer.analyze_query_plan(
        df.groupBy("category").agg(spark_sum("value").alias("total"))
    )
print("\nQuery Plan Analysis:")
print(json.dumps(query_analysis, indent=2))
💾 메모리 최적화와 캐싱 전략
메모리 관리 최적화
# 메모리 최적화 도구
class MemoryOptimizer:
def __init__(self, spark_session):
self.spark = spark_session
def optimize_dataframe_memory(self, df):
"""DataFrame 메모리 최적화"""
        from pyspark.sql.types import IntegerType, FloatType
        from pyspark.sql.functions import col, min as spark_min, max as spark_max
        optimized_df = df
        # 컬럼 타입 최적화
        for field in df.schema.fields:
            field_name = field.name
            field_type = field.dataType
            if isinstance(field_type, IntegerType):
                # 값의 범위가 작다면 더 작은 정수 타입으로 변경
                min_val, max_val = df.select(
                    spark_min(col(field_name)), spark_max(col(field_name))
                ).collect()[0]
                if min_val is not None and -128 <= min_val and max_val <= 127:
                    optimized_df = optimized_df.withColumn(
                        field_name, col(field_name).cast("tinyint")
                    )
                elif min_val is not None and -32768 <= min_val and max_val <= 32767:
                    optimized_df = optimized_df.withColumn(
                        field_name, col(field_name).cast("smallint")
                    )
            elif isinstance(field_type, FloatType):
                # 정밀도가 중요하다면 Double로 변경 (메모리는 2배가 되므로 주의)
                optimized_df = optimized_df.withColumn(
                    field_name, col(field_name).cast("double")
                )
return optimized_df
def calculate_memory_usage(self, df):
"""메모리 사용량 계산"""
# DataFrame 크기 추정
num_rows = df.count()
num_cols = len(df.columns)
# 컬럼별 타입 크기 추정
type_sizes = {
'string': 50, # 평균 문자열 길이
'int': 4,
'bigint': 8,
'double': 8,
'float': 4,
'boolean': 1,
'date': 8,
'timestamp': 8
}
estimated_size = 0
        for field in df.schema.fields:
            # simpleString()은 'int', 'bigint', 'string'처럼 type_sizes의 키와 맞는 이름을 돌려준다
            base_type = field.dataType.simpleString().split('(')[0].lower()
if base_type in type_sizes:
estimated_size += type_sizes[base_type]
else:
estimated_size += 8 # 기본값
        total_estimated_size = num_rows * estimated_size  # estimated_size는 이미 행당(모든 컬럼 합) 바이트 수
return {
'num_rows': num_rows,
'num_cols': num_cols,
'estimated_size_bytes': total_estimated_size,
'estimated_size_mb': total_estimated_size / (1024 * 1024)
}
def optimize_caching_strategy(self, df, access_pattern):
"""캐싱 전략 최적화"""
from pyspark import StorageLevel
memory_stats = self.calculate_memory_usage(df)
size_mb = memory_stats['estimated_size_mb']
recommendations = []
if access_pattern['frequency'] == 'high' and size_mb < 1000:
recommendations.append({
'strategy': 'MEMORY_ONLY',
'reason': '자주 사용되고 크기가 작은 데이터'
})
elif access_pattern['frequency'] == 'high' and size_mb >= 1000:
recommendations.append({
                'strategy': 'MEMORY_AND_DISK',
                'reason': '자주 사용되지만 크기가 큰 데이터 (PySpark는 항상 직렬화해 저장)'
})
elif access_pattern['frequency'] == 'medium':
recommendations.append({
'strategy': 'DISK_ONLY',
'reason': '중간 빈도 사용 데이터'
})
else:
recommendations.append({
'strategy': 'NO_CACHING',
'reason': '낮은 빈도 사용 데이터'
})
return {
'data_size_mb': size_mb,
'access_frequency': access_pattern['frequency'],
'recommendations': recommendations
}
# 메모리 최적화 예제
def memory_optimization_example():
spark = SparkSession.builder.appName("MemoryOptimization").getOrCreate()
optimizer = MemoryOptimizer(spark)
# 샘플 데이터 생성
data = [(i, f"user_{i}", i * 1.5, i % 2 == 0) for i in range(100000)]
df = spark.createDataFrame(data, ["id", "name", "score", "is_active"])
# 메모리 최적화
optimized_df = optimizer.optimize_dataframe_memory(df)
# 메모리 사용량 계산
original_memory = optimizer.calculate_memory_usage(df)
optimized_memory = optimizer.calculate_memory_usage(optimized_df)
print("Original Memory Usage:")
print(json.dumps(original_memory, indent=2))
print("\nOptimized Memory Usage:")
print(json.dumps(optimized_memory, indent=2))
# 캐싱 전략 최적화
access_pattern = {'frequency': 'high'}
caching_strategy = optimizer.optimize_caching_strategy(df, access_pattern)
print("\nCaching Strategy:")
print(json.dumps(caching_strategy, indent=2))
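DataFrame 수준의 최적화와는 별개로, 실행자(executor) 메모리 설정 자체도 중요한 튜닝 대상입니다. 아래는 대표적인 메모리 관련 설정을 보여 주는 스케치이며, 구체적인 값(4g, 1g 등)은 예시일 뿐 워크로드에 맞게 조정해야 합니다. 이 설정들은 애플리케이션을 새로 시작할 때만 적용됩니다.
# 실행자 메모리 관련 대표 설정 예시 (값은 예시)
spark = (
    SparkSession.builder
    .appName("MemoryConfigExample")
    .config("spark.executor.memory", "4g")            # 실행자 JVM 힙 크기
    .config("spark.executor.memoryOverhead", "1g")    # 힙 외 오버헤드 (Python 워커 등)
    .config("spark.memory.fraction", "0.6")           # 실행+스토리지 메모리 비율 (기본값 0.6)
    .config("spark.memory.storageFraction", "0.5")    # 그중 스토리지(캐시) 보호 비율 (기본값 0.5)
    .getOrCreate()
)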
고급 캐싱 전략
# 고급 캐싱 관리자
class AdvancedCacheManager:
def __init__(self, spark_session):
self.spark = spark_session
self.cached_tables = {}
def smart_cache(self, df, table_name, access_pattern):
"""지능형 캐싱"""
from pyspark import StorageLevel
# 데이터 크기 확인
num_partitions = df.rdd.getNumPartitions()
# 접근 패턴에 따른 캐싱 전략 결정
if access_pattern['frequency'] == 'high':
if access_pattern['latency_requirement'] == 'low':
df.cache() # MEMORY_ONLY
storage_level = "MEMORY_ONLY"
else:
                # PySpark StorageLevel에는 _SER 상수가 없다 (항상 직렬화 저장) -> MEMORY_AND_DISK 사용
                df.persist(StorageLevel.MEMORY_AND_DISK)
                storage_level = "MEMORY_AND_DISK"
else:
df.persist(StorageLevel.DISK_ONLY)
storage_level = "DISK_ONLY"
# 캐시 정보 저장
self.cached_tables[table_name] = {
'dataframe': df,
'storage_level': storage_level,
'access_count': 0,
'last_accessed': time.time()
}
return df
def monitor_cache_efficiency(self):
"""캐시 효율성 모니터링"""
status_tracker = self.spark.sparkContext.statusTracker()
executor_infos = status_tracker.getExecutorInfos()
cache_stats = {
'total_cached_data': 0,
'memory_cached': 0,
'disk_cached': 0,
'cache_hit_ratio': 0
}
for info in executor_infos:
cache_stats['total_cached_data'] += info.memoryUsed
cache_stats['memory_cached'] += info.memoryUsed
return cache_stats
def cleanup_unused_cache(self, max_age_hours=24):
"""사용하지 않는 캐시 정리"""
current_time = time.time()
max_age_seconds = max_age_hours * 3600
tables_to_remove = []
for table_name, cache_info in self.cached_tables.items():
if current_time - cache_info['last_accessed'] > max_age_seconds:
cache_info['dataframe'].unpersist()
tables_to_remove.append(table_name)
for table_name in tables_to_remove:
del self.cached_tables[table_name]
return len(tables_to_remove)
# 고급 캐싱 예제
def advanced_caching_example():
spark = SparkSession.builder.appName("AdvancedCaching").getOrCreate()
cache_manager = AdvancedCacheManager(spark)
# 샘플 데이터
data = [(i, f"category_{i % 5}", i * 10) for i in range(10000)]
df = spark.createDataFrame(data, ["id", "category", "value"])
# 지능형 캐싱
access_pattern = {
'frequency': 'high',
'latency_requirement': 'low'
}
cached_df = cache_manager.smart_cache(df, "sample_table", access_pattern)
# 캐시 효율성 모니터링
cache_stats = cache_manager.monitor_cache_efficiency()
print("Cache Statistics:")
print(json.dumps(cache_stats, indent=2))
# 사용하지 않는 캐시 정리
cleaned_count = cache_manager.cleanup_unused_cache(max_age_hours=1)
print(f"\nCleaned {cleaned_count} unused cache entries")
🔍 고급 성능 분석 도구
Spark History Server 활용법
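아래 HistoryServerAnalyzer는 http://localhost:18080 에서 동작 중인 History Server를 가정합니다. History Server가 분석할 데이터를 가지려면 애플리케이션이 이벤트 로그를 남기고, 서버가 같은 디렉터리를 읽도록 설정되어 있어야 합니다. 아래 스케치의 로그 디렉터리 경로는 예시입니다.
# History Server 연동을 위한 애플리케이션 쪽 설정 예시
# (서버 쪽은 spark-defaults.conf의 spark.history.fs.logDirectory를 같은 경로로 맞추고
#  sbin/start-history-server.sh 로 기동한다고 가정)
spark = (
    SparkSession.builder
    .appName("HistoryServerExample")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "hdfs:///spark-events")  # 예시 경로
    .getOrCreate()
)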
# Spark History Server 설정 및 활용
class HistoryServerAnalyzer:
def __init__(self, history_server_url):
self.history_server_url = history_server_url
def analyze_completed_applications(self):
"""완료된 애플리케이션 분석"""
import requests
# History Server API를 통한 애플리케이션 목록 조회
apps_response = requests.get(f"{self.history_server_url}/api/v1/applications")
applications = apps_response.json()
analysis_results = []
for app in applications:
app_id = app['id']
# 애플리케이션 상세 정보 조회
app_details = requests.get(f"{self.history_server_url}/api/v1/applications/{app_id}")
app_info = app_details.json()
# 작업 정보 조회
jobs_response = requests.get(f"{self.history_server_url}/api/v1/applications/{app_id}/jobs")
jobs = jobs_response.json()
analysis_result = {
'app_id': app_id,
'app_name': app_info.get('name', 'Unknown'),
'start_time': app_info.get('attempts', [{}])[0].get('startTime'),
'duration': app_info.get('attempts', [{}])[0].get('duration'),
'total_jobs': len(jobs),
'failed_jobs': len([j for j in jobs if j.get('status') == 'FAILED']),
'performance_issues': self._identify_performance_issues(jobs)
}
analysis_results.append(analysis_result)
return analysis_results
def _identify_performance_issues(self, jobs):
"""성능 문제 식별"""
issues = []
for job in jobs:
# 느린 작업 식별 (10분 이상)
if job.get('duration', 0) > 600000: # 10분 = 600,000ms
issues.append({
'type': 'slow_job',
'job_id': job.get('jobId'),
'duration': job.get('duration'),
'message': f"작업 {job.get('jobId')}가 {job.get('duration')/1000:.1f}초 소요"
})
# 실패한 작업 식별
if job.get('status') == 'FAILED':
issues.append({
'type': 'failed_job',
'job_id': job.get('jobId'),
'message': f"작업 {job.get('jobId')} 실패"
})
return issues
def generate_performance_report(self, app_id):
"""성능 보고서 생성"""
import requests
# 스테이지 정보 조회
stages_response = requests.get(f"{self.history_server_url}/api/v1/applications/{app_id}/stages")
stages = stages_response.json()
# 실행자 정보 조회
executors_response = requests.get(f"{self.history_server_url}/api/v1/applications/{app_id}/executors")
executors = executors_response.json()
report = {
'total_stages': len(stages),
'total_executors': len(executors),
'stage_analysis': self._analyze_stages(stages),
'executor_analysis': self._analyze_executors(executors),
'recommendations': self._generate_recommendations(stages, executors)
}
return report
def _analyze_stages(self, stages):
"""스테이지 분석"""
stage_metrics = {
'total_tasks': sum(s.get('numTasks', 0) for s in stages),
'failed_tasks': sum(s.get('numFailedTasks', 0) for s in stages),
'avg_task_duration': 0,
'slow_stages': []
}
total_duration = 0
stage_count = 0
for stage in stages:
if stage.get('numTasks', 0) > 0:
avg_task_duration = stage.get('executorRunTime', 0) / stage.get('numTasks', 1)
total_duration += avg_task_duration
stage_count += 1
# 느린 스테이지 식별 (평균 태스크 시간 30초 이상)
if avg_task_duration > 30000:
stage_metrics['slow_stages'].append({
'stage_id': stage.get('stageId'),
'avg_task_duration': avg_task_duration,
'num_tasks': stage.get('numTasks')
})
if stage_count > 0:
stage_metrics['avg_task_duration'] = total_duration / stage_count
return stage_metrics
def _analyze_executors(self, executors):
"""실행자 분석"""
executor_metrics = {
'total_cores': sum(e.get('totalCores', 0) for e in executors),
'total_memory': sum(e.get('maxMemory', 0) for e in executors),
'memory_utilization': 0,
'gc_time_ratio': 0
}
total_used_memory = sum(e.get('memoryUsed', 0) for e in executors)
total_gc_time = sum(e.get('totalGCTime', 0) for e in executors)
total_executor_time = sum(e.get('totalDuration', 0) for e in executors)
if executor_metrics['total_memory'] > 0:
executor_metrics['memory_utilization'] = total_used_memory / executor_metrics['total_memory']
if total_executor_time > 0:
executor_metrics['gc_time_ratio'] = total_gc_time / total_executor_time
return executor_metrics
def _generate_recommendations(self, stages, executors):
"""권장사항 생성"""
recommendations = []
# 메모리 사용률이 높은 경우
total_memory = sum(e.get('maxMemory', 0) for e in executors)
used_memory = sum(e.get('memoryUsed', 0) for e in executors)
if total_memory > 0 and used_memory / total_memory > 0.8:
recommendations.append({
'type': 'memory',
'priority': 'high',
'message': '메모리 사용률이 높습니다. executor-memory 증가를 고려하세요'
})
# GC 시간이 긴 경우
total_gc_time = sum(e.get('totalGCTime', 0) for e in executors)
total_executor_time = sum(e.get('totalDuration', 0) for e in executors)
if total_executor_time > 0 and total_gc_time / total_executor_time > 0.1:
recommendations.append({
'type': 'gc',
'priority': 'medium',
'message': 'GC 시간이 전체 실행 시간의 10% 이상입니다. JVM 튜닝을 고려하세요'
})
# 느린 스테이지가 있는 경우
slow_stages = [s for s in stages if s.get('executorRunTime', 0) / max(s.get('numTasks', 1), 1) > 30000]
if slow_stages:
recommendations.append({
'type': 'performance',
'priority': 'medium',
'message': f'{len(slow_stages)}개의 스테이지가 느립니다. 파티셔닝 최적화를 고려하세요'
})
return recommendations
# History Server 활용 예제
def history_server_analysis_example():
analyzer = HistoryServerAnalyzer("http://localhost:18080")
# 완료된 애플리케이션 분석
completed_apps = analyzer.analyze_completed_applications()
print("=== Completed Applications Analysis ===")
for app in completed_apps:
print(f"App: {app['app_name']} ({app['app_id']})")
print(f" Duration: {app['duration']/1000:.1f}s")
print(f" Jobs: {app['total_jobs']} (Failed: {app['failed_jobs']})")
if app['performance_issues']:
print(" Performance Issues:")
for issue in app['performance_issues']:
print(f" - {issue['message']}")
print()
# 특정 애플리케이션 상세 분석
if completed_apps:
app_id = completed_apps[0]['app_id']
report = analyzer.generate_performance_report(app_id)
print("=== Performance Report ===")
print(json.dumps(report, indent=2))
🚨 실무 성능 문제 진단 및 해결
Data Skew 문제 해결
# Data Skew 진단 및 해결 도구
class DataSkewResolver:
def __init__(self, spark_session):
self.spark = spark_session
def detect_data_skew(self, df, key_columns):
"""데이터 스큐 감지"""
from pyspark.sql.functions import col, count, min, max, avg, stddev
# 키별 데이터 분포 분석
key_counts = df.groupBy(*key_columns).count()
# 통계 계산
stats = key_counts.select(
count("*").alias("total_keys"),
min("count").alias("min_count"),
max("count").alias("max_count"),
avg("count").alias("avg_count"),
stddev("count").alias("stddev_count")
).collect()[0]
skew_ratio = stats['max_count'] / stats['avg_count'] if stats['avg_count'] > 0 else 0
return {
'total_keys': stats['total_keys'],
'min_count': stats['min_count'],
'max_count': stats['max_count'],
'avg_count': stats['avg_count'],
'stddev_count': stats['stddev_count'],
'skew_ratio': skew_ratio,
'is_skewed': skew_ratio > 2.0,
'severity': self._get_skew_severity(skew_ratio)
}
def _get_skew_severity(self, skew_ratio):
"""스큐 심각도 판정"""
if skew_ratio > 10:
return "critical"
elif skew_ratio > 5:
return "high"
elif skew_ratio > 2:
return "medium"
else:
return "low"
def resolve_data_skew(self, df, key_columns, method="salting"):
"""데이터 스큐 해결"""
if method == "salting":
return self._apply_salting(df, key_columns)
elif method == "random_repartition":
return self._apply_random_repartition(df, key_columns)
elif method == "adaptive_query_execution":
return self._enable_adaptive_query_execution(df)
else:
raise ValueError(f"Unknown method: {method}")
def _apply_salting(self, df, key_columns):
"""Salting 기법 적용"""
from pyspark.sql.functions import col, rand, concat, lit
# Salt 값 생성 (0-99 사이의 랜덤 값)
salted_df = df.withColumn("salt", (rand() * 100).cast("int"))
        # 키 컬럼에 salt를 붙이되 원래 컬럼 이름은 유지한다 (예: "0" -> "0_37")
        salted_keys = [concat(col(key), lit("_"), col("salt")).alias(key) for key in key_columns]
        salted_keys.append(col("salt"))
        return salted_df.select(*salted_keys, *[col(c) for c in df.columns if c not in key_columns])
def _apply_random_repartition(self, df, key_columns):
"""랜덤 리파티셔닝 적용"""
# 현재 파티션 수의 2배로 증가
current_partitions = df.rdd.getNumPartitions()
new_partitions = current_partitions * 2
return df.repartition(new_partitions)
def _enable_adaptive_query_execution(self, df):
"""적응형 쿼리 실행 활성화"""
# Spark 설정 변경
self.spark.conf.set("spark.sql.adaptive.enabled", "true")
self.spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
self.spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
self.spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
self.spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
return df
# Data Skew 해결 예제
def data_skew_resolution_example():
spark = SparkSession.builder.appName("DataSkewResolution").getOrCreate()
resolver = DataSkewResolver(spark)
    # 스큐가 있는 데이터 생성 (일부 키에 대량의 데이터)
    data = []
    for i in range(100000):
        if i < 30000:  # 처음 30,000건은 키 0에 할당 (스큐 생성, 비율이 2.0을 확실히 넘도록)
            data.append((0, f"data_{i}"))
        else:
            data.append((i % 100, f"data_{i}"))  # 나머지는 균등 분포
df = spark.createDataFrame(data, ["key", "value"])
# 스큐 감지
skew_analysis = resolver.detect_data_skew(df, ["key"])
print("=== Data Skew Analysis ===")
print(json.dumps(skew_analysis, indent=2))
if skew_analysis['is_skewed']:
print(f"\nData skew detected with ratio: {skew_analysis['skew_ratio']:.2f}")
print(f"Severity: {skew_analysis['severity']}")
# 스큐 해결
resolved_df = resolver.resolve_data_skew(df, ["key"], method="salting")
# 해결 후 스큐 재측정
resolved_skew = resolver.detect_data_skew(resolved_df, ["key", "salt"])
print("\n=== After Skew Resolution ===")
print(json.dumps(resolved_skew, indent=2))
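Salting으로 핫 키를 쪼갠 뒤 최종 결과를 얻으려면, 보통 "salt가 붙은 키로 1차 집계 → salt를 버리고 2차 집계"하는 2단계 집계를 사용합니다. 아래는 위 예제의 df(key, value) 스키마를 가정한 간단한 스케치이며, salted_count라는 함수 이름과 num_salts 값은 예시입니다.
# Salting을 이용한 2단계 집계 스케치 (위 예제의 key/value 스키마 가정)
from pyspark.sql.functions import col, rand, count as f_count, sum as f_sum
def salted_count(df, key_col, num_salts=100):
    # 1단계: (key, salt) 단위 부분 집계 -> 핫 키가 여러 태스크로 분산된다
    partial = (
        df.withColumn("salt", (rand() * num_salts).cast("int"))
          .groupBy(key_col, "salt")
          .agg(f_count("*").alias("partial_count"))
    )
    # 2단계: salt를 버리고 key 단위로 부분 집계 결과를 합산
    return partial.groupBy(key_col).agg(f_sum("partial_count").alias("count"))
# 사용 예
# final_counts = salted_count(df, "key")
# final_counts.orderBy(col("count").desc()).show(5)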
Slow Task 분석 및 최적화
# Slow Task 분석 및 최적화 도구
class SlowTaskAnalyzer:
def __init__(self, spark_session):
self.spark = spark_session
def analyze_slow_tasks(self, df, threshold_seconds=30):
"""느린 태스크 분석"""
from pyspark.sql.functions import col, count, min, max, avg, stddev
# 파티션별 처리 시간 측정
def measure_partition_processing(iterator):
import time
start_time = time.time()
# 데이터 처리
data = list(iterator)
processed_data = [row for row in data] # 실제 처리 로직
end_time = time.time()
processing_time = end_time - start_time
return [(processing_time, len(data))]
# 각 파티션의 처리 시간 측정
partition_metrics = df.rdd.mapPartitions(measure_partition_processing).collect()
if not partition_metrics:
return {"message": "No data to analyze"}
processing_times = [metric[0] for metric in partition_metrics]
data_sizes = [metric[1] for metric in partition_metrics]
slow_partitions = [
(i, time, size) for i, (time, size) in enumerate(partition_metrics)
if time > threshold_seconds
]
analysis = {
'total_partitions': len(partition_metrics),
'slow_partitions': len(slow_partitions),
'avg_processing_time': sum(processing_times) / len(processing_times),
'max_processing_time': max(processing_times),
'min_processing_time': min(processing_times),
'stddev_processing_time': self._calculate_stddev(processing_times),
'slow_partition_details': slow_partitions,
'recommendations': self._generate_slow_task_recommendations(slow_partitions, partition_metrics)
}
return analysis
def _calculate_stddev(self, values):
"""표준편차 계산"""
if len(values) < 2:
return 0
mean = sum(values) / len(values)
variance = sum((x - mean) ** 2 for x in values) / (len(values) - 1)
return variance ** 0.5
def _generate_slow_task_recommendations(self, slow_partitions, all_partitions):
"""느린 태스크 해결 권장사항 생성"""
recommendations = []
if slow_partitions:
recommendations.append({
'type': 'repartitioning',
'priority': 'high',
'message': f'{len(slow_partitions)}개의 파티션이 느립니다. repartition() 또는 coalesce()를 고려하세요'
})
# 파티션 크기 불균형 확인
data_sizes = [size for _, size in all_partitions]
if len(data_sizes) > 1:
size_ratio = max(data_sizes) / min(data_sizes)
if size_ratio > 5:
recommendations.append({
'type': 'data_distribution',
'priority': 'medium',
'message': f'파티션 크기 불균형이 심합니다 (비율: {size_ratio:.2f}). 데이터 분포를 개선하세요'
})
# 전체적인 처리 시간 분산 확인
processing_times = [time for time, _ in all_partitions]
if len(processing_times) > 1:
time_ratio = max(processing_times) / min(processing_times)
if time_ratio > 10:
recommendations.append({
'type': 'performance_optimization',
'priority': 'medium',
'message': f'처리 시간 분산이 큽니다 (비율: {time_ratio:.2f}). 로직 최적화를 고려하세요'
})
return recommendations
def optimize_partitioning(self, df, target_partition_size_mb=128):
"""파티셔닝 최적화"""
# 현재 데이터 크기 추정
row_count = df.count()
estimated_row_size_bytes = 100 # 추정값 (실제로는 더 정확한 계산 필요)
total_size_mb = (row_count * estimated_row_size_bytes) / (1024 * 1024)
# 최적 파티션 수 계산
optimal_partitions = max(1, int(total_size_mb / target_partition_size_mb))
current_partitions = df.rdd.getNumPartitions()
if optimal_partitions != current_partitions:
if optimal_partitions > current_partitions:
# 파티션 수 증가
optimized_df = df.repartition(optimal_partitions)
action = f"repartitioned from {current_partitions} to {optimal_partitions} partitions"
else:
# 파티션 수 감소
optimized_df = df.coalesce(optimal_partitions)
action = f"coalesced from {current_partitions} to {optimal_partitions} partitions"
else:
optimized_df = df
action = "no partitioning change needed"
return {
'optimized_dataframe': optimized_df,
'original_partitions': current_partitions,
'optimized_partitions': optimal_partitions,
'action_taken': action,
'estimated_total_size_mb': total_size_mb
}
# Slow Task 분석 예제
def slow_task_analysis_example():
spark = SparkSession.builder.appName("SlowTaskAnalysis").getOrCreate()
analyzer = SlowTaskAnalyzer(spark)
# 불균등한 데이터 생성 (일부 파티션에 더 많은 데이터)
data = []
for i in range(100000):
# 처음 50%는 파티션 0에, 나머지는 균등 분포
if i < 50000:
data.append((0, f"heavy_data_{i}", i * 100))
else:
data.append((i % 10, f"light_data_{i}", i))
df = spark.createDataFrame(data, ["partition_key", "data", "value"])
# 느린 태스크 분석
slow_task_analysis = analyzer.analyze_slow_tasks(df, threshold_seconds=1)
print("=== Slow Task Analysis ===")
print(json.dumps(slow_task_analysis, indent=2))
# 파티셔닝 최적화
optimization_result = analyzer.optimize_partitioning(df)
print("\n=== Partitioning Optimization ===")
print(f"Original partitions: {optimization_result['original_partitions']}")
print(f"Optimized partitions: {optimization_result['optimized_partitions']}")
print(f"Action taken: {optimization_result['action_taken']}")
print(f"Estimated total size: {optimization_result['estimated_total_size_mb']:.2f} MB")
Shuffle Spill 문제 해결
# Shuffle Spill 문제 해결 도구
class ShuffleSpillResolver:
def __init__(self, spark_session):
self.spark = spark_session
def detect_shuffle_spill(self, df):
"""Shuffle Spill 감지"""
# Spark 설정에서 spill 관련 설정 확인
spill_settings = {
'spark.sql.adaptive.enabled': self.spark.conf.get('spark.sql.adaptive.enabled', 'false'),
'spark.sql.adaptive.coalescePartitions.enabled': self.spark.conf.get('spark.sql.adaptive.coalescePartitions.enabled', 'false'),
'spark.sql.adaptive.advisoryPartitionSizeInBytes': self.spark.conf.get('spark.sql.adaptive.advisoryPartitionSizeInBytes', '64MB'),
'spark.sql.adaptive.skewJoin.enabled': self.spark.conf.get('spark.sql.adaptive.skewJoin.enabled', 'false')
}
return {
'spill_settings': spill_settings,
'recommendations': self._generate_spill_recommendations(spill_settings)
}
def _generate_spill_recommendations(self, settings):
"""Spill 해결 권장사항 생성"""
recommendations = []
if settings['spark.sql.adaptive.enabled'] == 'false':
recommendations.append({
'type': 'adaptive_query',
'priority': 'high',
'message': 'Adaptive Query Execution을 활성화하여 자동 최적화를 활용하세요',
'config': 'spark.sql.adaptive.enabled=true'
})
if settings['spark.sql.adaptive.coalescePartitions.enabled'] == 'false':
recommendations.append({
'type': 'partition_coalescing',
'priority': 'medium',
'message': '파티션 결합을 활성화하여 작은 파티션들을 합치세요',
'config': 'spark.sql.adaptive.coalescePartitions.enabled=true'
})
# 파티션 크기 설정 확인
partition_size = settings['spark.sql.adaptive.advisoryPartitionSizeInBytes']
if '64MB' in partition_size:
recommendations.append({
'type': 'partition_size',
'priority': 'medium',
'message': '파티션 크기를 128MB 이상으로 증가시켜 spill을 줄이세요',
'config': 'spark.sql.adaptive.advisoryPartitionSizeInBytes=128MB'
})
if settings['spark.sql.adaptive.skewJoin.enabled'] == 'false':
recommendations.append({
'type': 'skew_join',
'priority': 'medium',
'message': 'Skew Join 최적화를 활성화하여 데이터 스큐 문제를 해결하세요',
'config': 'spark.sql.adaptive.skewJoin.enabled=true'
})
return recommendations
def configure_spill_prevention(self):
"""Spill 방지 설정 적용"""
configurations = {
'spark.sql.adaptive.enabled': 'true',
'spark.sql.adaptive.coalescePartitions.enabled': 'true',
'spark.sql.adaptive.advisoryPartitionSizeInBytes': '128MB',
'spark.sql.adaptive.skewJoin.enabled': 'true',
'spark.sql.adaptive.skewJoin.skewedPartitionFactor': '5',
'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes': '256MB',
'spark.sql.adaptive.localShuffleReader.enabled': 'true',
'spark.sql.adaptive.optimizer.excludedRules': '',
'spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin': '0.2'
}
applied_configs = {}
for key, value in configurations.items():
try:
self.spark.conf.set(key, value)
applied_configs[key] = value
except Exception as e:
print(f"Failed to set {key}: {e}")
return applied_configs
def optimize_shuffle_operations(self, df, operation_type="join"):
"""Shuffle 작업 최적화"""
if operation_type == "join":
return self._optimize_join_shuffle(df)
elif operation_type == "aggregation":
return self._optimize_aggregation_shuffle(df)
elif operation_type == "repartition":
return self._optimize_repartition_shuffle(df)
else:
return df
def _optimize_join_shuffle(self, df):
"""조인 Shuffle 최적화"""
# 브로드캐스트 조인 힌트 추가 (작은 테이블의 경우)
from pyspark.sql.functions import broadcast
# 여기서는 예시로 self-join을 최적화
# 실제로는 두 개의 다른 DataFrame을 조인할 때 사용
return df.hint("broadcast")
def _optimize_aggregation_shuffle(self, df):
"""집계 Shuffle 최적화"""
# 파티션 수를 줄여서 Shuffle 비용 감소
current_partitions = df.rdd.getNumPartitions()
if current_partitions > 200:
optimized_partitions = min(200, current_partitions // 2)
return df.coalesce(optimized_partitions)
return df
def _optimize_repartition_shuffle(self, df):
"""리파티셔닝 Shuffle 최적화"""
# 적응형 쿼리 실행을 활용하여 자동 최적화
return df
# Shuffle Spill 해결 예제
def shuffle_spill_resolution_example():
spark = SparkSession.builder.appName("ShuffleSpillResolution").getOrCreate()
resolver = ShuffleSpillResolver(spark)
# Shuffle Spill 감지
spill_analysis = resolver.detect_shuffle_spill(spark.createDataFrame([(1, "test")], ["id", "value"]))
print("=== Shuffle Spill Analysis ===")
print(json.dumps(spill_analysis, indent=2))
# Spill 방지 설정 적용
applied_configs = resolver.configure_spill_prevention()
print("\n=== Applied Spill Prevention Configs ===")
for key, value in applied_configs.items():
print(f"{key}: {value}")
# 대용량 데이터로 Shuffle 테스트
data = [(i, f"data_{i}", i % 100) for i in range(1000000)]
df = spark.createDataFrame(data, ["id", "value", "category"])
# Shuffle이 발생하는 작업 수행
result_df = resolver.optimize_shuffle_operations(
df.groupBy("category").agg({"id": "count", "value": "collect_list"}),
operation_type="aggregation"
)
print("\n=== Shuffle Optimization Applied ===")
print(f"Result partitions: {result_df.rdd.getNumPartitions()}")
🤖 모니터링 자동화
Health Check 시스템
# Health Check 시스템
class SparkHealthChecker:
def __init__(self, spark_session):
self.spark = spark_session
self.health_thresholds = {
'memory_usage_ratio': 0.85,
'gc_time_ratio': 0.1,
'failed_task_ratio': 0.05,
'executor_loss_ratio': 0.1,
'slow_task_ratio': 0.2
}
def perform_health_check(self):
"""전체 시스템 건강도 체크"""
        # PySpark StatusTracker에는 실행자 API가 없으므로 Spark UI REST API로 실행자 정보를 조회한다
        import requests
        sc = self.spark.sparkContext
        base_url = f"{sc.uiWebUrl}/api/v1/applications/{sc.applicationId}"
        executor_infos = requests.get(f"{base_url}/executors").json()
health_report = {
'timestamp': time.time(),
'overall_status': 'healthy',
'checks': {
'memory_usage': self._check_memory_usage(executor_infos),
'gc_performance': self._check_gc_performance(executor_infos),
'task_failure': self._check_task_failure(executor_infos),
'executor_health': self._check_executor_health(executor_infos),
'cluster_utilization': self._check_cluster_utilization(executor_infos)
},
'recommendations': [],
'critical_alerts': []
}
# 전체 상태 결정
failed_checks = [check for check in health_report['checks'].values() if not check['status']]
if failed_checks:
health_report['overall_status'] = 'unhealthy'
health_report['critical_alerts'] = [check['message'] for check in failed_checks]
# 권장사항 생성
health_report['recommendations'] = self._generate_health_recommendations(health_report['checks'])
return health_report
def _check_memory_usage(self, executor_infos):
"""메모리 사용량 체크"""
        total_memory = sum(info['maxMemory'] for info in executor_infos)
        used_memory = sum(info['memoryUsed'] for info in executor_infos)
if total_memory == 0:
return {'status': False, 'message': 'No memory information available', 'value': 0}
usage_ratio = used_memory / total_memory
threshold = self.health_thresholds['memory_usage_ratio']
return {
'status': usage_ratio < threshold,
'message': f'Memory usage: {usage_ratio:.2%} (threshold: {threshold:.2%})',
'value': usage_ratio,
'threshold': threshold
}
def _check_gc_performance(self, executor_infos):
"""GC 성능 체크"""
        total_duration = sum(info['totalDuration'] for info in executor_infos)
        total_gc_time = sum(info['totalGCTime'] for info in executor_infos)
if total_duration == 0:
return {'status': True, 'message': 'No duration information available', 'value': 0}
gc_ratio = total_gc_time / total_duration
threshold = self.health_thresholds['gc_time_ratio']
return {
'status': gc_ratio < threshold,
'message': f'GC time ratio: {gc_ratio:.2%} (threshold: {threshold:.2%})',
'value': gc_ratio,
'threshold': threshold
}
def _check_task_failure(self, executor_infos):
"""태스크 실패율 체크"""
        total_tasks = sum(info['completedTasks'] + info['failedTasks'] for info in executor_infos)
        failed_tasks = sum(info['failedTasks'] for info in executor_infos)
if total_tasks == 0:
return {'status': True, 'message': 'No tasks executed yet', 'value': 0}
failure_ratio = failed_tasks / total_tasks
threshold = self.health_thresholds['failed_task_ratio']
return {
'status': failure_ratio < threshold,
'message': f'Task failure ratio: {failure_ratio:.2%} (threshold: {threshold:.2%})',
'value': failure_ratio,
'threshold': threshold
}
def _check_executor_health(self, executor_infos):
"""실행자 건강도 체크"""
total_executors = len(executor_infos)
        lost_executors = len([info for info in executor_infos if not info['isActive']])
if total_executors == 0:
return {'status': False, 'message': 'No executors available', 'value': 0}
loss_ratio = lost_executors / total_executors
threshold = self.health_thresholds['executor_loss_ratio']
return {
'status': loss_ratio < threshold,
'message': f'Executor loss ratio: {loss_ratio:.2%} (threshold: {threshold:.2%})',
'value': loss_ratio,
'threshold': threshold
}
def _check_cluster_utilization(self, executor_infos):
"""클러스터 활용률 체크"""
        total_cores = sum(info['totalCores'] for info in executor_infos)
        active_tasks = sum(info['activeTasks'] for info in executor_infos)
if total_cores == 0:
return {'status': False, 'message': 'No cores available', 'value': 0}
utilization = active_tasks / total_cores
return {
'status': True, # 활용률은 높을수록 좋음
'message': f'Cluster utilization: {utilization:.2%}',
'value': utilization
}
def _generate_health_recommendations(self, checks):
"""건강도 권장사항 생성"""
recommendations = []
if not checks['memory_usage']['status']:
recommendations.append({
'category': 'memory',
'priority': 'high',
'action': '메모리 사용량이 높습니다. executor-memory를 증가시키거나 데이터 캐싱을 최적화하세요'
})
if not checks['gc_performance']['status']:
recommendations.append({
'category': 'gc',
'priority': 'medium',
'action': 'GC 시간이 길어지고 있습니다. JVM GC 설정을 최적화하세요'
})
if not checks['task_failure']['status']:
recommendations.append({
'category': 'reliability',
'priority': 'high',
'action': '태스크 실패율이 높습니다. 리소스 할당과 데이터 품질을 확인하세요'
})
if not checks['executor_health']['status']:
recommendations.append({
'category': 'infrastructure',
'priority': 'critical',
'action': '실행자 손실이 발생하고 있습니다. 클러스터 상태를 점검하세요'
})
return recommendations
# Health Check 예제
def health_check_example():
spark = SparkSession.builder.appName("HealthCheck").getOrCreate()
health_checker = SparkHealthChecker(spark)
# 건강도 체크 수행
health_report = health_checker.perform_health_check()
print("=== Spark Health Check Report ===")
print(f"Overall Status: {health_report['overall_status'].upper()}")
print(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(health_report['timestamp']))}")
print("\n=== Individual Checks ===")
for check_name, check_result in health_report['checks'].items():
status_icon = "✅" if check_result['status'] else "❌"
print(f"{status_icon} {check_name}: {check_result['message']}")
if health_report['critical_alerts']:
print("\n=== Critical Alerts ===")
for alert in health_report['critical_alerts']:
print(f"🚨 {alert}")
if health_report['recommendations']:
print("\n=== Recommendations ===")
for rec in health_report['recommendations']:
priority_icon = "🔴" if rec['priority'] == 'high' else "🟡" if rec['priority'] == 'medium' else "🟢"
print(f"{priority_icon} [{rec['priority'].upper()}] {rec['action']}")
📚 학습 요약
Apache Spark 시리즈 완성! 🎉
이제 Apache Spark 완전 정복 시리즈가 완성되었습니다:
- Part 1: Spark 기초와 핵심 개념 (RDD, DataFrame, Spark SQL)
- Part 2: 대용량 배치 처리와 UDF 활용 (실무 프로젝트)
- Part 3: 실시간 스트리밍 처리와 Kafka 연동 (실시간 시스템)
- Part 4: 모니터링과 성능 튜닝 (프로덕션 환경)
실무 적용 역량
✅ 기술적 역량
- Spark 아키텍처 완전 이해
- 배치/스트리밍 처리 마스터
- 성능 최적화 전문가
- 프로덕션 환경 구축
✅ 실무 적용
- 대용량 데이터 처리 시스템 구축
- 실시간 분석 파이프라인 개발
- 성능 모니터링 시스템 운영
- 클러스터 관리 및 튜닝
시리즈 완료: Apache Spark 완전 정복 시리즈 전체 보기
축하합니다! 이제 Apache Spark의 모든 것을 마스터하고 빅데이터 처리 전문가가 되었습니다! 🚀✨