Part 2: Apache Flink 고급 스트리밍 처리와 상태 관리 - 프로덕션급 실시간 시스템

Apache Flink의 고급 상태 관리, 체크포인팅, 세이브포인트, 복잡한 시간 처리 전략을 학습하고 실무에 바로 적용할 수 있는 고급 패턴들을 구현합니다.

📋 목차

  1. 고급 상태 관리 패턴
  2. 체크포인팅과 세이브포인트 심화
  3. 복잡한 시간 처리 전략
  4. 성능 최적화 기법
  5. 실무 프로젝트: 실시간 추천 시스템
  6. 학습 요약

🗃️ 고급 상태 관리 패턴

상태 백엔드 (State Backend)

Flink는 다양한 상태 백엔드를 제공하여 다양한 성능과 내구성 요구사항을 충족합니다.

1. MemoryStateBackend

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state import MemoryStateBackend

env = StreamExecutionEnvironment.get_execution_environment()

# 메모리 상태 백엔드 설정 (개발/테스트용)
env.set_state_backend(MemoryStateBackend())

# 설정 옵션
backend = MemoryStateBackend(
    max_state_size=5 * 1024 * 1024,  # 5MB
    asynchronous_snapshots=True
)
env.set_state_backend(backend)

2. FsStateBackend

from pyflink.datastream.state import FsStateBackend

# 파일 시스템 상태 백엔드 설정
backend = FsStateBackend(
    checkpoint_data_uri="file:///tmp/flink-checkpoints",
    savepoint_data_uri="file:///tmp/flink-savepoints",
    asynchronous_snapshots=True
)
env.set_state_backend(backend)

3. RocksDBStateBackend

from pyflink.datastream.state import RocksDBStateBackend

# RocksDB 상태 백엔드 설정 (프로덕션 권장)
backend = RocksDBStateBackend(
    checkpoint_data_uri="file:///tmp/flink-checkpoints",
    savepoint_data_uri="file:///tmp/flink-savepoints",
    enable_incremental_checkpointing=True
)
env.set_state_backend(backend)

# RocksDB 최적화 설정
backend.set_predefined_options(RocksDBConfigurableOptions.PREDEFINED_OPTION_DEFAULT)
backend.set_rocksdb_options(RocksDBConfigurableOptions.ROCKSDB_OPTIONS_FILE)

고급 상태 관리 패턴

1. 상태 TTL (Time-To-Live)

from pyflink.common.state import StateTtlConfig, ValueStateDescriptor
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types

class TTLStateExample(KeyedProcessFunction):
    def __init__(self):
        self.user_session_state = None
    
    def open(self, runtime_context):
        # TTL 설정
        ttl_config = StateTtlConfig.new_builder(Time.hours(24)) \
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
            .cleanup_full_snapshot() \
            .build()
        
        # TTL이 적용된 상태 디스크립터
        state_descriptor = ValueStateDescriptor("user_session", Types.STRING())
        state_descriptor.enable_time_to_live(ttl_config)
        
        self.user_session_state = runtime_context.get_state(state_descriptor)
    
    def process_element(self, value, ctx):
        # 상태 사용
        current_session = self.user_session_state.value()
        if current_session is None:
            # 새로운 세션 생성
            new_session = f"session_{ctx.timestamp()}"
            self.user_session_state.update(new_session)
            ctx.collect(f"New session created: {new_session}")
        else:
            ctx.collect(f"Existing session: {current_session}")

2. 상태 스냅샷과 복원

from pyflink.common.state import CheckpointedFunction, ListState

class StatefulFunction(KeyedProcessFunction, CheckpointedFunction):
    def __init__(self):
        self.buffered_elements = []
        self.buffered_elements_state = None
    
    def open(self, runtime_context):
        # 상태 초기화는 checkpointed_function에서 수행
        pass
    
    def initialize_state(self, context):
        """체크포인트에서 상태 초기화"""
        self.buffered_elements_state = context.get_operator_state(
            ListStateDescriptor("buffered-elements", Types.STRING())
        )
        
        # 이전 상태 복원
        if context.is_restored():
            for element in self.buffered_elements_state.get():
                self.buffered_elements.append(element)
    
    def snapshot_state(self, context):
        """상태 스냅샷 생성"""
        self.buffered_elements_state.clear()
        for element in self.buffered_elements:
            self.buffered_elements_state.add(element)
    
    def process_element(self, value, ctx):
        # 버퍼에 요소 추가
        self.buffered_elements.append(value)
        
        # 조건에 따라 처리
        if len(self.buffered_elements) >= 10:
            # 배치 처리
            result = self.process_batch(self.buffered_elements)
            ctx.collect(result)
            self.buffered_elements.clear()
    
    def process_batch(self, elements):
        return f"Processed batch of {len(elements)} elements"

3. 상태 분할과 병합

