Part 2: Kafka Connect와 프로덕션 CDC 운영 - 엔터프라이즈급 실시간 데이터 파이프라인

Kafka Connect 고급 아키텍처, 커스텀 커넥터 개발, 대규모 CDC 파이프라인 운영 전략, 성능 최적화와 장애 복구까지 완전한 가이드입니다.

📋 목차

  1. Kafka Connect 고급 아키텍처
  2. 커스텀 커넥터 개발
  3. 대규모 CDC 파이프라인 운영
  4. 성능 최적화와 병목 해결
  5. 데이터 일관성 보장과 검증
  6. 모니터링과 장애 복구 전략
  7. 실무 프로젝트: 엔터프라이즈 CDC 운영 시스템
  8. 학습 요약

🏗 ️ Kafka Connect 고급 아키텍처

Kafka Connect 아키텍처 심화

Kafka Connect는 분산 스트리밍 플랫폼으로, 데이터베이스와 시스템 간의 데이터 동기화를 위한 확장 가능한 프레임워크입니다.

핵심 컴포넌트 상세 분석

컴포넌트 역할 확장성 고려사항 운영 포인트
Connect Workers 커넥터 실행 환경 • 수평 확장 가능
• CPU/메모리 기반 스케일링
• 리소스 모니터링
• 장애 복구 자동화
Connectors 데이터 소스/싱크 로직 • 플러그인 아키텍처
• 독립적 배포
• 버전 관리
• 호환성 테스트
Tasks 실제 데이터 처리 • 파티션 기반 병렬화
• 동적 태스크 할당
• 부하 분산
• 장애 격리
Transforms 데이터 변환 로직 • 체인 가능한 변환
• 커스텀 SMT 지원
• 성능 최적화
• 메모리 관리

클러스터 구성 전략

