Part 2: Kafka Connect and Production CDC Operations - Enterprise Real-time Data Pipeline
Advanced Kafka Connect architecture, custom connector development, strategies for operating CDC pipelines at scale, performance optimization, and disaster recovery.
📋 Table of Contents
- Kafka Connect Advanced Architecture
- Custom Connector Development
- Large-scale CDC Pipeline Operations
- Performance Optimization and Bottleneck Resolution
- Data Consistency Assurance and Validation
- Monitoring and Disaster Recovery Strategies
- Practical Project: Enterprise CDC Operation System
- Learning Summary
🏗️ Kafka Connect Advanced Architecture
Kafka Connect Architecture Deep Dive
Kafka Connect is a framework for streaming data between Apache Kafka and external systems such as databases, search indexes, and object stores. It runs as its own distributed cluster of workers that host connectors, which makes it the natural backbone of a CDC pipeline.
Core Components Detailed Analysis
Component | Role | Scalability Considerations | Operational Points |
---|---|---|---|
Connect Workers | Connector execution environment | • Horizontal scaling capability • CPU/memory-based scaling | • Resource monitoring • Automated failure recovery |
Connectors | Data source/sink logic | • Plugin architecture • Independent deployment | • Version management • Compatibility testing |
Tasks | Actual data processing | • Partition-based parallelization • Dynamic task allocation | • Load distribution • Failure isolation |
Transforms | Data transformation logic | • Chainable transformations • Custom SMT support | • Performance optimization • Memory management |
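In day-to-day operation, all of these components are managed through the workers' REST API. As a minimal sketch (assuming a worker reachable at localhost:8083, the default port used in the configuration further below), the deployed connectors and the state of their tasks can be listed like this:

```python
import requests

CONNECT_URL = "http://localhost:8083"  # assumed worker address

# List deployed connectors and the state of each connector and its tasks.
for name in requests.get(f"{CONNECT_URL}/connectors").json():
    status = requests.get(f"{CONNECT_URL}/connectors/{name}/status").json()
    print(name,
          status["connector"]["state"],
          [(t["id"], t["state"]) for t in status["tasks"]])
```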
Cluster Configuration Strategy
class KafkaConnectClusterManager:
    def __init__(self):
        self.cluster_config = {}

    def design_cluster_architecture(self, requirements):
        """Design cluster architecture for large-scale CDC requirements"""
        architecture = {
            "cluster_size": {
                "workers": self._calculate_worker_count(requirements),
                "connectors": requirements["estimated_connectors"],
                "tasks_per_connector": requirements["max_concurrency"]
            },
            "resource_allocation": {
                "worker_specs": {
                    "cpu": "4 cores",
                    "memory": "8GB",
                    "heap": "4GB",
                    "disk": "100GB SSD"
                },
                "jvm_settings": {
                    "Xmx": "4g",
                    "Xms": "4g",
                    "GC_algorithm": "G1GC",
                    "GC_tuning": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200"
                }
            },
            "network_topology": {
                "worker_distribution": "multi_zone",
                "replication_strategy": "rack_aware",
                "load_balancing": "round_robin"
            }
        }
        return architecture

    def _calculate_worker_count(self, requirements):
        """Calculate worker count"""
        # Basic calculation formula: roughly one worker per 10 connectors
        base_workers = max(3, requirements["estimated_connectors"] // 10)
        # Minimum workers for high availability
        ha_workers = max(base_workers, 3)
        # Buffer for load distribution and rolling restarts
        buffer_workers = int(ha_workers * 0.3)
        return ha_workers + buffer_workers

    def configure_distributed_mode(self, cluster_config):
        """Configure distributed mode"""
        distributed_config = {
            "bootstrap.servers": cluster_config["kafka_bootstrap_servers"],
            "group.id": cluster_config["connect_cluster_id"],
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "value.converter.schemas.enable": "false",
            "offset.storage.topic": f"{cluster_config['connect_cluster_id']}-offsets",
            "offset.storage.replication.factor": "3",
            "offset.storage.partitions": "25",
            "config.storage.topic": f"{cluster_config['connect_cluster_id']}-configs",
            "config.storage.replication.factor": "3",
            "status.storage.topic": f"{cluster_config['connect_cluster_id']}-status",
            "status.storage.replication.factor": "3",
            "status.storage.partitions": "5",
            "rest.host.name": "0.0.0.0",
            "rest.port": "8083",
            "plugin.path": "/opt/connectors",
            "connector.client.config.override.policy": "All",
            "producer.security.protocol": "SASL_SSL",
            "consumer.security.protocol": "SASL_SSL"
        }
        return distributed_config
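The dictionary returned by configure_distributed_mode maps one-to-one onto the key=value entries of a connect-distributed.properties file that each worker reads at startup. A minimal helper to render it (file name is an assumption):

```python
def write_properties(config: dict, path: str = "connect-distributed.properties"):
    """Render a config dict as the key=value lines Connect workers read at startup."""
    with open(path, "w") as f:
        for key, value in sorted(config.items()):
            f.write(f"{key}={value}\n")
```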
High Availability and Disaster Recovery
Failure Scenario | Recovery Strategy | Automation Level | Recovery Time |
---|---|---|---|
Worker Node Failure | • Automatic task reallocation • Health check-based detection | Fully automated | 30-60 seconds |
Connector Failure | • Backoff retry • Failure isolation | Automated | 1-5 minutes |
Kafka Broker Failure | • Leader election • Partition reassignment | Fully automated | 10-30 seconds |
Network Partition | • Quorum-based consensus • Failure recovery | Automated | 1-2 minutes |
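The "Automated" rows in this table are typically implemented against the Connect REST API, which exposes restart endpoints for both connectors and individual tasks. A minimal sketch of such a recovery loop (worker address assumed):

```python
import requests

CONNECT_URL = "http://localhost:8083"  # assumed worker address

def restart_if_failed(connector: str) -> None:
    """Restart a connector and any of its tasks that report FAILED."""
    status = requests.get(f"{CONNECT_URL}/connectors/{connector}/status").json()
    if status["connector"]["state"] == "FAILED":
        requests.post(f"{CONNECT_URL}/connectors/{connector}/restart")
    for task in status["tasks"]:
        if task["state"] == "FAILED":
            requests.post(
                f"{CONNECT_URL}/connectors/{connector}/tasks/{task['id']}/restart")
```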
🔧 Custom Connector Development
Custom Connector Architecture
Custom connectors implement the Kafka Connect API for data sources or sinks that existing connectors do not cover. Note that the official Connect API is Java; the examples in this section use Python-style pseudocode to illustrate its structure.
Source Connector Development
class CustomSourceConnector(SourceConnector):
    """Custom source connector example"""

    def __init__(self):
        self.config = {}
        self.version = "1.0.0"

    def start(self, props):
        """Start connector"""
        self.config = {
            "database.hostname": props.get("database.hostname"),
            "database.port": props.get("database.port"),
            "database.user": props.get("database.user"),
            "database.password": props.get("database.password"),
            "database.name": props.get("database.name"),
            "batch.size": int(props.get("batch.size", "1000")),
            "poll.interval.ms": int(props.get("poll.interval.ms", "1000")),
            "topic.prefix": props.get("topic.prefix", "custom"),
            "table.whitelist": props.get("table.whitelist", "").split(",")
        }
        # Test database connection
        self._validate_connection()

    def task_configs(self, maxTasks):
        """Generate task configurations"""
        # Distribute tables across tasks
        tables = self.config["table.whitelist"]
        tasks_config = []
        for i in range(maxTasks):
            task_config = self.config.copy()
            task_config["task.id"] = str(i)
            # Table distribution logic: each task gets a contiguous slice,
            # serialized back to the comma-separated form tasks expect
            if tables:
                start_idx = i * len(tables) // maxTasks
                end_idx = (i + 1) * len(tables) // maxTasks
                task_config["table.whitelist"] = ",".join(tables[start_idx:end_idx])
            tasks_config.append(task_config)
        return tasks_config

    def stop(self):
        """Stop connector"""
        pass

    def config(self):
        """Define configuration schema"""
        return ConfigDef() \
            .define("database.hostname", Type.STRING, ConfigDef.Importance.HIGH,
                    "Database hostname") \
            .define("database.port", Type.INT, 3306, ConfigDef.Importance.HIGH,
                    "Database port") \
            .define("database.user", Type.STRING, ConfigDef.Importance.HIGH,
                    "Database username") \
            .define("database.password", Type.PASSWORD, ConfigDef.Importance.HIGH,
                    "Database password") \
            .define("database.name", Type.STRING, ConfigDef.Importance.HIGH,
                    "Database name") \
            .define("batch.size", Type.INT, 1000, ConfigDef.Importance.MEDIUM,
                    "Batch size") \
            .define("poll.interval.ms", Type.INT, 1000, ConfigDef.Importance.MEDIUM,
                    "Polling interval (milliseconds)") \
            .define("topic.prefix", Type.STRING, "custom", ConfigDef.Importance.HIGH,
                    "Topic prefix") \
            .define("table.whitelist", Type.STRING, ConfigDef.Importance.HIGH,
                    "Table whitelist (comma-separated)")

    def _validate_connection(self):
        """Validate database connection"""
        try:
            # Actual connection validation logic
            connection = self._create_connection()
            connection.close()
            self.log.info("Database connection validation successful")
        except Exception as e:
            raise ConnectException(f"Database connection failed: {e}")

    def _create_connection(self):
        """Create database connection"""
        # Actual connection creation logic
        pass
import time

class CustomSourceTask(SourceTask):
    """Custom source task"""

    def __init__(self):
        self.config = {}
        self.connection = None
        self.last_offset = {}

    def start(self, props):
        """Start task"""
        self.config = props
        self.connection = self._create_connection()
        # Restore offset
        self._restore_offset()

    def poll(self):
        """Poll data"""
        try:
            # Fetch data in batch size
            records = self._fetch_records()
            if not records:
                # Short wait if no data
                time.sleep(int(self.config.get("poll.interval.ms", 1000)) / 1000.0)
                return []
            # Convert to SourceRecord
            source_records = []
            for record in records:
                source_record = self._convert_to_source_record(record)
                source_records.append(source_record)
                # Update offset
                self._update_offset(record)
            return source_records
        except Exception as e:
            self.log.error(f"Error occurred while polling data: {e}")
            raise

    def stop(self):
        """Stop task"""
        if self.connection:
            self.connection.close()

    def _fetch_records(self):
        """Fetch actual data"""
        # Batch size
        batch_size = int(self.config.get("batch.size", 1000))
        # Table list
        tables = self.config.get("table.whitelist", "").split(",")
        records = []
        for table in tables:
            if table.strip():
                table_records = self._fetch_table_records(table.strip(), batch_size)
                records.extend(table_records)
        return records

    def _convert_to_source_record(self, record):
        """Convert record to SourceRecord"""
        topic = f"{self.config['topic.prefix']}.{record['table']}"
        # Schema definition
        key_schema = SchemaBuilder.struct() \
            .field("id", Schema.INT64_SCHEMA) \
            .build()
        value_schema = SchemaBuilder.struct() \
            .field("id", Schema.INT64_SCHEMA) \
            .field("name", Schema.STRING_SCHEMA) \
            .field("created_at", Schema.STRING_SCHEMA) \
            .build()
        # Create key and value
        key = Struct(key_schema).put("id", record["id"])
        value = Struct(value_schema) \
            .put("id", record["id"]) \
            .put("name", record["name"]) \
            .put("created_at", record["created_at"])
        # Offset information
        offset = {
            "table": record["table"],
            "id": record["id"],
            "timestamp": record["timestamp"]
        }
        return SourceRecord(
            partition=None,
            offset=offset,
            topic=topic,
            key_schema=key_schema,
            key=key,
            value_schema=value_schema,
            value=value,
            timestamp=record["timestamp"]
        )

    def _update_offset(self, record):
        """Update offset"""
        self.last_offset[record["table"]] = {
            "id": record["id"],
            "timestamp": record["timestamp"]
        }

    def _restore_offset(self):
        """Restore offset"""
        # Kafka Connect automatically manages offsets
        pass
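Once the connector JAR is on the workers' plugin.path, it is deployed by POSTing a configuration to the Connect REST API. A sketch of the registration call — the class name, hosts, and credentials below are placeholders for illustration:

```python
import requests

CONNECT_URL = "http://localhost:8083"  # assumed worker address

connector = {
    "name": "orders-custom-source",
    "config": {
        # Hypothetical fully-qualified class name of the connector above.
        "connector.class": "com.example.connect.CustomSourceConnector",
        "tasks.max": "4",
        "database.hostname": "mysql.internal",
        "database.port": "3306",
        "database.user": "cdc_user",
        "database.password": "********",
        "database.name": "orders",
        "table.whitelist": "orders,order_items,customers",
        "topic.prefix": "custom",
        "batch.size": "1000",
        "poll.interval.ms": "1000",
    },
}

requests.post(f"{CONNECT_URL}/connectors", json=connector).raise_for_status()
```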
Sink Connector Development
class CustomSinkConnector(SinkConnector):
    """Custom sink connector"""

    def __init__(self):
        self.config = {}

    def start(self, props):
        """Start connector"""
        self.config = {
            "target.hostname": props.get("target.hostname"),
            "target.port": props.get("target.port"),
            "target.database": props.get("target.database"),
            "target.username": props.get("target.username"),
            "target.password": props.get("target.password"),
            "batch.size": int(props.get("batch.size", "1000")),
            "flush.timeout.ms": int(props.get("flush.timeout.ms", "5000")),
            "auto.create": props.get("auto.create", "true").lower() == "true",
            "delete.enabled": props.get("delete.enabled", "false").lower() == "true"
        }

    def task_configs(self, maxTasks):
        """Generate task configurations"""
        tasks_config = []
        for i in range(maxTasks):
            task_config = self.config.copy()
            task_config["task.id"] = str(i)
            tasks_config.append(task_config)
        return tasks_config

    def stop(self):
        """Stop connector"""
        pass

    def config(self):
        """Configuration schema"""
        return ConfigDef() \
            .define("target.hostname", Type.STRING, ConfigDef.Importance.HIGH,
                    "Target system hostname") \
            .define("target.port", Type.INT, ConfigDef.Importance.HIGH,
                    "Target system port") \
            .define("target.database", Type.STRING, ConfigDef.Importance.HIGH,
                    "Target database") \
            .define("target.username", Type.STRING, ConfigDef.Importance.HIGH,
                    "Target system username") \
            .define("target.password", Type.PASSWORD, ConfigDef.Importance.HIGH,
                    "Target system password") \
            .define("batch.size", Type.INT, 1000, ConfigDef.Importance.MEDIUM,
                    "Batch size") \
            .define("flush.timeout.ms", Type.INT, 5000, ConfigDef.Importance.MEDIUM,
                    "Flush timeout") \
            .define("auto.create", Type.BOOLEAN, True, ConfigDef.Importance.LOW,
                    "Auto table creation") \
            .define("delete.enabled", Type.BOOLEAN, False, ConfigDef.Importance.MEDIUM,
                    "Enable delete operations")
import time

class CustomSinkTask(SinkTask):
    """Custom sink task"""

    def __init__(self):
        self.config = {}
        self.connection = None
        self.batch = []
        self.last_flush_time = time.time()

    def start(self, props):
        """Start task"""
        self.config = props
        self.connection = self._create_connection()
        # Initialize table schema cache
        self._initialize_table_schemas()

    def put(self, records):
        """Process records"""
        for record in records:
            self.batch.append(record)
        # Flush when batch size or timeout reached
        if (len(self.batch) >= int(self.config["batch.size"]) or
                time.time() - self.last_flush_time > int(self.config["flush.timeout.ms"]) / 1000.0):
            self._flush_batch()

    def flush(self, offsets):
        """Manual flush"""
        self._flush_batch()

    def stop(self):
        """Stop task"""
        # Process remaining batch
        if self.batch:
            self._flush_batch()
        if self.connection:
            self.connection.close()

    def _flush_batch(self):
        """Flush batch"""
        if not self.batch:
            return
        try:
            # Group batch by table
            grouped_records = self._group_by_table(self.batch)
            # Process each table
            for table, records in grouped_records.items():
                self._process_table_batch(table, records)
            # Reset batch
            self.batch = []
            self.last_flush_time = time.time()
        except Exception as e:
            self.log.error(f"Error occurred while flushing batch: {e}")
            raise

    def _process_table_batch(self, table, records):
        """Process table batch"""
        # A tombstone (null value) marks a delete; everything else is an upsert
        inserts = [r for r in records if r.value() is not None]
        deletes = [r for r in records if r.value() is None]
        # Process INSERT/UPDATE
        if inserts:
            self._upsert_records(table, inserts)
        # Process DELETE (delete.enabled was already parsed to a bool in the connector)
        if deletes and self.config.get("delete.enabled", False):
            self._delete_records(table, deletes)

    def _upsert_records(self, table, records):
        """Upsert records"""
        # Actual upsert logic implementation
        pass

    def _delete_records(self, table, records):
        """Delete records"""
        # Actual delete logic implementation
        pass
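_upsert_records is deliberately left as a stub above, since the statement depends on the target system. As one possible sketch for a MySQL target (assuming self.connection is a DB-API connection, a primary key column named id, and each record value being a flat dict keyed by column name):

```python
def _upsert_records(self, table, records):
    """Illustrative upsert via INSERT ... ON DUPLICATE KEY UPDATE (MySQL)."""
    rows = [r.value() for r in records]
    if not rows:
        return
    columns = list(rows[0].keys())
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(f"{c} = VALUES({c})" for c in columns if c != "id")
    sql = (f"INSERT INTO {table} ({', '.join(columns)}) "
           f"VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {updates}")
    cursor = self.connection.cursor()
    cursor.executemany(sql, [tuple(row[c] for c in columns) for c in rows])
    self.connection.commit()
    cursor.close()
```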
Custom Transform Development
class CustomTransform(Transform):
    """Custom Single Message Transform"""

    def __init__(self):
        self.config = {}

    def configure(self, configs):
        """Configure settings"""
        self.config = {
            "field.mapping": configs.get("field.mapping", ""),
            "data.type": configs.get("data.type", "json"),
            "validation.enabled": configs.get("validation.enabled", "true").lower() == "true"
        }
        # Parse field mapping
        if self.config["field.mapping"]:
            self.field_mapping = self._parse_field_mapping(self.config["field.mapping"])
        else:
            self.field_mapping = {}

    def apply(self, record):
        """Apply record transformation"""
        if record is None:
            return None
        try:
            # Extract value
            value = record.value()
            if value is None:
                return record
            # Process by data type
            if self.config["data.type"] == "json":
                transformed_value = self._transform_json(value)
            elif self.config["data.type"] == "avro":
                transformed_value = self._transform_avro(value)
            else:
                transformed_value = value
            # Validation
            if self.config["validation.enabled"]:
                self._validate_record(transformed_value)
            # Create new record
            return record.new_record(
                topic=record.topic(),
                partition=record.kafkaPartition(),
                key_schema=record.keySchema(),
                key=record.key(),
                value_schema=record.valueSchema(),
                value=transformed_value,
                timestamp=record.timestamp()
            )
        except Exception as e:
            self.log.error(f"Error occurred while transforming record: {e}")
            return record  # Return original record

    def _transform_json(self, value):
        """Transform JSON data"""
        if isinstance(value, dict):
            transformed = value.copy()
            # Apply field mapping
            for old_field, new_field in self.field_mapping.items():
                if old_field in transformed:
                    transformed[new_field] = transformed.pop(old_field)
            # Additional transformation logic
            transformed = self._apply_business_rules(transformed)
            return transformed
        return value

    def _transform_avro(self, value):
        """Transform Avro data"""
        # Avro schema transformation logic
        return value

    def _apply_business_rules(self, data):
        """Apply business rules"""
        # Example: timestamp format conversion
        if "created_at" in data:
            data["created_at"] = self._format_timestamp(data["created_at"])
        # Example: data normalization
        if "email" in data:
            data["email"] = data["email"].lower().strip()
        return data

    def _validate_record(self, value):
        """Validate record"""
        if not isinstance(value, dict):
            return
        # Required field validation
        required_fields = ["id", "name"]
        for field in required_fields:
            if field not in value:
                raise ValueError(f"Required field '{field}' is missing")

    def _parse_field_mapping(self, mapping_str):
        """Parse field mapping"""
        mapping = {}
        for pair in mapping_str.split(","):
            if ":" in pair:
                old_field, new_field = pair.split(":", 1)
                mapping[old_field.strip()] = new_field.strip()
        return mapping

    def _format_timestamp(self, timestamp):
        """Format timestamp"""
        # Actual formatting logic
        return timestamp
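A transform is never invoked directly; Connect wires it into a connector through transforms.* properties. Assuming the class above is packaged as com.example.connect.CustomTransform (a hypothetical name), the connector config fragment would look like this:

```python
# Fragment to merge into a connector's "config" map before POSTing it.
smt_config = {
    "transforms": "fieldmap",
    # Hypothetical fully-qualified class name of the transform above.
    "transforms.fieldmap.type": "com.example.connect.CustomTransform",
    "transforms.fieldmap.field.mapping": "user_name:name,mail:email",
    "transforms.fieldmap.data.type": "json",
    "transforms.fieldmap.validation.enabled": "true",
}
# Multiple SMTs can be chained; they run in the order listed in "transforms".
```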
🔒 Data Consistency Assurance and Validation
Data Consistency Strategy
Ensuring data consistency is the central challenge of any CDC pipeline: each data flow should be assigned the delivery semantics it actually needs, because stronger guarantees cost throughput and operational complexity.
Consistency Assurance Methods
Method | Description | Implementation Complexity | Performance Impact | Use Cases |
---|---|---|---|---|
Exactly-once Semantics | Process each message exactly once | High | Medium | Financial transactions, order processing |
At-least-once Semantics | Process messages at least once | Low | Low | Log collection, metrics collection |
At-most-once Semantics | Process messages at most once | Medium | Low | Notifications, event streaming |
Transactional Processing | Guarantee consistency by transaction unit | High | High | Account transfers, inventory management |
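Whichever semantics a flow uses, consistency should also be verified independently of the pipeline itself. One lightweight approach is an order-insensitive checksum compared between source and target; a minimal sketch (the fetch callables are hypothetical hooks into your databases):

```python
import hashlib

def table_checksum(rows):
    """Order-insensitive checksum over (primary_key, row_repr) pairs."""
    digest = 0
    for pk, row in rows:
        h = hashlib.sha256(f"{pk}:{row}".encode()).digest()
        digest ^= int.from_bytes(h[:8], "big")  # XOR makes row order irrelevant
    return digest

def verify_consistency(fetch_source, fetch_target):
    """fetch_* are callables returning iterables of (primary_key, row_repr) pairs."""
    src = table_checksum(fetch_source())
    dst = table_checksum(fetch_target())
    return {"match": src == dst, "source": src, "target": dst}
```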
📊 Monitoring and Disaster Recovery Strategies
Comprehensive Monitoring System
A CDC pipeline stays healthy only when it is monitored at every layer, from infrastructure up to business-level data quality.
Monitoring Layers
Layer | Monitoring Target | Key Metrics | Alert Thresholds |
---|---|---|---|
Infrastructure Layer | • Kubernetes cluster • Kafka brokers • Databases | • CPU/memory usage • Disk I/O • Network throughput | • CPU > 80% • Memory > 90% • Disk > 85% |
Application Layer | • Kafka Connect workers • Connectors • Tasks | • JVM memory • GC time • Thread count | • GC time > 20% • Thread count > 1000 • Heap usage > 80% |
Data Layer | • Data throughput • Latency • Error rate | • Records/sec • P99 latency • Failure rate | • Throughput < 1000/s • Latency > 5s • Error rate > 1% |
Business Layer | • Data quality • Consistency • Completeness | • Data validation failures • Checksum mismatches • Missing records | • Validation failures > 0 • Checksum mismatches > 0 • Missing rate > 0.1% |
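At the application layer, the simplest useful check polls connector and task state through the REST API and alerts when anything leaves RUNNING. A minimal sketch (worker address and alert hook are assumptions):

```python
import time
import requests

CONNECT_URL = "http://localhost:8083"  # assumed worker address
CHECK_INTERVAL_S = 30

def alert(message: str) -> None:
    print(f"[ALERT] {message}")  # stand-in for a Slack/PagerDuty integration

def watch_connectors() -> None:
    while True:
        for name in requests.get(f"{CONNECT_URL}/connectors").json():
            status = requests.get(f"{CONNECT_URL}/connectors/{name}/status").json()
            failed = [t["id"] for t in status["tasks"] if t["state"] == "FAILED"]
            if status["connector"]["state"] != "RUNNING" or failed:
                alert(f"{name}: connector={status['connector']['state']}, "
                      f"failed tasks={failed}")
        time.sleep(CHECK_INTERVAL_S)
```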
🚀 Practical Project: Enterprise CDC Operation System
Project Overview
Build an enterprise-grade CDC operation system for a large-scale e-commerce platform.
System Architecture
Component | Technology Stack | Capacity | High Availability |
---|---|---|---|
Source Systems | • MySQL 8.0 (Orders) • PostgreSQL 13 (Users) • MongoDB 5.0 (Products) | • 1M+ records/day • 50+ tables • 10+ databases | • Read-only replicas • Automatic failure recovery |
Kafka Cluster | • Apache Kafka 3.0 • Schema Registry • Kafka Connect | • 3 brokers • 100+ topics • 1000+ partitions | • Triple replication • Automatic rebalancing |
Target Systems | • Elasticsearch 8.0 • Redis 7.0 • S3 Data Lake • Snowflake | • 3-node ES cluster • 6-node Redis cluster • Unlimited S3 storage | • Cluster mode • Automatic backup |
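To make the source side concrete, the Orders MySQL database would typically be captured with a Debezium MySQL connector. A sketch of its registration — hostnames, credentials, and topic names are placeholders, with config keys following the Debezium 2.x naming:

```python
import requests

CONNECT_URL = "http://localhost:8083"  # assumed Connect endpoint

orders_cdc = {
    "name": "orders-mysql-cdc",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql-orders.internal",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "********",
        "database.server.id": "184054",  # must be unique per connector
        "topic.prefix": "ecommerce.orders",
        "table.include.list": "orders.orders,orders.order_items",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-history.orders",
    },
}

requests.post(f"{CONNECT_URL}/connectors", json=orders_cdc).raise_for_status()
```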
📚 Learning Summary
What We Learned in This Part
- Kafka Connect Advanced Architecture
  - Distributed mode configuration and cluster design
  - Worker node management and scalability strategies
  - High availability and disaster recovery mechanisms
- Custom Connector Development
  - Source connector and sink connector implementation
  - Custom Transform development
  - Connector testing and deployment strategies
- Large-scale CDC Pipeline Operations
  - Pipeline orchestration
  - Load balancing and failure isolation
  - Scalability and data consistency assurance
- Performance Optimization and Bottleneck Resolution
  - Performance bottleneck analysis
  - Dynamic performance tuning
  - Auto-scaling implementation
- Data Consistency Assurance and Validation
  - Consistency assurance methodology
  - Data validation systems
  - Drift detection and anomaly detection
- Monitoring and Disaster Recovery Strategies
  - Comprehensive monitoring system
  - Disaster recovery planning
  - Automatic failure handling
- Practical Project
  - Enterprise-grade CDC operation system
  - Large-scale e-commerce platform application
  - Operations automation and optimization
Core Technology Stack
Technology | Role | Importance | Learning Points |
---|---|---|---|
Kafka Connect | Connector framework | ⭐⭐⭐⭐⭐ | Distributed architecture, scalability |
Debezium | CDC platform | ⭐⭐⭐⭐⭐ | Advanced configuration, performance optimization |
Custom Connectors | Specialized data processing | ⭐⭐⭐⭐ | Development patterns, testing strategies |
Monitoring | Operational visibility | ⭐⭐⭐⭐⭐ | Metric collection, alert configuration |
Automation | Operational efficiency | ⭐⭐⭐⭐ | Scaling, recovery automation |
Next Steps
We have now learned all the core content of the CDC series. The next steps are:
- Real Project Application: Apply learned content to actual projects
- Advanced Topic Exploration: Stream processing, event sourcing, and other advanced topics
- Other Technology Stacks: Apache Pulsar, Apache Flink, and other streaming technologies
Series Complete: Change Data Capture Complete Guide Series
Build enterprise-grade real-time data pipelines to complete the core of modern data architecture! 🚀