class PartitionableStateFunction(KeyedProcessFunction, CheckpointedFunction):
    def __init__(self):
        self.partitioned_data = {}
        self.partitioned_state = None
    
    def initialize_state(self, context):
        """분할 가능한 상태 초기화"""
        self.partitioned_state = context.get_union_list_state(
            ListStateDescriptor("partitioned-data", Types.STRING())
        )
        
        # 상태 복원
        if context.is_restored():
            for data in self.partitioned_state.get():
                self.partitioned_data[data] = data
    
    def snapshot_state(self, context):
        """상태 스냅샷"""
        self.partitioned_state.clear()
        for data in self.partitioned_data.values():
            self.partitioned_state.add(data)
    
    def process_element(self, value, ctx):
        # 분할된 데이터 처리
        partition_key = value % 4  # 4개 파티션으로 분할
        
        if partition_key not in self.partitioned_data:
            self.partitioned_data[partition_key] = []
        
        self.partitioned_data[partition_key].append(value)
        
        # 파티션별 집계
        if len(self.partitioned_data[partition_key]) >= 5:
            result = sum(self.partitioned_data[partition_key])
            ctx.collect(f"Partition {partition_key}: {result}")
            self.partitioned_data[partition_key].clear()

🔄 체크포인팅과 세이브포인트 심화

고급 체크포인팅 설정

from pyflink.common.checkpointing import CheckpointingMode, ExternalizedCheckpointCleanup
from pyflink.common.time import Time

class AdvancedCheckpointingSetup:
    def __init__(self, env):
        self.env = env
        self.setup_checkpointing()
    
    def setup_checkpointing(self):
        checkpoint_config = self.env.get_checkpoint_config()
        
        # 기본 체크포인팅 활성화
        checkpoint_config.enable_checkpointing(1000)  # 1초마다
        
        # Exactly-once 모드 설정
        checkpoint_config.set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
        
        # 외부화된 체크포인트 설정 (작업 취소 후에도 유지)
        checkpoint_config.enable_externalized_checkpoints(
            ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        )
        
        # 최대 동시 체크포인트 수
        checkpoint_config.set_max_concurrent_checkpoints(1)
        
        # 체크포인트 간 최소 대기 시간
        checkpoint_config.set_min_pause_between_checkpoints(500)
        
        # 체크포인트 타임아웃
        checkpoint_config.set_checkpoint_timeout(60000)  # 60초
        
        # 실패 허용 횟수
        checkpoint_config.set_tolerable_checkpoint_failure_number(3)
        
        # 지연 가능한 체크포인트 설정
        checkpoint_config.set_unaligned_checkpoints(True)
        checkpoint_config.set_unaligned_checkpoints_enabled(True)

세이브포인트 관리

import requests
import json

class SavepointManager:
    def __init__(self, flink_rest_url="http://localhost:8081"):
        self.rest_url = flink_rest_url
    
    def create_savepoint(self, job_id, savepoint_path):
        """세이브포인트 생성"""
        url = f"{self.rest_url}/jobs/{job_id}/savepoints"
        
        payload = {
            "target-directory": savepoint_path,
            "cancel-job": False
        }
        
        response = requests.post(url, json=payload)
        
        if response.status_code == 202:
            trigger_id = response.json()["request-id"]
            return self.wait_for_completion(trigger_id)
        else:
            raise Exception(f"Failed to create savepoint: {response.text}")
    
    def wait_for_completion(self, trigger_id, max_wait_time=300):
        """세이브포인트 완료 대기"""
        import time
        
        start_time = time.time()
        while time.time() - start_time < max_wait_time:
            url = f"{self.rest_url}/jobs/savepoints/{trigger_id}"
            response = requests.get(url)
            
            if response.status_code == 200:
                result = response.json()
                if result["status"]["id"] == "COMPLETED":
                    return result["operation"]["location"]
                elif result["status"]["id"] == "FAILED":
                    raise Exception(f"Savepoint failed: {result}")
            
            time.sleep(2)
        
        raise Exception("Savepoint creation timeout")
    
    def restore_from_savepoint(self, savepoint_path, jar_path):
        """세이브포인트에서 복원"""
        url = f"{self.rest_url}/jars/upload"
        
        # JAR 파일 업로드
        with open(jar_path, 'rb') as f:
            files = {'jarfile': f}
            response = requests.post(url, files=files)
        
        if response.status_code == 200:
            jar_id = response.json()["filename"].split("/")[-1]
            
            # 세이브포인트에서 작업 시작
            start_url = f"{self.rest_url}/jars/{jar_id}/run"
            payload = {
                "savepointPath": savepoint_path
            }
            
            start_response = requests.post(start_url, json=payload)
            return start_response.status_code == 200
        else:
            raise Exception(f"Failed to upload JAR: {response.text}")
    
    def list_savepoints(self):
        """세이브포인트 목록 조회"""
        # 실제 구현에서는 파일 시스템을 스캔하거나 메타데이터 저장소를 조회
        return []

