Part 2: Kafka Connect and Production CDC Operations - Enterprise Real-time Data Pipeline

Advanced Kafka Connect architecture, custom connector development, strategies for operating large-scale CDC pipelines, performance optimization, and disaster recovery.

📋 Table of Contents

  1. Kafka Connect Advanced Architecture
  2. Custom Connector Development
  3. Large-scale CDC Pipeline Operations
  4. Performance Optimization and Bottleneck Resolution
  5. Data Consistency Assurance and Validation
  6. Monitoring and Disaster Recovery Strategies
  7. Practical Project: Enterprise CDC Operation System
  8. Learning Summary

🏗️ Kafka Connect Advanced Architecture

Kafka Connect Architecture Deep Dive

Kafka Connect is a distributed, scalable framework for streaming data between Apache Kafka and external systems such as databases, search indexes, and object stores. It handles offset tracking, fault tolerance, and scaling so that connector code can focus purely on reading from or writing to the external system.

Core Components in Detail

| Component | Role | Scalability Considerations | Operational Points |
|---|---|---|---|
| Connect Workers | Connector execution environment | Horizontal scaling capability, CPU/memory-based scaling | Resource monitoring, automated failure recovery |
| Connectors | Data source/sink logic | Plugin architecture, independent deployment | Version management, compatibility testing |
| Tasks | Actual data processing | Partition-based parallelization, dynamic task allocation | Load distribution, failure isolation |
| Transforms | Data transformation logic | Chainable transformations, custom SMT support | Performance optimization, memory management |

Cluster Configuration Strategy

class KafkaConnectClusterManager:
    def __init__(self):
        self.cluster_config = {}
    
    def design_cluster_architecture(self, requirements):
        """Design cluster architecture for large-scale CDC requirements"""
        
        architecture = {
            "cluster_size": {
                "workers": self._calculate_worker_count(requirements),
                "connectors": requirements["estimated_connectors"],
                "tasks_per_connector": requirements["max_concurrency"]
            },
            "resource_allocation": {
                "worker_specs": {
                    "cpu": "4 cores",
                    "memory": "8GB",
                    "heap": "4GB",
                    "disk": "100GB SSD"
                },
                "jvm_settings": {
                    "Xmx": "4g",
                    "Xms": "4g",
                    "GC_algorithm": "G1GC",
                    "GC_tuning": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200"
                }
            },
            "network_topology": {
                "worker_distribution": "multi_zone",
                "replication_strategy": "rack_aware",
                "load_balancing": "round_robin"
            }
        }
        
        return architecture
    
    def _calculate_worker_count(self, requirements):
        """Calculate worker count"""
        
        # Basic calculation formula
        base_workers = max(3, requirements["estimated_connectors"] // 10)
        
        # Minimum workers for high availability
        ha_workers = max(base_workers, 3)
        
        # Buffer for load distribution
        buffer_workers = int(ha_workers * 0.3)
        
        return ha_workers + buffer_workers
    
    def configure_distributed_mode(self, cluster_config):
        """Configure distributed mode"""
        
        distributed_config = {
            "bootstrap.servers": cluster_config["kafka_bootstrap_servers"],
            "group.id": cluster_config["connect_cluster_id"],
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "value.converter.schemas.enable": "false",
            "offset.storage.topic": f"{cluster_config['connect_cluster_id']}-offsets",
            "offset.storage.replication.factor": "3",
            "offset.storage.partitions": "25",
            "config.storage.topic": f"{cluster_config['connect_cluster_id']}-configs",
            "config.storage.replication.factor": "3",
            "status.storage.topic": f"{cluster_config['connect_cluster_id']}-status",
            "status.storage.replication.factor": "3",
            "status.storage.partitions": "5",
            "rest.host.name": "0.0.0.0",
            "rest.port": "8083",
            "plugin.path": "/opt/connectors",
            "connector.client.config.override.policy": "All",
            "producer.security.protocol": "SASL_SSL",
            "consumer.security.protocol": "SASL_SSL"
        }
        
        return distributed_config
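
As a usage sketch (the class and method names come from KafkaConnectClusterManager above; the broker addresses, cluster id, and file path are placeholders), the generated settings can be written out in Java properties format so every worker starts from the same connect-distributed.properties file:

manager = KafkaConnectClusterManager()

requirements = {"estimated_connectors": 40, "max_concurrency": 8}
architecture = manager.design_cluster_architecture(requirements)
print("Recommended workers:", architecture["cluster_size"]["workers"])

worker_properties = manager.configure_distributed_mode({
    "kafka_bootstrap_servers": "broker-1:9092,broker-2:9092,broker-3:9092",
    "connect_cluster_id": "cdc-connect-prod"
})

# Render the flat key/value pairs as a Java properties file that each worker
# loads at startup (connect-distributed.sh connect-distributed.properties)
with open("connect-distributed.properties", "w") as f:
    for key, value in sorted(worker_properties.items()):
        f.write(f"{key}={value}\n")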

High Availability and Disaster Recovery

| Failure Scenario | Recovery Strategy | Automation Level | Recovery Time |
|---|---|---|---|
| Worker node failure | Automatic task reallocation, health-check-based detection | Fully automated | 30-60 seconds |
| Connector failure | Backoff retry, failure isolation | Automated | 1-5 minutes |
| Kafka broker failure | Leader election, partition reassignment | Fully automated | 10-30 seconds |
| Network partition | Quorum-based consensus, failure recovery | Automated | 1-2 minutes |
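
Worker- and broker-level recovery is handled by the framework itself, but connectors and tasks that enter the FAILED state stay there until someone restarts them. A minimal watchdog sketch against the standard Kafka Connect REST API (GET /connectors, GET /connectors/{name}/status, and the restart endpoints); the worker URL is a placeholder:

import requests

CONNECT_URL = "http://connect-worker-1:8083"  # placeholder worker address

def restart_failed_tasks(connector_name):
    """Check a connector's status and restart anything in the FAILED state."""
    status = requests.get(
        f"{CONNECT_URL}/connectors/{connector_name}/status", timeout=10
    ).json()

    # Restart the connector instance itself if it failed
    if status["connector"]["state"] == "FAILED":
        requests.post(f"{CONNECT_URL}/connectors/{connector_name}/restart", timeout=10)

    # Restart individual failed tasks
    for task in status.get("tasks", []):
        if task["state"] == "FAILED":
            requests.post(
                f"{CONNECT_URL}/connectors/{connector_name}/tasks/{task['id']}/restart",
                timeout=10,
            )

for name in requests.get(f"{CONNECT_URL}/connectors", timeout=10).json():
    restart_failed_tasks(name)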

🔧 Custom Connector Development

Custom Connector Architecture

Custom connectors implement source- or sink-specific logic for systems that off-the-shelf connectors do not cover. Kafka Connect plugins are written in Java (or another JVM language) against the org.apache.kafka.connect API; the examples in this section use Python-style pseudocode that mirrors those interfaces (SourceConnector, SourceTask, SinkConnector, SinkTask) to keep the structure easy to follow.

Source Connector Development

class CustomSourceConnector(SourceConnector):
    """Custom source connector example"""
    
    def __init__(self):
        self.config = {}
        self.version = "1.0.0"
    
    def start(self, props):
        """Start connector"""
        
        self.config = {
            "database.hostname": props.get("database.hostname"),
            "database.port": props.get("database.port"),
            "database.user": props.get("database.user"),
            "database.password": props.get("database.password"),
            "database.name": props.get("database.name"),
            "batch.size": int(props.get("batch.size", "1000")),
            "poll.interval.ms": int(props.get("poll.interval.ms", "1000")),
            "topic.prefix": props.get("topic.prefix", "custom"),
            "table.whitelist": props.get("table.whitelist", "").split(",")
        }
        
        # Test database connection
        self._validate_connection()
    
    def task_configs(self, maxTasks):
        """Generate task configurations"""
        
        # Distribute tables across tasks
        tables = self.config["table.whitelist"]
        tasks_config = []
        
        for i in range(maxTasks):
            task_config = self.config.copy()
            task_config["task.id"] = str(i)
            
            # Table distribution logic: each task gets a contiguous slice,
            # re-joined into the comma-separated form the task expects
            if tables:
                start_idx = i * len(tables) // maxTasks
                end_idx = (i + 1) * len(tables) // maxTasks
                task_config["table.whitelist"] = ",".join(tables[start_idx:end_idx])
            
            tasks_config.append(task_config)
        
        return tasks_config
    
    def stop(self):
        """Stop connector"""
        pass
    
    def config(self):
        """Define configuration schema"""
        
        return ConfigDef() \
            .define("database.hostname", Type.STRING, ConfigDef.Importance.HIGH,
                   "Database hostname") \
            .define("database.port", Type.INT, 3306, ConfigDef.Importance.HIGH,
                   "Database port") \
            .define("database.user", Type.STRING, ConfigDef.Importance.HIGH,
                   "Database username") \
            .define("database.password", Type.PASSWORD, ConfigDef.Importance.HIGH,
                   "Database password") \
            .define("database.name", Type.STRING, ConfigDef.Importance.HIGH,
                   "Database name") \
            .define("batch.size", Type.INT, 1000, ConfigDef.Importance.MEDIUM,
                   "Batch size") \
            .define("poll.interval.ms", Type.INT, 1000, ConfigDef.Importance.MEDIUM,
                   "Polling interval (milliseconds)") \
            .define("topic.prefix", Type.STRING, "custom", ConfigDef.Importance.HIGH,
                   "Topic prefix") \
            .define("table.whitelist", Type.STRING, ConfigDef.Importance.HIGH,
                   "Table whitelist (comma-separated)")
    
    def _validate_connection(self):
        """Validate database connection"""
        
        try:
            # Actual connection validation logic
            connection = self._create_connection()
            connection.close()
            self.log.info("Database connection validation successful")
        except Exception as e:
            raise ConnectException(f"Database connection failed: {e}")
    
    def _create_connection(self):
        """Create database connection"""
        # Actual connection creation logic
        pass


class CustomSourceTask(SourceTask):
    """Custom source task"""
    
    def __init__(self):
        self.config = {}
        self.connection = None
        self.last_offset = {}
    
    def start(self, props):
        """Start task"""
        
        self.config = props
        self.connection = self._create_connection()
        
        # Restore offset
        self._restore_offset()
    
    def poll(self):
        """Poll data"""
        
        try:
            # Fetch data in batch size
            records = self._fetch_records()
            
            if not records:
                # Short wait if no data
                time.sleep(self.config.get("poll.interval.ms", 1000) / 1000.0)
                return []
            
            # Convert to SourceRecord
            source_records = []
            for record in records:
                source_record = self._convert_to_source_record(record)
                source_records.append(source_record)
                
                # Update offset
                self._update_offset(record)
            
            return source_records
            
        except Exception as e:
            self.log.error(f"Error occurred while polling data: {e}")
            raise
    
    def stop(self):
        """Stop task"""
        
        if self.connection:
            self.connection.close()
    
    def _fetch_records(self):
        """Fetch actual data"""
        
        # Batch size
        batch_size = int(self.config.get("batch.size", 1000))
        
        # Table list
        tables = self.config.get("table.whitelist", "").split(",")
        
        records = []
        for table in tables:
            if table.strip():
                table_records = self._fetch_table_records(table.strip(), batch_size)
                records.extend(table_records)
        
        return records
    
    def _convert_to_source_record(self, record):
        """Convert record to SourceRecord"""
        
        topic = f"{self.config['topic.prefix']}.{record['table']}"
        
        # Schema definition
        key_schema = SchemaBuilder.struct() \
            .field("id", Schema.INT64_SCHEMA) \
            .build()
        
        value_schema = SchemaBuilder.struct() \
            .field("id", Schema.INT64_SCHEMA) \
            .field("name", Schema.STRING_SCHEMA) \
            .field("created_at", Schema.STRING_SCHEMA) \
            .build()
        
        # Create key and value
        key = Struct(key_schema).put("id", record["id"])
        value = Struct(value_schema) \
            .put("id", record["id"]) \
            .put("name", record["name"]) \
            .put("created_at", record["created_at"])
        
        # The source partition identifies where the record came from; Connect
        # persists the offset per source partition so the task can resume there
        source_partition = {"table": record["table"]}
        source_offset = {
            "id": record["id"],
            "timestamp": record["timestamp"]
        }
        
        return SourceRecord(
            partition=source_partition,
            offset=source_offset,
            topic=topic,
            key_schema=key_schema,
            key=key,
            value_schema=value_schema,
            value=value,
            timestamp=record["timestamp"]
        )
    
    def _update_offset(self, record):
        """Update offset"""
        
        self.last_offset[record["table"]] = {
            "id": record["id"],
            "timestamp": record["timestamp"]
        }
    
    def _restore_offset(self):
        """Restore offset"""
        
        # Connect persists offsets to its offset storage topic; in the Java API the
        # task would read them back via context.offsetStorageReader() at startup
        pass
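
To deploy the connector, the compiled plugin goes on every worker's plugin.path and an instance is created through the Connect REST API. A hedged sketch (the fully qualified class name com.example.CustomSourceConnector, host names, and credentials are placeholders standing in for the example above):

import json
import requests

connector_config = {
    "name": "custom-source-orders",
    "config": {
        "connector.class": "com.example.CustomSourceConnector",  # placeholder package
        "tasks.max": "4",
        "database.hostname": "mysql-primary.internal",
        "database.port": "3306",
        "database.user": "cdc_user",
        "database.password": "change-me",  # prefer a Connect config provider in production
        "database.name": "commerce",
        "table.whitelist": "orders,order_items,payments",
        "topic.prefix": "custom",
        "batch.size": "2000",
        "poll.interval.ms": "500"
    }
}

response = requests.post(
    "http://connect-worker-1:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector_config),
    timeout=10
)
response.raise_for_status()
print("Created connector:", response.json()["name"])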

Sink Connector Development

class CustomSinkConnector(SinkConnector):
    """Custom sink connector"""
    
    def __init__(self):
        self.config = {}
    
    def start(self, props):
        """Start connector"""
        
        self.config = {
            "target.hostname": props.get("target.hostname"),
            "target.port": props.get("target.port"),
            "target.database": props.get("target.database"),
            "target.username": props.get("target.username"),
            "target.password": props.get("target.password"),
            "batch.size": int(props.get("batch.size", "1000")),
            "flush.timeout.ms": int(props.get("flush.timeout.ms", "5000")),
            "auto.create": props.get("auto.create", "true").lower() == "true",
            "delete.enabled": props.get("delete.enabled", "false").lower() == "true"
        }
    
    def task_configs(self, maxTasks):
        """Generate task configurations"""
        
        tasks_config = []
        for i in range(maxTasks):
            task_config = self.config.copy()
            task_config["task.id"] = str(i)
            tasks_config.append(task_config)
        
        return tasks_config
    
    def stop(self):
        """Stop connector"""
        pass
    
    def config(self):
        """Configuration schema"""
        
        return ConfigDef() \
            .define("target.hostname", Type.STRING, ConfigDef.Importance.HIGH,
                   "Target system hostname") \
            .define("target.port", Type.INT, ConfigDef.Importance.HIGH,
                   "Target system port") \
            .define("target.database", Type.STRING, ConfigDef.Importance.HIGH,
                   "Target database") \
            .define("target.username", Type.STRING, ConfigDef.Importance.HIGH,
                   "Target system username") \
            .define("target.password", Type.PASSWORD, ConfigDef.Importance.HIGH,
                   "Target system password") \
            .define("batch.size", Type.INT, 1000, ConfigDef.Importance.MEDIUM,
                   "Batch size") \
            .define("flush.timeout.ms", Type.INT, 5000, ConfigDef.Importance.MEDIUM,
                   "Flush timeout") \
            .define("auto.create", Type.BOOLEAN, True, ConfigDef.Importance.LOW,
                   "Auto table creation") \
            .define("delete.enabled", Type.BOOLEAN, False, ConfigDef.Importance.MEDIUM,
                   "Enable delete operations")


class CustomSinkTask(SinkTask):
    """Custom sink task"""
    
    def __init__(self):
        self.config = {}
        self.connection = None
        self.batch = []
        self.last_flush_time = time.time()
    
    def start(self, props):
        """Start task"""
        
        self.config = props
        self.connection = self._create_connection()
        
        # Initialize table schema cache
        self._initialize_table_schemas()
    
    def put(self, records):
        """Process records"""
        
        for record in records:
            self.batch.append(record)
            
            # Flush when batch size or timeout reached
            if (len(self.batch) >= int(self.config["batch.size"]) or
                time.time() - self.last_flush_time > int(self.config["flush.timeout.ms"]) / 1000.0):
                self._flush_batch()
    
    def flush(self, offsets):
        """Manual flush"""
        
        self._flush_batch()
    
    def stop(self):
        """Stop task"""
        
        # Process remaining batch
        if self.batch:
            self._flush_batch()
        
        if self.connection:
            self.connection.close()
    
    def _flush_batch(self):
        """Flush batch"""
        
        if not self.batch:
            return
        
        try:
            # Group batch by table
            grouped_records = self._group_by_table(self.batch)
            
            # Process each table
            for table, records in grouped_records.items():
                self._process_table_batch(table, records)
            
            # Reset batch
            self.batch = []
            self.last_flush_time = time.time()
            
        except Exception as e:
            self.log.error(f"Error occurred while flushing batch: {e}")
            raise
    
    def _process_table_batch(self, table, records):
        """Process table batch"""
        
        # Classify as INSERT, UPDATE, DELETE
        inserts = [r for r in records if r.value() is not None]
        deletes = [r for r in records if r.value() is None]
        
        # Process INSERT/UPDATE
        if inserts:
            self._upsert_records(table, inserts)
        
        # Process DELETE
        if deletes and self.config.get("delete.enabled", False):
            self._delete_records(table, deletes)
    
    def _upsert_records(self, table, records):
        """Upsert records"""
        
        # Actual upsert logic implementation
        pass
    
    def _delete_records(self, table, records):
        """Delete records"""
        
        # Actual delete logic implementation
        pass
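
The _upsert_records stub is where most sink-specific work happens. A minimal sketch, assuming a MySQL-compatible target reachable through a DB-API connection, the three-column schema from the source example, and record values that behave like dicts (real code should derive table and column names from the record's value schema):

def _upsert_records(self, table, records):
    """Upsert a batch with INSERT ... ON DUPLICATE KEY UPDATE (MySQL dialect)."""
    sql = (
        f"INSERT INTO {table} (id, name, created_at) "
        "VALUES (%s, %s, %s) "
        "ON DUPLICATE KEY UPDATE name = VALUES(name), created_at = VALUES(created_at)"
    )
    rows = [
        (r.value()["id"], r.value()["name"], r.value()["created_at"])
        for r in records
    ]
    cursor = self.connection.cursor()
    try:
        cursor.executemany(sql, rows)   # one round trip per table batch
        self.connection.commit()
    except Exception:
        self.connection.rollback()      # keep the per-table batch atomic
        raise
    finally:
        cursor.close()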

Custom Transform Development

class CustomTransform(Transform):
    """Custom Single Message Transform"""
    
    def __init__(self):
        self.config = {}
    
    def configure(self, configs):
        """Configure settings"""
        
        self.config = {
            "field.mapping": configs.get("field.mapping", ""),
            "data.type": configs.get("data.type", "json"),
            "validation.enabled": configs.get("validation.enabled", "true").lower() == "true"
        }
        
        # Parse field mapping
        if self.config["field.mapping"]:
            self.field_mapping = self._parse_field_mapping(self.config["field.mapping"])
        else:
            self.field_mapping = {}
    
    def apply(self, record):
        """Apply record transformation"""
        
        if record is None:
            return None
        
        try:
            # Extract value
            value = record.value()
            
            if value is None:
                return record
            
            # Process by data type
            if self.config["data.type"] == "json":
                transformed_value = self._transform_json(value)
            elif self.config["data.type"] == "avro":
                transformed_value = self._transform_avro(value)
            else:
                transformed_value = value
            
            # Validation
            if self.config["validation.enabled"]:
                self._validate_record(transformed_value)
            
            # Create new record
            return record.new_record(
                topic=record.topic(),
                partition=record.kafkaPartition(),
                key_schema=record.keySchema(),
                key=record.key(),
                value_schema=record.valueSchema(),
                value=transformed_value,
                timestamp=record.timestamp()
            )
            
        except Exception as e:
            self.log.error(f"Error occurred while transforming record: {e}")
            return record  # Return original record
    
    def _transform_json(self, value):
        """Transform JSON data"""
        
        if isinstance(value, dict):
            transformed = value.copy()
            
            # Apply field mapping
            for old_field, new_field in self.field_mapping.items():
                if old_field in transformed:
                    transformed[new_field] = transformed.pop(old_field)
            
            # Additional transformation logic
            transformed = self._apply_business_rules(transformed)
            
            return transformed
        
        return value
    
    def _transform_avro(self, value):
        """Transform Avro data"""
        
        # Avro schema transformation logic
        return value
    
    def _apply_business_rules(self, data):
        """Apply business rules"""
        
        # Example: timestamp format conversion
        if "created_at" in data:
            data["created_at"] = self._format_timestamp(data["created_at"])
        
        # Example: data normalization
        if "email" in data:
            data["email"] = data["email"].lower().strip()
        
        return data
    
    def _validate_record(self, value):
        """Validate record"""
        
        if not isinstance(value, dict):
            return
        
        # Required field validation
        required_fields = ["id", "name"]
        for field in required_fields:
            if field not in value:
                raise ValueError(f"Required field '{field}' is missing")
    
    def _parse_field_mapping(self, mapping_str):
        """Parse field mapping"""
        
        mapping = {}
        for pair in mapping_str.split(","):
            if ":" in pair:
                old_field, new_field = pair.split(":", 1)
                mapping[old_field.strip()] = new_field.strip()
        
        return mapping
    
    def _format_timestamp(self, timestamp):
        """Format timestamp"""
        
        # Actual formatting logic
        return timestamp
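
A quick illustration of the field-mapping behaviour, calling _transform_json directly so no Connect record object is needed (the mapping string and payload values are made up):

transform = CustomTransform()
transform.configure({
    "field.mapping": "user_name:name,mail:email",  # old_field:new_field pairs
    "data.type": "json",
    "validation.enabled": "false"
})

payload = {
    "id": 42,
    "user_name": "Alice",
    "mail": "  ALICE@EXAMPLE.COM ",
    "created_at": "2024-01-01T00:00:00Z"
}

print(transform._transform_json(payload))
# {'id': 42, 'created_at': '2024-01-01T00:00:00Z', 'name': 'Alice', 'email': 'alice@example.com'}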

🔒 Data Consistency Assurance and Validation

Data Consistency Strategy

Every change captured at the source must be applied to the target without loss or duplication, so the delivery semantics of a CDC pipeline need to be chosen deliberately and then verified continuously.

Consistency Assurance Methods

| Method | Description | Implementation Complexity | Performance Impact | Use Cases |
|---|---|---|---|---|
| Exactly-once semantics | Each message is processed exactly once | High | Medium | Financial transactions, order processing |
| At-least-once semantics | Each message is processed at least once | Low | Low | Log collection, metrics collection |
| At-most-once semantics | Each message is processed at most once | Medium | Low | Notifications, event streaming |
| Transactional processing | Consistency guaranteed per transaction unit | High | High | Account transfers, inventory management |
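
Whichever semantic the pipeline targets, consistency should also be verified independently of the pipeline itself. A minimal reconciliation sketch, assuming DB-API style connections to source and target, an id primary key, and an updated_at column usable as a watermark (all names are placeholders):

import hashlib

def table_fingerprint(conn, table, watermark):
    """Row count plus a checksum of primary keys, in id order, up to a watermark.

    The watermark (e.g. an updated_at cutoff) excludes rows the pipeline has not
    yet had a chance to deliver, so normal replication lag is not flagged.
    """
    cursor = conn.cursor()
    cursor.execute(
        f"SELECT id FROM {table} WHERE updated_at <= %s ORDER BY id", (watermark,)
    )
    digest = hashlib.sha256()
    count = 0
    for (pk,) in cursor:
        digest.update(str(pk).encode())
        count += 1
    cursor.close()
    return count, digest.hexdigest()

def verify_table(source_conn, target_conn, table, watermark):
    """Compare source and target below the watermark; return True if they match."""
    src = table_fingerprint(source_conn, table, watermark)
    dst = table_fingerprint(target_conn, table, watermark)
    if src != dst:
        print(f"[MISMATCH] {table}: source={src} target={dst}")
    return src == dst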

📊 Monitoring and Disaster Recovery Strategies

Comprehensive Monitoring System

Healthy operation of a CDC pipeline depends on monitoring every layer of the stack, from the underlying infrastructure up to business-level data quality.

Monitoring Layers

| Layer | Monitoring Target | Key Metrics | Alert Thresholds |
|---|---|---|---|
| Infrastructure Layer | Kubernetes cluster, Kafka brokers, databases | CPU/memory usage, disk I/O, network throughput | CPU > 80%, memory > 90%, disk > 85% |
| Application Layer | Kafka Connect workers, connectors, tasks | JVM memory, GC time, thread count | GC time > 20%, thread count > 1000, heap usage > 80% |
| Data Layer | Data throughput, latency, error rate | Records/sec, P99 latency, failure rate | Throughput < 1000/s, latency > 5s, error rate > 1% |
| Business Layer | Data quality, consistency, completeness | Data validation failures, checksum mismatches, missing records | Validation failures > 0, checksum mismatches > 0, missing rate > 0.1% |
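
A threshold table like the one above translates naturally into code. A minimal alert-evaluator sketch (the metric names and the snapshot values are placeholders; in practice the values would come from Prometheus, JMX exporters, or the Connect REST API):

# Alert rules derived from the threshold table above:
# (metric name, comparison, threshold, layer)
ALERT_RULES = [
    ("cpu_usage_pct",        ">", 80,    "infrastructure"),
    ("memory_usage_pct",     ">", 90,    "infrastructure"),
    ("disk_usage_pct",       ">", 85,    "infrastructure"),
    ("gc_time_pct",          ">", 20,    "application"),
    ("thread_count",         ">", 1000,  "application"),
    ("heap_usage_pct",       ">", 80,    "application"),
    ("records_per_sec",      "<", 1000,  "data"),
    ("p99_latency_sec",      ">", 5,     "data"),
    ("error_rate_pct",       ">", 1,     "data"),
    ("validation_failures",  ">", 0,     "business"),
    ("checksum_mismatches",  ">", 0,     "business"),
    ("missing_rate_pct",     ">", 0.1,   "business"),
]

def evaluate_alerts(metrics):
    """Return the rules violated by a metrics snapshot (dict of metric -> value)."""
    violations = []
    for name, op, threshold, layer in ALERT_RULES:
        value = metrics.get(name)
        if value is None:
            continue  # metric not collected this cycle
        breached = value > threshold if op == ">" else value < threshold
        if breached:
            violations.append((layer, name, value, threshold))
    return violations

# Example snapshot (values are made up)
snapshot = {"cpu_usage_pct": 72, "p99_latency_sec": 7.5, "records_per_sec": 18000}
for layer, name, value, threshold in evaluate_alerts(snapshot):
    print(f"[{layer.upper()}] {name}={value} breaches threshold {threshold}")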

🚀 Practical Project: Enterprise CDC Operation System

Project Overview

Build an enterprise-grade CDC operation system for a large-scale e-commerce platform.

System Architecture

| Component | Technology Stack | Capacity | High Availability |
|---|---|---|---|
| Source Systems | MySQL 8.0 (orders), PostgreSQL 13 (users), MongoDB 5.0 (products) | 1M+ records/day, 50+ tables, 10+ databases | Read-only replicas, automatic failure recovery |
| Kafka Cluster | Apache Kafka 3.0, Schema Registry, Kafka Connect | 3 brokers, 100+ topics, 1000+ partitions | Triple replication, automatic rebalancing |
| Target Systems | Elasticsearch 8.0, Redis 7.0, S3 data lake, Snowflake | 3-node ES cluster, 6-node Redis cluster, unlimited S3 storage | Cluster mode, automatic backup |
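
On the source side, this architecture maps to a set of Debezium source connectors registered with the Connect cluster. A sketch for the MySQL order database (property names follow Debezium 2.x and may differ slightly between versions; host names, credentials, and topic names are placeholders):

import json
import requests

# Debezium MySQL source for the order database
orders_source = {
    "name": "orders-mysql-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql-orders.internal",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "change-me",
        "database.server.id": "184054",
        "database.include.list": "commerce",
        "table.include.list": "commerce.orders,commerce.order_items",
        "topic.prefix": "ecommerce",
        "schema.history.internal.kafka.bootstrap.servers": "broker-1:9092",
        "schema.history.internal.kafka.topic": "schema-history.commerce"
    }
}

resp = requests.post(
    "http://connect-worker-1:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(orders_source),
    timeout=10
)
resp.raise_for_status()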

📚 Learning Summary

What We Learned in This Part

  1. Kafka Connect Advanced Architecture
    • Distributed mode configuration and cluster design
    • Worker node management and scalability strategies
    • High availability and disaster recovery mechanisms
  2. Custom Connector Development
    • Source Connector and Sink Connector implementation
    • Custom Transform development
    • Connector testing and deployment strategies
  3. Large-scale CDC Pipeline Operations
    • Pipeline orchestration
    • Load balancing and failure isolation
    • Scalability and data consistency assurance
  4. Performance Optimization and Bottleneck Resolution
    • Performance bottleneck analysis
    • Dynamic performance tuning
    • Auto-scaling implementation
  5. Data Consistency Assurance and Validation
    • Consistency assurance methodology
    • Data validation system
    • Drift detection and anomaly detection
  6. Monitoring and Disaster Recovery Strategies
    • Comprehensive monitoring system
    • Disaster recovery planning
    • Automatic failure handling
  7. Practical Project
    • Enterprise-grade CDC operation system
    • Large-scale e-commerce platform application
    • Operation automation and optimization

Core Technology Stack

| Technology | Role | Importance | Learning Points |
|---|---|---|---|
| Kafka Connect | Connector framework | ⭐⭐⭐⭐⭐ | Distributed architecture, scalability |
| Debezium | CDC platform | ⭐⭐⭐⭐⭐ | Advanced configuration, performance optimization |
| Custom Connectors | Specialized data processing | ⭐⭐⭐⭐ | Development patterns, testing strategies |
| Monitoring | Operational visibility | ⭐⭐⭐⭐⭐ | Metric collection, alert configuration |
| Automation | Operational efficiency | ⭐⭐⭐⭐ | Scaling, recovery automation |

Next Steps

We have now learned all the core content of the CDC series. The next steps are:

  1. Real Project Application: Apply learned content to actual projects
  2. Advanced Topic Exploration: Stream processing, event sourcing, and other advanced topics
  3. Other Technology Stacks: Apache Pulsar, Apache Flink, and other streaming technologies

Series Complete: Change Data Capture Complete Guide Series


Build enterprise-grade real-time data pipelines and complete the core of your modern data architecture! 🚀