class KafkaConnectClusterManager:
    def __init__(self):
        self.cluster_config = {}
    
    def design_cluster_architecture(self, requirements):
        """대규모 CDC 요구사항에 맞는 클러스터 아키텍처 설계"""
        
        architecture = {
            "cluster_size": {
                "workers": self._calculate_worker_count(requirements),
                "connectors": requirements["estimated_connectors"],
                "tasks_per_connector": requirements["max_concurrency"]
            },
            "resource_allocation": {
                "worker_specs": {
                    "cpu": "4 cores",
                    "memory": "8GB",
                    "heap": "4GB",
                    "disk": "100GB SSD"
                },
                "jvm_settings": {
                    "Xmx": "4g",
                    "Xms": "4g",
                    "GC_algorithm": "G1GC",
                    "GC_tuning": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200"
                }
            },
            "network_topology": {
                "worker_distribution": "multi_zone",
                "replication_strategy": "rack_aware",
                "load_balancing": "round_robin"
            }
        }
        
        return architecture
    
    def _calculate_worker_count(self, requirements):
        """워커 수 계산"""
        
        # 기본 계산 공식
        base_workers = max(3, requirements["estimated_connectors"] // 10)
        
        # 고가용성을 위한 최소 워커 수
        ha_workers = max(base_workers, 3)
        
        # 부하 분산을 위한 여유분
        buffer_workers = int(ha_workers * 0.3)
        
        return ha_workers + buffer_workers
    
    def configure_distributed_mode(self, cluster_config):
        """분산 모드 설정"""
        
        distributed_config = {
            "bootstrap.servers": cluster_config["kafka_bootstrap_servers"],
            "group.id": cluster_config["connect_cluster_id"],
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "value.converter.schemas.enable": "false",
            "offset.storage.topic": f"{cluster_config['connect_cluster_id']}-offsets",
            "offset.storage.replication.factor": "3",
            "offset.storage.partitions": "25",
            "config.storage.topic": f"{cluster_config['connect_cluster_id']}-configs",
            "config.storage.replication.factor": "3",
            "status.storage.topic": f"{cluster_config['connect_cluster_id']}-status",
            "status.storage.replication.factor": "3",
            "status.storage.partitions": "5",
            "rest.host.name": "0.0.0.0",
            "rest.port": "8083",
            "plugin.path": "/opt/connectors",
            "connector.client.config.override.policy": "All",
            "producer.security.protocol": "SASL_SSL",
            "consumer.security.protocol": "SASL_SSL"
        }
        
        return distributed_config

고가용성과 장애 복구

장애 시나리오 복구 전략 자동화 수준 복구 시간
Worker 노드 장애 • 자동 태스크 재배치
• 헬스체크 기반 감지
완전 자동 30-60초
Connector 장애 • 백오프 재시도
• 장애 격리
자동 1-5분
Kafka 브로커 장애 • 리더 선출
• 파티션 재할당
완전 자동 10-30초
네트워크 분할 • 쿠럼 기반 합의
• 장애 복구
자동 1-2분

🔧 커스텀 커넥터 개발

커스텀 커넥터 아키텍처

커스텀 커넥터는 특정 데이터 소스나 싱크에 최적화된 전용 커넥터를 개발하는 것입니다.

Source Connector 개발

class CustomSourceConnector(SourceConnector):
    """커스텀 소스 커넥터 예제"""
    
    def __init__(self):
        self.config = {}
        self.version = "1.0.0"
    
    def start(self, props):
        """커넥터 시작"""
        
        self.config = {
            "database.hostname": props.get("database.hostname"),
            "database.port": props.get("database.port"),
            "database.user": props.get("database.user"),
            "database.password": props.get("database.password"),
            "database.name": props.get("database.name"),
            "batch.size": int(props.get("batch.size", "1000")),
            "poll.interval.ms": int(props.get("poll.interval.ms", "1000")),
            "topic.prefix": props.get("topic.prefix", "custom"),
            "table.whitelist": props.get("table.whitelist", "").split(",")
        }
        
        # 데이터베이스 연결 테스트
        self._validate_connection()
    
    def task_configs(self, maxTasks):
        """태스크 설정 생성"""
        
        # 테이블을 태스크 수에 따라 분할
        tables = self.config["table.whitelist"]
        tasks_config = []
        
        for i in range(maxTasks):
            task_config = self.config.copy()
            task_config["task.id"] = str(i)
            
            # 테이블 분할 로직
            if tables:
                start_idx = i * len(tables) // maxTasks
                end_idx = (i + 1) * len(tables) // maxTasks
                task_config["table.whitelist"] = tables[start_idx:end_idx]
            
            tasks_config.append(task_config)
        
        return tasks_config
    
    def stop(self):
        """커넥터 중지"""
        pass
    
    def config(self):
        """설정 스키마 정의"""
        
        return ConfigDef() \
            .define("database.hostname", Type.STRING, ConfigDef.Importance.HIGH,
                   "데이터베이스 호스트명") \
            .define("database.port", Type.INT, 3306, ConfigDef.Importance.HIGH,
                   "데이터베이스 포트") \
            .define("database.user", Type.STRING, ConfigDef.Importance.HIGH,
                   "데이터베이스 사용자명") \
            .define("database.password", Type.PASSWORD, ConfigDef.Importance.HIGH,
                   "데이터베이스 비밀번호") \
            .define("database.name", Type.STRING, ConfigDef.Importance.HIGH,
                   "데이터베이스 이름") \
            .define("batch.size", Type.INT, 1000, ConfigDef.Importance.MEDIUM,
                   "배치 크기") \
            .define("poll.interval.ms", Type.INT, 1000, ConfigDef.Importance.MEDIUM,
                   "폴링 간격 (밀리초)") \
            .define("topic.prefix", Type.STRING, "custom", ConfigDef.Importance.HIGH,
                   "토픽 접두사") \
            .define("table.whitelist", Type.STRING, ConfigDef.Importance.HIGH,
                   "테이블 화이트리스트 (쉼표 구분)")
    
    def _validate_connection(self):
        """데이터베이스 연결 검증"""
        
        try:
            # 실제 연결 검증 로직
            connection = self._create_connection()
            connection.close()
            self.log.info("데이터베이스 연결 검증 성공")
        except Exception as e:
            raise ConnectException(f"데이터베이스 연결 실패: {e}")
    
    def _create_connection(self):
        """데이터베이스 연결 생성"""
        # 실제 연결 생성 로직
        pass


class CustomSourceTask(SourceTask):
    """커스텀 소스 태스크"""
    
    def __init__(self):
        self.config = {}
        self.connection = None
        self.last_offset = {}
    
    def start(self, props):
        """태스크 시작"""
        
        self.config = props
        self.connection = self._create_connection()
        
        # 오프셋 복원
        self._restore_offset()
    
    def poll(self):
        """데이터 폴링"""
        
        try:
            # 배치 크기만큼 데이터 조회
            records = self._fetch_records()
            
            if not records:
                # 데이터가 없으면 짧은 대기
                time.sleep(self.config.get("poll.interval.ms", 1000) / 1000.0)
                return []
            
            # SourceRecord로 변환
            source_records = []
            for record in records:
                source_record = self._convert_to_source_record(record)
                source_records.append(source_record)
                
                # 오프셋 업데이트
                self._update_offset(record)
            
            return source_records
            
        except Exception as e:
            self.log.error(f"데이터 폴링 중 오류 발생: {e}")
            raise
    
    def stop(self):
        """태스크 중지"""
        
        if self.connection:
            self.connection.close()
    
    def _fetch_records(self):
        """실제 데이터 조회"""
        
        # 배치 크기
        batch_size = int(self.config.get("batch.size", 1000))
        
        # 테이블 목록
        tables = self.config.get("table.whitelist", "").split(",")
        
        records = []
        for table in tables:
            if table.strip():
                table_records = self._fetch_table_records(table.strip(), batch_size)
                records.extend(table_records)
        
        return records
    
    def _convert_to_source_record(self, record):
        """레코드를 SourceRecord로 변환"""
        
        topic = f"{self.config['topic.prefix']}.{record['table']}"
        
        # 스키마 정의
        key_schema = SchemaBuilder.struct() \
            .field("id", Schema.INT64_SCHEMA) \
            .build()
        
        value_schema = SchemaBuilder.struct() \
            .field("id", Schema.INT64_SCHEMA) \
            .field("name", Schema.STRING_SCHEMA) \
            .field("created_at", Schema.STRING_SCHEMA) \
            .build()
        
        # 키와 값 생성
        key = Struct(key_schema).put("id", record["id"])
        value = Struct(value_schema) \
            .put("id", record["id"]) \
            .put("name", record["name"]) \
            .put("created_at", record["created_at"])
        
        # 오프셋 정보
        offset = {
            "table": record["table"],
            "id": record["id"],
            "timestamp": record["timestamp"]
        }
        
        return SourceRecord(
            partition=None,
            offset=offset,
            topic=topic,
            key_schema=key_schema,
            key=key,
            value_schema=value_schema,
            value=value,
            timestamp=record["timestamp"]
        )
    
    def _update_offset(self, record):
        """오프셋 업데이트"""
        
        self.last_offset[record["table"]] = {
            "id": record["id"],
            "timestamp": record["timestamp"]
        }
    
    def _restore_offset(self):
        """오프셋 복원"""
        
        # Kafka Connect에서 자동으로 오프셋 관리
        pass

Sink Connector 개발

class CustomSinkConnector(SinkConnector):
    """커스텀 싱크 커넥터"""
    
    def __init__(self):
        self.config = {}
    
    def start(self, props):
        """커넥터 시작"""
        
        self.config = {
            "target.hostname": props.get("target.hostname"),
            "target.port": props.get("target.port"),
            "target.database": props.get("target.database"),
            "target.username": props.get("target.username"),
            "target.password": props.get("target.password"),
            "batch.size": int(props.get("batch.size", "1000")),
            "flush.timeout.ms": int(props.get("flush.timeout.ms", "5000")),
            "auto.create": props.get("auto.create", "true").lower() == "true",
            "delete.enabled": props.get("delete.enabled", "false").lower() == "true"
        }
    
    def task_configs(self, maxTasks):
        """태스크 설정 생성"""
        
        tasks_config = []
        for i in range(maxTasks):
            task_config = self.config.copy()
            task_config["task.id"] = str(i)
            tasks_config.append(task_config)
        
        return tasks_config
    
    def stop(self):
        """커넥터 중지"""
        pass
    
    def config(self):
        """설정 스키마"""
        
        return ConfigDef() \
            .define("target.hostname", Type.STRING, ConfigDef.Importance.HIGH,
                   "대상 시스템 호스트명") \
            .define("target.port", Type.INT, ConfigDef.Importance.HIGH,
                   "대상 시스템 포트") \
            .define("target.database", Type.STRING, ConfigDef.Importance.HIGH,
                   "대상 데이터베이스") \
            .define("target.username", Type.STRING, ConfigDef.Importance.HIGH,
                   "대상 시스템 사용자명") \
            .define("target.password", Type.PASSWORD, ConfigDef.Importance.HIGH,
                   "대상 시스템 비밀번호") \
            .define("batch.size", Type.INT, 1000, ConfigDef.Importance.MEDIUM,
                   "배치 크기") \
            .define("flush.timeout.ms", Type.INT, 5000, ConfigDef.Importance.MEDIUM,
                   "플러시 타임아웃") \
            .define("auto.create", Type.BOOLEAN, True, ConfigDef.Importance.LOW,
                   "자동 테이블 생성") \
            .define("delete.enabled", Type.BOOLEAN, False, ConfigDef.Importance.MEDIUM,
                   "삭제 작업 활성화")


class CustomSinkTask(SinkTask):
    """커스텀 싱크 태스크"""
    
    def __init__(self):
        self.config = {}
        self.connection = None
        self.batch = []
        self.last_flush_time = time.time()
    
    def start(self, props):
        """태스크 시작"""
        
        self.config = props
        self.connection = self._create_connection()
        
        # 테이블 스키마 캐시 초기화
        self._initialize_table_schemas()
    
    def put(self, records):
        """레코드 처리"""
        
        for record in records:
            self.batch.append(record)
            
            # 배치 크기 또는 타임아웃에 도달하면 플러시
            if (len(self.batch) >= int(self.config["batch.size"]) or
                time.time() - self.last_flush_time > int(self.config["flush.timeout.ms"]) / 1000.0):
                self._flush_batch()
    
    def flush(self, offsets):
        """수동 플러시"""
        
        self._flush_batch()
    
    def stop(self):
        """태스크 중지"""
        
        # 남은 배치 처리
        if self.batch:
            self._flush_batch()
        
        if self.connection:
            self.connection.close()
    
    def _flush_batch(self):
        """배치 플러시"""
        
        if not self.batch:
            return
        
        try:
            # 배치를 테이블별로 그룹화
            grouped_records = self._group_by_table(self.batch)
            
            # 각 테이블별로 처리
            for table, records in grouped_records.items():
                self._process_table_batch(table, records)
            
            # 배치 초기화
            self.batch = []
            self.last_flush_time = time.time()
            
        except Exception as e:
            self.log.error(f"배치 플러시 중 오류 발생: {e}")
            raise
    
    def _process_table_batch(self, table, records):
        """테이블별 배치 처리"""
        
        # INSERT, UPDATE, DELETE로 분류
        inserts = [r for r in records if r.value() is not None]
        deletes = [r for r in records if r.value() is None]
        
        # INSERT/UPDATE 처리
        if inserts:
            self._upsert_records(table, inserts)
        
        # DELETE 처리
        if deletes and self.config.get("delete.enabled", "false").lower() == "true":
            self._delete_records(table, deletes)
    
    def _upsert_records(self, table, records):
        """레코드 업서트"""
        
        # 실제 업서트 로직 구현
        pass
    
    def _delete_records(self, table, records):
        """레코드 삭제"""
        
        # 실제 삭제 로직 구현
        pass

커스텀 Transform 개발

class CustomTransform(Transform):
    """커스텀 Single Message Transform"""
    
    def __init__(self):
        self.config = {}
    
    def configure(self, configs):
        """설정 구성"""
        
        self.config = {
            "field.mapping": configs.get("field.mapping", ""),
            "data.type": configs.get("data.type", "json"),
            "validation.enabled": configs.get("validation.enabled", "true").lower() == "true"
        }
        
        # 필드 매핑 파싱
        if self.config["field.mapping"]:
            self.field_mapping = self._parse_field_mapping(self.config["field.mapping"])
        else:
            self.field_mapping = {}
    
    def apply(self, record):
        """레코드 변환 적용"""
        
        if record is None:
            return None
        
        try:
            # 값 추출
            value = record.value()
            
            if value is None:
                return record
            
            # 데이터 타입별 처리
            if self.config["data.type"] == "json":
                transformed_value = self._transform_json(value)
            elif self.config["data.type"] == "avro":
                transformed_value = self._transform_avro(value)
            else:
                transformed_value = value
            
            # 검증
            if self.config["validation.enabled"]:
                self._validate_record(transformed_value)
            
            # 새 레코드 생성
            return record.new_record(
                topic=record.topic(),
                partition=record.kafkaPartition(),
                key_schema=record.keySchema(),
                key=record.key(),
                value_schema=record.valueSchema(),
                value=transformed_value,
                timestamp=record.timestamp()
            )
            
        except Exception as e:
            self.log.error(f"레코드 변환 중 오류 발생: {e}")
            return record  # 원본 레코드 반환
    
    def _transform_json(self, value):
        """JSON 데이터 변환"""
        
        if isinstance(value, dict):
            transformed = value.copy()
            
            # 필드 매핑 적용
            for old_field, new_field in self.field_mapping.items():
                if old_field in transformed:
                    transformed[new_field] = transformed.pop(old_field)
            
            # 추가 변환 로직
            transformed = self._apply_business_rules(transformed)
            
            return transformed
        
        return value
    
    def _transform_avro(self, value):
        """Avro 데이터 변환"""
        
        # Avro 스키마 변환 로직
        return value
    
    def _apply_business_rules(self, data):
        """비즈니스 규칙 적용"""
        
        # 예: 타임스탬프 포맷 변환
        if "created_at" in data:
            data["created_at"] = self._format_timestamp(data["created_at"])
        
        # 예: 데이터 정규화
        if "email" in data:
            data["email"] = data["email"].lower().strip()
        
        return data
    
    def _validate_record(self, value):
        """레코드 검증"""
        
        if not isinstance(value, dict):
            return
        
        # 필수 필드 검증
        required_fields = ["id", "name"]
        for field in required_fields:
            if field not in value:
                raise ValueError(f"필수 필드 '{field}'가 누락되었습니다")
    
    def _parse_field_mapping(self, mapping_str):
        """필드 매핑 파싱"""
        
        mapping = {}
        for pair in mapping_str.split(","):
            if ":" in pair:
                old_field, new_field = pair.split(":", 1)
                mapping[old_field.strip()] = new_field.strip()
        
        return mapping
    
    def _format_timestamp(self, timestamp):
        """타임스탬프 포맷팅"""
        
        # 실제 포맷팅 로직
        return timestamp

🚀 대규모 CDC 파이프라인 운영

파이프라인 아키텍처 설계

대규모 CDC 파이프라인은 수백 개의 테이블과 수십 개의 시스템을 동시에 처리해야 합니다.

파이프라인 구성 요소

구성 요소 역할 확장성 전략 모니터링 포인트
Source Connectors 데이터 소스에서 변경사항 캡처 • 테이블별 커넥터 분리
• 파티션 기반 병렬화
• 지연시간
• 처리량
• 오류율
Transform Layer 데이터 변환 및 정제 • 스트림 기반 변환
• 병렬 처리
• 변환 지연시간
• 데이터 품질
Sink Connectors 대상 시스템으로 데이터 전송 • 대상별 커넥터 분리
• 배치 최적화
• 전송 지연시간
• 성공률
Schema Registry 스키마 관리 및 호환성 • 분산 캐싱
• 버전 관리
• 스키마 변경
• 호환성 검사

파이프라인 오케스트레이션

class CDCPipelineOrchestrator:
    def __init__(self):
        self.pipeline_configs = {}
        self.connector_manager = ConnectorManager()
        self.monitoring_system = MonitoringSystem()
    
    def deploy_large_scale_pipeline(self, pipeline_spec):
        """대규모 CDC 파이프라인 배포"""
        
        deployment_plan = {
            "phase_1": self._deploy_source_connectors(pipeline_spec["sources"]),
            "phase_2": self._configure_transformations(pipeline_spec["transforms"]),
            "phase_3": self._deploy_sink_connectors(pipeline_spec["sinks"]),
            "phase_4": self._setup_monitoring(pipeline_spec["monitoring"])
        }
        
        return deployment_plan
    
    def _deploy_source_connectors(self, sources):
        """소스 커넥터 배포"""
        
        source_deployment = []
        
        for source in sources:
            # 데이터베이스별 커넥터 설정
            connector_config = {
                "name": f"source-{source['database']}-{source['schema']}",
                "connector.class": self._get_connector_class(source["type"]),
                "tasks.max": source.get("tasks_max", 4),
                "database.hostname": source["hostname"],
                "database.port": source["port"],
                "database.user": source["username"],
                "database.password": source["password"],
                "database.server.id": source["server_id"],
                "topic.prefix": f"{source['database']}.{source['schema']}",
                "table.include.list": ",".join(source["tables"]),
                "transforms": "unwrap,route,addTimestamp",
                "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
                "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
                "transforms.route.regex": f"{source['database']}\\.{source['schema']}\\.([^.]+)",
                "transforms.route.replacement": f"{source['database']}.$1",
                "max.batch.size": source.get("batch_size", 4096),
                "poll.interval.ms": source.get("poll_interval", 100)
            }
            
            # 커넥터 배포
            result = self.connector_manager.deploy_connector(connector_config)
            source_deployment.append({
                "connector_name": connector_config["name"],
                "status": result["status"],
                "tables": source["tables"]
            })
        
        return source_deployment
    
    def _configure_transformations(self, transforms):
        """변환 설정"""
        
        transform_configs = []
        
        for transform in transforms:
            config = {
                "name": f"transform-{transform['name']}",
                "transforms": ",".join(transform["steps"]),
                "topics": transform["input_topics"],
                "output_topic": transform["output_topic"]
            }
            
            # 각 변환 단계별 설정
            for i, step in enumerate(transform["steps"]):
                step_config = transform["step_configs"][i]
                config.update({
                    f"transforms.{step}.type": step_config["class"],
                    f"transforms.{step}.field.mapping": step_config.get("field_mapping", ""),
                    f"transforms.{step}.validation.enabled": step_config.get("validation", "true")
                })
            
            transform_configs.append(config)
        
        return transform_configs
    
    def _deploy_sink_connectors(self, sinks):
        """싱크 커넥터 배포"""
        
        sink_deployment = []
        
        for sink in sinks:
            sink_config = {
                "name": f"sink-{sink['target']}-{sink['database']}",
                "connector.class": self._get_sink_connector_class(sink["type"]),
                "tasks.max": sink.get("tasks_max", 4),
                "topics": ",".join(sink["topics"]),
                "batch.size": sink.get("batch_size", 1000),
                "flush.timeout.ms": sink.get("flush_timeout", 5000)
            }
            
            # 대상 시스템별 설정 추가
            if sink["type"] == "elasticsearch":
                sink_config.update({
                    "connection.url": sink["connection_url"],
                    "type.name": sink.get("type_name", "_doc"),
                    "key.ignore": "false",
                    "schema.ignore": "true"
                })
            elif sink["type"] == "postgresql":
                sink_config.update({
                    "connection.url": sink["connection_url"],
                    "auto.create": "true",
                    "delete.enabled": "false"
                })
            elif sink["type"] == "s3":
                sink_config.update({
                    "s3.bucket.name": sink["bucket"],
                    "s3.region": sink["region"],
                    "flush.size": sink.get("flush_size", 10000),
                    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat"
                })
            
            # 커넥터 배포
            result = self.connector_manager.deploy_connector(sink_config)
            sink_deployment.append({
                "connector_name": sink_config["name"],
                "status": result["status"],
                "target": sink["target"]
            })
        
        return sink_deployment
    
    def _setup_monitoring(self, monitoring_config):
        """모니터링 설정"""
        
        monitoring_setup = {
            "metrics": self.monitoring_system.setup_metrics(monitoring_config),
            "alerts": self.monitoring_system.setup_alerts(monitoring_config),
            "dashboards": self.monitoring_system.setup_dashboards(monitoring_config)
        }
        
        return monitoring_setup

파이프라인 운영 전략

운영 영역 전략 구현 방법 모니터링
부하 분산 • 테이블별 파티션 분산
• 워커 노드 균등 분배
• 파티션 수 최적화
• 워커 수 동적 조정
• 파티션별 처리량
• 워커별 리소스 사용률
장애 격리 • 커넥터별 독립 실행
• 장애 전파 방지
• 서킷 브레이커 패턴
• 백오프 재시도
• 장애 발생률
• 복구 시간
확장성 • 수평 확장 우선
• 자동 스케일링
• Kubernetes HPA
• 메트릭 기반 스케일링
• 처리 용량
• 확장 이벤트
데이터 일관성 • Exactly-once 처리
• 순서 보장
• 트랜잭션 기반 처리
• 오프셋 관리
• 데이터 일관성 검증
• 중복 데이터 감지

⚡ 성능 최적화와 병목 해결

성능 최적화 전략

CDC 파이프라인의 성능을 최적화하기 위한 종합적인 접근 방법입니다.

병목 지점 분석

병목 지점 원인 해결 방법 모니터링 지표
데이터베이스 읽기 • 인덱스 부족
• 락 경합
• 네트워크 지연
• 읽기 전용 복제본 활용
• 배치 크기 최적화
• 연결 풀 튜닝
• 읽기 지연시간
• 연결 풀 사용률
• 쿼리 실행 시간
Kafka 처리 • 파티션 수 부족
• 브로커 리소스 부족
• 압축 설정
• 파티션 수 증가
• 브로커 스케일링
• 압축 알고리즘 최적화
• 토픽별 처리량
• 브로커 CPU/메모리
• 압축 비율
커넥터 처리 • 배치 크기 부적절
• 변환 로직 비효율
• 메모리 부족
• 배치 크기 동적 조정
• 변환 로직 최적화
• JVM 튜닝
• 커넥터 처리량
• GC 시간
• 메모리 사용률
네트워크 전송 • 대역폭 부족
• 네트워크 지연
• 압축 효율
• 네트워크 최적화
• 압축 설정 조정
• 배치 전송
• 네트워크 처리량
• 지연시간
• 패킷 손실률

성능 튜닝 구현

class PerformanceOptimizer:
    def __init__(self):
        self.optimization_configs = {}
    
    def optimize_connector_performance(self, connector_name, current_metrics):
        """커넥터 성능 최적화"""
        
        optimizations = []
        
        # 처리량 기반 배치 크기 조정
        if current_metrics["throughput"] < 1000:
            batch_size = min(current_metrics["batch_size"] * 2, 8192)
            optimizations.append({
                "parameter": "max.batch.size",
                "current_value": current_metrics["batch_size"],
                "new_value": batch_size,
                "reason": "처리량 향상을 위한 배치 크기 증가"
            })
        
        # 지연시간 기반 폴링 간격 조정
        if current_metrics["latency"] > 5000:
            poll_interval = max(current_metrics["poll_interval"] // 2, 50)
            optimizations.append({
                "parameter": "poll.interval.ms",
                "current_value": current_metrics["poll_interval"],
                "new_value": poll_interval,
                "reason": "지연시간 감소를 위한 폴링 간격 단축"
            })
        
        # 메모리 사용률 기반 큐 크기 조정
        if current_metrics["memory_usage"] > 0.8:
            queue_size = max(current_metrics["queue_size"] // 2, 1024)
            optimizations.append({
                "parameter": "max.queue.size",
                "current_value": current_metrics["queue_size"],
                "new_value": queue_size,
                "reason": "메모리 사용률 감소를 위한 큐 크기 축소"
            })
        
        return optimizations
    
    def optimize_kafka_cluster(self, cluster_metrics):
        """Kafka 클러스터 성능 최적화"""
        
        kafka_optimizations = []
        
        # 브로커별 CPU 사용률 확인
        for broker_id, metrics in cluster_metrics["brokers"].items():
            if metrics["cpu_usage"] > 0.8:
                kafka_optimizations.append({
                    "broker_id": broker_id,
                    "optimization": "CPU 사용률이 높음 - 파티션 재분배 또는 브로커 추가 필요",
                    "current_cpu": metrics["cpu_usage"],
                    "recommended_action": "파티션 리밸런싱 또는 브로커 스케일링"
                })
            
            if metrics["memory_usage"] > 0.9:
                kafka_optimizations.append({
                    "broker_id": broker_id,
                    "optimization": "메모리 사용률이 높음 - JVM 힙 크기 조정 필요",
                    "current_memory": metrics["memory_usage"],
                    "recommended_action": "JVM 힙 크기 증가 또는 브로커 추가"
                })
        
        # 토픽별 처리량 최적화
        for topic, metrics in cluster_metrics["topics"].items():
            if metrics["throughput"] < 10000:
                kafka_optimizations.append({
                    "topic": topic,
                    "optimization": "처리량이 낮음 - 파티션 수 증가 필요",
                    "current_partitions": metrics["partitions"],
                    "recommended_partitions": min(metrics["partitions"] * 2, 50)
                })
        
        return kafka_optimizations
    
    def generate_performance_report(self, system_metrics):
        """성능 리포트 생성"""
        
        report = {
            "summary": {
                "overall_health": self._calculate_overall_health(system_metrics),
                "total_throughput": sum(m["throughput"] for m in system_metrics["connectors"]),
                "average_latency": sum(m["latency"] for m in system_metrics["connectors"]) / len(system_metrics["connectors"]),
                "error_rate": sum(m["error_rate"] for m in system_metrics["connectors"]) / len(system_metrics["connectors"])
            },
            "bottlenecks": self._identify_bottlenecks(system_metrics),
            "recommendations": self._generate_recommendations(system_metrics),
            "optimization_plan": self._create_optimization_plan(system_metrics)
        }
        
        return report
    
    def _calculate_overall_health(self, metrics):
        """전체 시스템 건강도 계산"""
        
        health_score = 100
        
        # 처리량 기준 감점
        for connector in metrics["connectors"]:
            if connector["throughput"] < 1000:
                health_score -= 10
        
        # 지연시간 기준 감점
        for connector in metrics["connectors"]:
            if connector["latency"] > 5000:
                health_score -= 15
        
        # 오류율 기준 감점
        for connector in metrics["connectors"]:
            if connector["error_rate"] > 0.01:
                health_score -= 20
        
        return max(0, health_score)
    
    def _identify_bottlenecks(self, metrics):
        """병목 지점 식별"""
        
        bottlenecks = []
        
        # 데이터베이스 병목
        db_metrics = metrics.get("database", {})
        if db_metrics.get("connection_pool_usage", 0) > 0.9:
            bottlenecks.append({
                "type": "database",
                "description": "데이터베이스 연결 풀 사용률이 높음",
                "severity": "high",
                "current_value": db_metrics["connection_pool_usage"]
            })
        
        # Kafka 병목
        kafka_metrics = metrics.get("kafka", {})
        for topic, topic_metrics in kafka_metrics.get("topics", {}).items():
            if topic_metrics.get("consumer_lag", 0) > 10000:
                bottlenecks.append({
                    "type": "kafka",
                    "description": f"토픽 {topic}의 컨슈머 지연이 높음",
                    "severity": "medium",
                    "current_value": topic_metrics["consumer_lag"]
                })
        
        return bottlenecks

자동 스케일링 구현

class AutoScaler:
    def __init__(self):
        self.scaling_configs = {}
        self.metrics_collector = MetricsCollector()
    
    def setup_auto_scaling(self, scaling_config):
        """자동 스케일링 설정"""
        
        auto_scaling_config = {
            "connectors": {
                "min_tasks": scaling_config.get("min_tasks", 1),
                "max_tasks": scaling_config.get("max_tasks", 10),
                "scale_up_threshold": scaling_config.get("scale_up_threshold", 0.8),
                "scale_down_threshold": scaling_config.get("scale_down_threshold", 0.3),
                "scale_up_cooldown": scaling_config.get("scale_up_cooldown", 300),
                "scale_down_cooldown": scaling_config.get("scale_down_cooldown", 600)
            },
            "workers": {
                "min_workers": scaling_config.get("min_workers", 2),
                "max_workers": scaling_config.get("max_workers", 20),
                "cpu_threshold": scaling_config.get("cpu_threshold", 0.7),
                "memory_threshold": scaling_config.get("memory_threshold", 0.8)
            }
        }
        
        return auto_scaling_config
    
    def evaluate_scaling_needs(self, connector_name):
        """스케일링 필요성 평가"""
        
        current_metrics = self.metrics_collector.get_connector_metrics(connector_name)
        scaling_config = self.scaling_configs.get(connector_name, {})
        
        scaling_decision = {
            "connector": connector_name,
            "action": "none",
            "reason": "",
            "current_tasks": current_metrics["task_count"],
            "recommended_tasks": current_metrics["task_count"]
        }
        
        # CPU 사용률 기반 스케일링
        cpu_usage = current_metrics.get("cpu_usage", 0)
        if cpu_usage > scaling_config.get("scale_up_threshold", 0.8):
            if current_metrics["task_count"] < scaling_config.get("max_tasks", 10):
                scaling_decision.update({
                    "action": "scale_up",
                    "reason": f"CPU 사용률이 {cpu_usage:.2%}로 높음",
                    "recommended_tasks": min(current_metrics["task_count"] + 2, scaling_config.get("max_tasks", 10))
                })
        
        elif cpu_usage < scaling_config.get("scale_down_threshold", 0.3):
            if current_metrics["task_count"] > scaling_config.get("min_tasks", 1):
                scaling_decision.update({
                    "action": "scale_down",
                    "reason": f"CPU 사용률이 {cpu_usage:.2%}로 낮음",
                    "recommended_tasks": max(current_metrics["task_count"] - 1, scaling_config.get("min_tasks", 1))
                })
        
        # 처리량 기반 스케일링
        throughput = current_metrics.get("throughput", 0)
        if throughput > 10000 and current_metrics["task_count"] < scaling_config.get("max_tasks", 10):
            scaling_decision.update({
                "action": "scale_up",
                "reason": f"처리량이 {throughput}로 높음",
                "recommended_tasks": min(current_metrics["task_count"] + 1, scaling_config.get("max_tasks", 10))
            })
        
        return scaling_decision
    
    def execute_scaling(self, scaling_decision):
        """스케일링 실행"""
        
        if scaling_decision["action"] == "scale_up":
            return self._scale_up_connector(
                scaling_decision["connector"],
                scaling_decision["recommended_tasks"]
            )
        elif scaling_decision["action"] == "scale_down":
            return self._scale_down_connector(
                scaling_decision["connector"],
                scaling_decision["recommended_tasks"]
            )
        
        return {"status": "no_action_needed"}
    
    def _scale_up_connector(self, connector_name, new_task_count):
        """커넥터 스케일 업"""
        
        try:
            # 커넥터 설정 업데이트
            connector_config = self.connector_manager.get_connector_config(connector_name)
            connector_config["tasks.max"] = new_task_count
            
            # 커넥터 재시작
            result = self.connector_manager.update_connector(connector_name, connector_config)
            
            return {
                "status": "success",
                "action": "scale_up",
                "old_task_count": connector_config.get("tasks.max", 1),
                "new_task_count": new_task_count,
                "message": f"커넥터 {connector_name}{new_task_count}개 태스크로 스케일 업됨"
            }
            
        except Exception as e:
            return {
                "status": "error",
                "action": "scale_up",
                "error": str(e),
                "message": f"커넥터 {connector_name} 스케일 업 실패"
            }
    
    def _scale_down_connector(self, connector_name, new_task_count):
        """커넥터 스케일 다운"""
        
        try:
            # 커넥터 설정 업데이트
            connector_config = self.connector_manager.get_connector_config(connector_name)
            connector_config["tasks.max"] = new_task_count
            
            # 커넥터 재시작
            result = self.connector_manager.update_connector(connector_name, connector_config)
            
            return {
                "status": "success",
                "action": "scale_down",
                "old_task_count": connector_config.get("tasks.max", 1),
                "new_task_count": new_task_count,
                "message": f"커넥터 {connector_name}{new_task_count}개 태스크로 스케일 다운됨"
            }
            
        except Exception as e:
            return {
                "status": "error",
                "action": "scale_down",
                "error": str(e),
                "message": f"커넥터 {connector_name} 스케일 다운 실패"
            }

🔒 데이터 일관성 보장과 검증

데이터 일관성 전략

CDC 파이프라인에서 데이터 일관성을 보장하는 것은 매우 중요합니다.

일관성 보장 방법

방법 설명 구현 복잡도 성능 영향 사용 사례
Exactly-once Semantics 각 메시지를 정확히 한 번만 처리 높음 중간 금융 거래, 주문 처리
At-least-once Semantics 메시지를 최소 한 번은 처리 낮음 낮음 로그 수집, 메트릭 수집
At-most-once Semantics 메시지를 최대 한 번만 처리 중간 낮음 알림, 이벤트 스트리밍
Transactional Processing 트랜잭션 단위로 일관성 보장 높음 높음 계정 이체, 재고 관리

📊 모니터링과 장애 복구 전략

종합 모니터링 시스템

CDC 파이프라인의 건강한 운영을 위한 포괄적인 모니터링 시스템입니다.

모니터링 계층

계층 모니터링 대상 주요 메트릭 알림 임계값
인프라 계층 • Kubernetes 클러스터
• Kafka 브로커
• 데이터베이스
• CPU/메모리 사용률
• 디스크 I/O
• 네트워크 처리량
• CPU > 80%
• 메모리 > 90%
• 디스크 > 85%
애플리케이션 계층 • Kafka Connect 워커
• 커넥터
• 태스크
• JVM 메모리
• GC 시간
• 스레드 수
• GC 시간 > 20%
• 스레드 수 > 1000
• 힙 사용률 > 80%
데이터 계층 • 데이터 처리량
• 지연시간
• 오류율
• 레코드/초
• P99 지연시간
• 실패율
• 처리량 < 1000/s
• 지연시간 > 5초
• 오류율 > 1%
비즈니스 계층 • 데이터 품질
• 일관성
• 완전성
• 데이터 검증 실패
• 체크섬 불일치
• 누락 레코드
• 검증 실패 > 0
• 체크섬 불일치 > 0
• 누락률 > 0.1%

🚀 실무 프로젝트: 엔터프라이즈 CDC 운영 시스템

프로젝트 개요

대규모 전자상거래 플랫폼을 위한 엔터프라이즈급 CDC 운영 시스템을 구축합니다.

시스템 아키텍처

구성 요소 기술 스택 용량 고가용성
Source Systems • MySQL 8.0 (주문)
• PostgreSQL 13 (사용자)
• MongoDB 5.0 (제품)
• 100만+ 레코드/일
• 50+ 테이블
• 10+ 데이터베이스
• 읽기 전용 복제본
• 자동 장애 복구
Kafka Cluster • Apache Kafka 3.0
• Schema Registry
• Kafka Connect
• 3 브로커
• 100+ 토픽
• 1000+ 파티션
• 3중 복제
• 자동 리밸런싱
Target Systems • Elasticsearch 8.0
• Redis 7.0
• S3 Data Lake
• Snowflake
• 3 노드 ES 클러스터
• 6 노드 Redis 클러스터
• 무제한 S3 저장소
• 클러스터 모드
• 자동 백업

📚 학습 요약

이번 Part에서 학습한 내용

  1. Kafka Connect 고급 아키텍처
    • 분산 모드 구성과 클러스터 설계
    • 워커 노드 관리와 확장성 전략
    • 고가용성과 장애 복구 메커니즘
  2. 커스텀 커넥터 개발
    • Source Connector와 Sink Connector 구현
    • 커스텀 Transform 개발
    • 커넥터 테스트와 배포 전략
  3. 대규모 CDC 파이프라인 운영
    • 파이프라인 오케스트레이션
    • 부하 분산과 장애 격리
    • 확장성과 데이터 일관성 보장
  4. 성능 최적화와 병목 해결
    • 성능 병목 지점 분석
    • 동적 성능 튜닝
    • 자동 스케일링 구현
  5. 데이터 일관성 보장과 검증
    • 일관성 보장 방법론
    • 데이터 검증 시스템
    • 드리프트 감지와 이상 탐지
  6. 모니터링과 장애 복구 전략
    • 종합 모니터링 시스템
    • 재해 복구 계획
    • 자동 장애 조치
  7. 실무 프로젝트
    • 엔터프라이즈급 CDC 운영 시스템
    • 대규모 전자상거래 플랫폼 적용
    • 운영 자동화와 최적화

핵심 기술 스택

기술 역할 중요도 학습 포인트
Kafka Connect 커넥터 프레임워크 ⭐⭐⭐⭐⭐ 분산 아키텍처, 확장성
Debezium CDC 플랫폼 ⭐⭐⭐⭐⭐ 고급 설정, 성능 최적화
커스텀 커넥터 특화된 데이터 처리 ⭐⭐⭐⭐ 개발 패턴, 테스트 전략
모니터링 운영 가시성 ⭐⭐⭐⭐⭐ 메트릭 수집, 알림 설정
자동화 운영 효율성 ⭐⭐⭐⭐ 스케일링, 복구 자동화

다음 단계

이제 CDC 시리즈의 핵심 내용을 모두 학습했습니다. 다음 단계로는:

  1. 실제 프로젝트 적용: 학습한 내용을 실제 프로젝트에 적용
  2. 고급 주제 탐구: 스트림 처리, 이벤트 소싱 등 고급 주제
  3. 다른 기술 스택: Apache Pulsar, Apache Flink 등 다른 스트리밍 기술

시리즈 완료: Change Data Capture Complete Guide Series


엔터프라이즈급 실시간 데이터 파이프라인을 구축하여 현대적인 데이터 아키텍처의 핵심을 완성하세요! 🚀