# 사용 예제
def manage_savepoints_example():
    manager = SavepointManager()
    
    try:
        # 세이브포인트 생성
        savepoint_location = manager.create_savepoint(
            "job-id-123",
            "file:///tmp/flink-savepoints/"
        )
        print(f"Savepoint created: {savepoint_location}")
        
        # 세이브포인트에서 복원
        success = manager.restore_from_savepoint(
            savepoint_location,
            "/path/to/job.jar"
        )
        print(f"Restore success: {success}")
        
    except Exception as e:
        print(f"Error: {e}")

체크포인트 모니터링

class CheckpointMonitor:
    def __init__(self, flink_rest_url="http://localhost:8081"):
        self.rest_url = flink_rest_url
    
    def get_checkpoint_metrics(self, job_id):
        """체크포인트 메트릭 조회"""
        url = f"{self.rest_url}/jobs/{job_id}/checkpoints"
        response = requests.get(url)
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"Failed to get checkpoint metrics: {response.text}")
    
    def analyze_checkpoint_performance(self, job_id):
        """체크포인트 성능 분석"""
        metrics = self.get_checkpoint_metrics(job_id)
        
        checkpoints = metrics.get("latest", {}).get("completed", [])
        
        if not checkpoints:
            return {"message": "No completed checkpoints found"}
        
        # 성능 분석
        durations = [cp["duration"] for cp in checkpoints]
        sizes = [cp["size"] for cp in checkpoints]
        
        analysis = {
            "total_checkpoints": len(checkpoints),
            "avg_duration_ms": sum(durations) / len(durations),
            "max_duration_ms": max(durations),
            "min_duration_ms": min(durations),
            "avg_size_bytes": sum(sizes) / len(sizes),
            "max_size_bytes": max(sizes),
            "min_size_bytes": min(sizes)
        }
        
        # 성능 권장사항
        recommendations = []
        
        if analysis["avg_duration_ms"] > 10000:  # 10초 이상
            recommendations.append({
                "type": "performance",
                "message": "체크포인트 시간이 길어서 성능에 영향을 줄 수 있습니다. 상태 크기를 줄이거나 백엔드를 최적화하세요."
            })
        
        if analysis["avg_size_bytes"] > 100 * 1024 * 1024:  # 100MB 이상
            recommendations.append({
                "type": "storage",
                "message": "체크포인트 크기가 큽니다. 불필요한 상태를 정리하거나 TTL을 설정하세요."
            })
        
        analysis["recommendations"] = recommendations
        
        return analysis

# 모니터링 예제
def checkpoint_monitoring_example():
    monitor = CheckpointMonitor()
    
    try:
        analysis = monitor.analyze_checkpoint_performance("job-id-123")
        print("=== Checkpoint Performance Analysis ===")
        print(json.dumps(analysis, indent=2))
    except Exception as e:
        print(f"Error: {e}")

⏰ 복잡한 시간 처리 전략

다중 시간 윈도우 처리

from pyflink.datastream.window import TumblingEventTimeWindows, SlidingEventTimeWindows
from pyflink.datastream.functions import AllWindowFunction
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types

class MultiTimeWindowProcessor(AllWindowFunction):
    def __init__(self):
        self.window_results = {}
    
    def apply(self, window, inputs, out):
        """다중 시간 윈도우에서 데이터 처리"""
        window_start = window.start
        window_end = window.end
        
        # 윈도우별 집계
        window_key = f"{window_start}_{window_end}"
        
        if window_key not in self.window_results:
            self.window_results[window_key] = {
                "count": 0,
                "sum": 0,
                "min": float('inf'),
                "max": float('-inf')
            }
        
        for element in inputs:
            value = element[1]  # (timestamp, value) 형태 가정
            self.window_results[window_key]["count"] += 1
            self.window_results[window_key]["sum"] += value
            self.window_results[window_key]["min"] = min(
                self.window_results[window_key]["min"], value
            )
            self.window_results[window_key]["max"] = max(
                self.window_results[window_key]["max"], value
            )
        
        # 결과 출력
        result = self.window_results[window_key].copy()
        result["window_start"] = window_start
        result["window_end"] = window_end
        out.collect(result)

class AdvancedTimeProcessing:
    def __init__(self, env):
        self.env = env
    
    def setup_multi_window_processing(self, data_stream):
        """다중 윈도우 처리 설정"""
        # 1분 윈도우
        one_minute_window = data_stream.window_all(
            TumblingEventTimeWindows.of(Time.minutes(1))
        ).apply(MultiTimeWindowProcessor(), output_type=Types.PICKLED_BYTE_ARRAY())
        
        # 5분 윈도우
        five_minute_window = data_stream.window_all(
            TumblingEventTimeWindows.of(Time.minutes(5))
        ).apply(MultiTimeWindowProcessor(), output_type=Types.PICKLED_BYTE_ARRAY())
        
        # 1분 슬라이딩 윈도우 (30초 슬라이드)
        sliding_window = data_stream.window_all(
            SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30))
        ).apply(MultiTimeWindowProcessor(), output_type=Types.PICKLED_BYTE_ARRAY())
        
        return {
            "one_minute": one_minute_window,
            "five_minute": five_minute_window,
            "sliding": sliding_window
        }

동적 워터마크 생성

from pyflink.datastream.functions import ProcessFunction
from pyflink.common.time import Time

class DynamicWatermarkGenerator(ProcessFunction):
    def __init__(self, max_delay_seconds=60):
        self.max_delay_seconds = max_delay_seconds
        self.current_watermark = 0
        self.element_count = 0
        self.delay_statistics = []
    
    def process_element(self, element, ctx):
        """동적 워터마크 생성"""
        event_time = element.timestamp
        processing_time = ctx.timer_service().current_processing_time()
        
        # 지연 시간 계산
        delay = processing_time - event_time
        self.delay_statistics.append(delay)
        
        # 최근 100개 요소의 평균 지연 시간으로 워터마크 조정
        if len(self.delay_statistics) > 100:
            self.delay_statistics.pop(0)
        
        if self.delay_statistics:
            avg_delay = sum(self.delay_statistics) / len(self.delay_statistics)
            dynamic_delay = max(avg_delay * 1.5, self.max_delay_seconds * 1000)
            
            # 동적 워터마크 생성
            new_watermark = event_time - dynamic_delay
            if new_watermark > self.current_watermark:
                self.current_watermark = new_watermark
                ctx.timer_service().register_event_time_timer(new_watermark)
        
        # 요소 처리
        ctx.collect(element)
    
    def on_timer(self, timestamp, ctx, out):
        """타이머 기반 워터마크 생성"""
        # 주기적으로 워터마크 생성
        current_time = ctx.timer_service().current_processing_time()
        next_watermark = current_time - (self.max_delay_seconds * 1000)
        
        if next_watermark > self.current_watermark:
            self.current_watermark = next_watermark
            ctx.timer_service().register_processing_time_timer(current_time + 1000)

시간 기반 상태 관리

from pyflink.common.state import ValueStateDescriptor
from pyflink.datastream.functions import KeyedProcessFunction

class TimeBasedStateManager(KeyedProcessFunction):
    def __init__(self):
        self.user_activity_state = None
        self.last_activity_time = None
        self.session_timeout_ms = 30 * 60 * 1000  # 30분
    
    def open(self, runtime_context):
        self.user_activity_state = runtime_context.get_state(
            ValueStateDescriptor("user_activity", Types.STRING())
        )
        self.last_activity_time = runtime_context.get_state(
            ValueStateDescriptor("last_activity_time", Types.LONG())
        )
    
    def process_element(self, element, ctx):
        """시간 기반 상태 관리"""
        current_time = ctx.timestamp()
        user_id = element.user_id
        
        # 이전 활동 시간 확인
        last_time = self.last_activity_time.value()
        
        if last_time is None or (current_time - last_time) > self.session_timeout_ms:
            # 새로운 세션 시작
            new_session = f"session_{current_time}"
            self.user_activity_state.update(new_session)
            
            # 세션 타임아웃 타이머 등록
            timeout_timer = current_time + self.session_timeout_ms
            ctx.timer_service().register_event_time_timer(timeout_timer)
            
            ctx.collect(f"New session started for user {user_id}: {new_session}")
        else:
            # 기존 세션 업데이트
            current_session = self.user_activity_state.value()
            ctx.collect(f"Activity in existing session: {current_session}")
        
        # 마지막 활동 시간 업데이트
        self.last_activity_time.update(current_time)
    
    def on_timer(self, timestamp, ctx, out):
        """세션 타임아웃 처리"""
        # 세션 종료 처리
        session = self.user_activity_state.value()
        if session:
            out.collect(f"Session timeout: {session}")
            self.user_activity_state.clear()
            self.last_activity_time.clear()

⚡ 성능 최적화 기법

병렬성 최적화

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema

class ParallelismOptimizer:
    def __init__(self, env):
        self.env = env
    
    def optimize_kafka_parallelism(self, topic, bootstrap_servers):
        """Kafka 병렬성 최적화"""
        kafka_properties = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'optimized-consumer-group'
        }
        
        kafka_source = FlinkKafkaConsumer(
            topic,
            SimpleStringSchema(),
            kafka_properties
        )
        
        # 파티션 수에 맞춰 병렬성 설정
        kafka_source.set_parallelism(4)  # Kafka 파티션 수와 동일하게 설정
        
        return self.env.add_source(kafka_source)
    
    def optimize_processing_parallelism(self, data_stream):
        """처리 병렬성 최적화"""
        # 키별 병렬성 설정
        keyed_stream = data_stream.key_by(lambda x: x[0])  # 첫 번째 필드로 키 지정
        
        # 병렬 처리 설정
        optimized_stream = keyed_stream.map(
            lambda x: self.expensive_operation(x),
            output_type=Types.STRING()
        ).set_parallelism(8)  # 높은 병렬성으로 설정
        
        return optimized_stream
    
    def expensive_operation(self, data):
        """비용이 큰 연산 시뮬레이션"""
        import time
        time.sleep(0.01)  # 10ms 처리 시간
        return f"processed_{data}"

class MemoryOptimizer:
    def __init__(self, env):
        self.env = env
    
    def setup_memory_optimization(self):
        """메모리 최적화 설정"""
        # JVM 힙 메모리 설정
        self.env.get_config().set_global_job_parameters({
            "taskmanager.memory.process.size": "2048m",
            "taskmanager.memory.managed.size": "1024m"
        })
        
        # 네트워크 버퍼 설정
        self.env.get_config().set_global_job_parameters({
            "taskmanager.network.memory.fraction": "0.2",
            "taskmanager.network.memory.min": "128mb",
            "taskmanager.network.memory.max": "1gb"
        })
        
        # 상태 백엔드 최적화
        from pyflink.datastream.state import RocksDBStateBackend
        backend = RocksDBStateBackend(
            checkpoint_data_uri="file:///tmp/flink-checkpoints",
            enable_incremental_checkpointing=True
        )
        
        # RocksDB 메모리 설정
        backend.set_rocksdb_options({
            "write_buffer_size": "64MB",
            "max_write_buffer_number": "3",
            "block_cache_size": "256MB"
        })
        
        self.env.set_state_backend(backend)
    
    def optimize_data_serialization(self, data_stream):
        """데이터 직렬화 최적화"""
        # Kryo 직렬화 설정
        self.env.get_config().set_global_job_parameters({
            "taskmanager.runtime.kryo.default.serializers": "true"
        })
        
        # Arrow 기반 직렬화 사용
        optimized_stream = data_stream.map(
            lambda x: x,  # 변환 없음
            output_type=Types.PICKLED_BYTE_ARRAY()  # 최적화된 타입
        )
        
        return optimized_stream

네트워크 최적화

class NetworkOptimizer:
    def __init__(self, env):
        self.env = env
    
    def setup_network_optimization(self):
        """네트워크 최적화 설정"""
        # 네트워크 스택 최적화
        self.env.get_config().set_global_job_parameters({
            "taskmanager.network.netty.num-arenas": "4",
            "taskmanager.network.netty.server.numThreads": "4",
            "taskmanager.network.netty.client.numThreads": "4",
            "taskmanager.network.netty.server.backlog": "0"
        })
        
        # 버퍼 크기 최적화
        self.env.get_config().set_global_job_parameters({
            "taskmanager.network.memory.buffers-per-channel": "2",
            "taskmanager.network.memory.floating-buffers-per-gate": "8"
        })
    
    def optimize_shuffle_network(self, data_stream):
        """Shuffle 네트워크 최적화"""
        # 데이터 파티셔닝 최적화
        optimized_stream = data_stream.partition_custom(
            lambda key, num_partitions: hash(key) % num_partitions,
            lambda x: x[0]  # 키 추출 함수
        )
        
        return optimized_stream

class LatencyOptimizer:
    def __init__(self, env):
        self.env = env
    
    def setup_low_latency_config(self):
        """저지연 설정"""
        # 버퍼 타임아웃 최소화
        self.env.get_config().set_global_job_parameters({
            "taskmanager.network.netty.server.backlog": "0",
            "taskmanager.network.netty.client.connectTimeoutSec": "10"
        })
        
        # 체크포인팅 간격 조정
        self.env.get_checkpoint_config().enable_checkpointing(100)  # 100ms
        
        # 지연 가능한 체크포인트 활성화
        self.env.get_checkpoint_config().set_unaligned_checkpoints(True)
    
    def optimize_processing_latency(self, data_stream):
        """처리 지연시간 최적화"""
        # 스트리밍 모드 설정
        self.env.set_buffer_timeout(1)  # 1ms 버퍼 타임아웃
        
        # 즉시 처리 설정
        optimized_stream = data_stream.map(
            lambda x: x,
            output_type=Types.STRING()
        ).set_buffer_timeout(0)  # 버퍼링 없이 즉시 처리
        
        return optimized_stream

🚀 실무 프로젝트: 실시간 추천 시스템

프로젝트 개요

사용자 행동 데이터를 실시간으로 분석하여 개인화된 추천을 제공하는 시스템을 구축합니다.

1. 데이터 모델 정의

from dataclasses import dataclass
from typing import List, Dict, Optional
import json

@dataclass
class UserEvent:
    user_id: str
    item_id: str
    event_type: str  # view, click, purchase, like
    timestamp: int
    session_id: str
    metadata: Dict
    
    def to_json(self):
        return json.dumps({
            'user_id': self.user_id,
            'item_id': self.item_id,
            'event_type': self.event_type,
            'timestamp': self.timestamp,
            'session_id': self.session_id,
            'metadata': self.metadata
        })
    
    @classmethod
    def from_json(cls, json_str):
        data = json.loads(json_str)
        return cls(**data)

@dataclass
class UserProfile:
    user_id: str
    interests: Dict[str, float]  # 카테고리별 관심도
    behavior_patterns: Dict[str, int]  # 행동 패턴
    last_updated: int
    
    def update_interest(self, category: str, weight: float):
        """관심도 업데이트"""
        if category not in self.interests:
            self.interests[category] = 0.0
        
        # 지수 이동 평균으로 업데이트
        alpha = 0.1
        self.interests[category] = (1 - alpha) * self.interests[category] + alpha * weight
    
    def get_top_interests(self, top_k: int = 5):
        """상위 관심사 반환"""
        return sorted(
            self.interests.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_k]

@dataclass
class Recommendation:
    user_id: str
    item_id: str
    score: float
    reason: str
    timestamp: int

2. 실시간 사용자 프로파일링

from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.common.state import ValueStateDescriptor, MapStateDescriptor
from pyflink.common.typeinfo import Types

class RealTimeUserProfiler(KeyedProcessFunction):
    def __init__(self):
        self.user_profile_state = None
        self.item_categories = {}  # 아이템-카테고리 매핑
    
    def open(self, runtime_context):
        self.user_profile_state = runtime_context.get_state(
            ValueStateDescriptor("user_profile", Types.STRING())
        )
    
    def process_element(self, user_event, ctx):
        """실시간 사용자 프로파일 업데이트"""
        user_id = user_event.user_id
        
        # 현재 프로파일 로드
        profile_json = self.user_profile_state.value()
        if profile_json:
            profile = UserProfile.from_json(profile_json)
        else:
            profile = UserProfile(
                user_id=user_id,
                interests={},
                behavior_patterns={},
                last_updated=user_event.timestamp
            )
        
        # 이벤트 타입별 처리
        self.update_profile_from_event(profile, user_event)
        
        # 프로파일 저장
        self.user_profile_state.update(profile.to_json())
        
        # 업데이트된 프로파일 출력
        ctx.collect(profile)
    
    def update_profile_from_event(self, profile: UserProfile, event: UserEvent):
        """이벤트 기반 프로파일 업데이트"""
        # 아이템 카테고리 가져오기 (실제로는 별도 서비스에서 조회)
        category = self.get_item_category(event.item_id)
        
        # 이벤트 타입별 가중치
        event_weights = {
            'view': 0.1,
            'click': 0.3,
            'like': 0.5,
            'purchase': 1.0
        }
        
        weight = event_weights.get(event.event_type, 0.1)
        
        # 관심도 업데이트
        profile.update_interest(category, weight)
        
        # 행동 패턴 업데이트
        pattern_key = f"{event.event_type}_{category}"
        profile.behavior_patterns[pattern_key] = \
            profile.behavior_patterns.get(pattern_key, 0) + 1
        
        profile.last_updated = event.timestamp
    
    def get_item_category(self, item_id: str) -> str:
        """아이템 카테고리 조회"""
        # 실제로는 데이터베이스나 캐시에서 조회
        categories = ['electronics', 'books', 'clothing', 'home', 'sports']
        return categories[hash(item_id) % len(categories)]
    
    def to_json(self):
        return json.dumps({
            'user_id': self.user_id,
            'interests': self.interests,
            'behavior_patterns': self.behavior_patterns,
            'last_updated': self.last_updated
        })
    
    @classmethod
    def from_json(cls, json_str):
        data = json.loads(json_str)
        return cls(**data)

3. 협업 필터링 기반 추천 엔진

class CollaborativeFilteringEngine(KeyedProcessFunction):
    def __init__(self):
        self.user_item_matrix = None
        self.item_item_similarity = None
        self.user_similarity = None
    
    def open(self, runtime_context):
        self.user_item_matrix = runtime_context.get_map_state(
            MapStateDescriptor("user_item_matrix", Types.STRING(), Types.FLOAT())
        )
        self.item_item_similarity = runtime_context.get_map_state(
            MapStateDescriptor("item_similarity", Types.STRING(), Types.FLOAT())
        )
        self.user_similarity = runtime_context.get_map_state(
            MapStateDescriptor("user_similarity", Types.STRING(), Types.FLOAT())
        )
    
    def process_element(self, user_event, ctx):
        """협업 필터링 기반 추천 생성"""
        user_id = user_event.user_id
        item_id = user_event.item_id
        
        # 사용자-아이템 행렬 업데이트
        matrix_key = f"{user_id}:{item_id}"
        current_rating = self.user_item_matrix.get(matrix_key) or 0.0
        
        # 이벤트 기반 평점 계산
        event_rating = self.calculate_rating_from_event(user_event)
        new_rating = (current_rating + event_rating) / 2
        
        self.user_item_matrix.put(matrix_key, new_rating)
        
        # 아이템 기반 추천 생성
        item_recommendations = self.generate_item_based_recommendations(user_id, item_id)
        
        # 사용자 기반 추천 생성
        user_recommendations = self.generate_user_based_recommendations(user_id)
        
        # 추천 결과 통합
        final_recommendations = self.combine_recommendations(
            item_recommendations, user_recommendations
        )
        
        for rec in final_recommendations:
            ctx.collect(rec)
    
    def calculate_rating_from_event(self, event: UserEvent) -> float:
        """이벤트 기반 평점 계산"""
        rating_map = {
            'view': 1.0,
            'click': 2.0,
            'like': 3.0,
            'purchase': 5.0
        }
        return rating_map.get(event.event_type, 1.0)
    
    def generate_item_based_recommendations(self, user_id: str, item_id: str, top_k: int = 10):
        """아이템 기반 추천 생성"""
        # 유사한 아이템 찾기
        similar_items = self.find_similar_items(item_id, top_k)
        
        recommendations = []
        for similar_item, similarity in similar_items:
            # 사용자가 아직 상호작용하지 않은 아이템만 추천
            matrix_key = f"{user_id}:{similar_item}"
            if not self.user_item_matrix.get(matrix_key):
                score = similarity * 0.8  # 아이템 기반 가중치
                recommendations.append(Recommendation(
                    user_id=user_id,
                    item_id=similar_item,
                    score=score,
                    reason="Similar to items you liked",
                    timestamp=int(time.time())
                ))
        
        return sorted(recommendations, key=lambda x: x.score, reverse=True)[:top_k]
    
    def generate_user_based_recommendations(self, user_id: str, top_k: int = 10):
        """사용자 기반 추천 생성"""
        # 유사한 사용자 찾기
        similar_users = self.find_similar_users(user_id, top_k)
        
        recommendations = []
        for similar_user, similarity in similar_users:
            # 유사한 사용자가 좋아한 아이템 중 현재 사용자가 아직 상호작용하지 않은 것
            user_items = self.get_user_items(similar_user)
            current_user_items = self.get_user_items(user_id)
            
            for item_id in user_items:
                if item_id not in current_user_items:
                    score = similarity * 0.6  # 사용자 기반 가중치
                    recommendations.append(Recommendation(
                        user_id=user_id,
                        item_id=item_id,
                        score=score,
                        reason="Users with similar tastes liked this",
                        timestamp=int(time.time())
                    ))
        
        return sorted(recommendations, key=lambda x: x.score, reverse=True)[:top_k]
    
    def find_similar_items(self, item_id: str, top_k: int):
        """유사한 아이템 찾기"""
        # 실제로는 더 정교한 유사도 계산 알고리즘 사용
        similar_items = []
        
        # 모든 아이템과의 유사도 계산 (간단한 예시)
        for other_item in self.get_all_items():
            if other_item != item_id:
                similarity = self.calculate_item_similarity(item_id, other_item)
                similar_items.append((other_item, similarity))
        
        return sorted(similar_items, key=lambda x: x[1], reverse=True)[:top_k]
    
    def calculate_item_similarity(self, item1: str, item2: str) -> float:
        """아이템 유사도 계산 (코사인 유사도)"""
        # 실제로는 사용자-아이템 행렬을 기반으로 계산
        return abs(hash(item1) - hash(item2)) / (hash(item1) + hash(item2) + 1)
    
    def get_all_items(self):
        """모든 아이템 목록 반환"""
        # 실제로는 데이터베이스에서 조회
        return [f"item_{i}" for i in range(1000)]
    
    def get_user_items(self, user_id: str):
        """사용자가 상호작용한 아이템 목록 반환"""
        items = []
        for key in self.user_item_matrix.keys():
            if key.startswith(f"{user_id}:"):
                item_id = key.split(":")[1]
                items.append(item_id)
        return items
    
    def combine_recommendations(self, item_recs, user_recs):
        """추천 결과 통합"""
        combined = {}
        
        # 아이템 기반 추천 추가
        for rec in item_recs:
            combined[rec.item_id] = rec
        
        # 사용자 기반 추천 추가 (점수 조정)
        for rec in user_recs:
            if rec.item_id in combined:
                # 이미 있는 경우 점수 평균
                combined[rec.item_id].score = (combined[rec.item_id].score + rec.score) / 2
            else:
                combined[rec.item_id] = rec
        
        return list(combined.values())

4. 실시간 추천 시스템 통합

class RealTimeRecommendationSystem:
    def __init__(self):
        self.env = StreamExecutionEnvironment.get_execution_environment()
        self.setup_environment()
    
    def setup_environment(self):
        """환경 설정"""
        # 체크포인팅 활성화
        self.env.get_checkpoint_config().enable_checkpointing(1000)
        
        # 상태 백엔드 설정
        from pyflink.datastream.state import RocksDBStateBackend
        backend = RocksDBStateBackend(
            checkpoint_data_uri="file:///tmp/recommendation-checkpoints"
        )
        self.env.set_state_backend(backend)
    
    def create_kafka_source(self, topic, bootstrap_servers):
        """Kafka 소스 생성"""
        from pyflink.datastream.connectors import FlinkKafkaConsumer
        from pyflink.common.serialization import SimpleStringSchema
        
        kafka_properties = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'recommendation-system'
        }
        
        kafka_source = FlinkKafkaConsumer(
            topic,
            SimpleStringSchema(),
            kafka_properties
        )
        
        return self.env.add_source(kafka_source)
    
    def parse_user_events(self, event_stream):
        """사용자 이벤트 파싱"""
        def parse_event(event_str):
            try:
                return UserEvent.from_json(event_str)
            except Exception as e:
                print(f"Failed to parse event: {event_str}, Error: {e}")
                return None
        
        return event_stream.map(parse_event, output_type=Types.PICKLED_BYTE_ARRAY()) \
                          .filter(lambda x: x is not None)
    
    def run_recommendation_system(self):
        """추천 시스템 실행"""
        # Kafka에서 사용자 이벤트 읽기
        event_stream = self.create_kafka_source("user-events", "localhost:9092")
        
        # 이벤트 파싱
        parsed_events = self.parse_user_events(event_stream)
        
        # 타임스탬프와 워터마크 할당
        from pyflink.datastream.functions import BoundedOutOfOrdernessTimestampExtractor
        from pyflink.common.time import Time
        
        class EventTimestampExtractor(BoundedOutOfOrdernessTimestampExtractor):
            def __init__(self):
                super().__init__(Time.seconds(10))
            
            def extract_timestamp(self, element, previous_timestamp):
                return element.timestamp * 1000
        
        watermarked_events = parsed_events.assign_timestamps_and_watermarks(
            EventTimestampExtractor()
        )
        
        # 실시간 사용자 프로파일링
        user_profiles = watermarked_events.key_by(lambda event: event.user_id) \
                                        .process(RealTimeUserProfiler())
        
        # 협업 필터링 기반 추천
        recommendations = watermarked_events.key_by(lambda event: event.user_id) \
                                          .process(CollaborativeFilteringEngine())
        
        # 결과 출력
        user_profiles.print("User Profiles")
        recommendations.print("Recommendations")
        
        # 실행
        self.env.execute("Real-time Recommendation System")

# 시스템 실행
if __name__ == "__main__":
    system = RealTimeRecommendationSystem()
    system.run_recommendation_system()

📚 학습 요약

이번 파트에서 학습한 내용

  1. 고급 상태 관리 패턴
    • 상태 백엔드 (Memory, Fs, RocksDB)
    • 상태 TTL과 자동 정리
    • 상태 스냅샷과 복원
    • 상태 분할과 병합
  2. 체크포인팅과 세이브포인트
    • 고급 체크포인팅 설정
    • 세이브포인트 생성과 복원
    • 체크포인트 성능 모니터링
    • 장애 복구 전략
  3. 복잡한 시간 처리
    • 다중 시간 윈도우 처리
    • 동적 워터마크 생성
    • 시간 기반 상태 관리
    • 지연 데이터 처리
  4. 성능 최적화
    • 병렬성 최적화
    • 메모리 최적화
    • 네트워크 최적화
    • 지연시간 최적화
  5. 실무 프로젝트
    • 실시간 추천 시스템
    • 사용자 프로파일링
    • 협업 필터링
    • 개인화 추천

핵심 기술 스택

기술 용도 중요도
고급 상태 관리 복잡한 상태 처리 ⭐⭐⭐⭐⭐
체크포인팅 장애 복구와 안정성 ⭐⭐⭐⭐⭐
시간 처리 정확한 시간 기반 분석 ⭐⭐⭐⭐⭐
성능 최적화 프로덕션 환경 최적화 ⭐⭐⭐⭐
실시간 추천 개인화 서비스 ⭐⭐⭐⭐

다음 파트 미리보기

Part 3: 실시간 분석과 CEP (Complex Event Processing)에서는 다음 내용을 다룹니다:

  • 복잡한 이벤트 처리 패턴
  • 실시간 집계와 윈도우 함수
  • CEP 패턴 매칭
  • 실시간 대시보드 구축

다음 파트: Part 3: 실시간 분석과 CEP


이제 Flink의 고급 기능들을 마스터했습니다! 다음 파트에서는 복잡한 이벤트 처리의 세계로 들어가겠습니다. 🚀