Apache Kafka Real-time Streaming Guide: From Producer to Consumer
⏱️ 30 min
📊 Advanced
Learn the core concepts and practical techniques of Apache Kafka for processing large-scale real-time data, and apply them to real projects.
📖 Table of Contents
- Deep Understanding of Kafka Architecture
- Producer Optimization Strategies
- Consumer Groups and Partitioning
- Stream Processing and KStreams
- Monitoring and Operations Management
- Hands-on: Building Production-Grade Streaming System
- Learning Summary
🏗️ Deep Understanding of Kafka Architecture
Core Component Analysis
Apache Kafka consists of the following core components:
1. Broker
- Role: Core engine for message storage and delivery
- Operation: Appends messages sequentially to each partition's log
- Performance Optimization: Batch processing, compression, indexing
2. Topic & Partition
- Topic: Logical classification of messages
- Partition: Physical unit of storage and parallelism within a topic
- Replication: Copies partitions across brokers for durability and availability
3. Producer
- Role: Message production and transmission
- Features: Asynchronous transmission, batch processing, compression
- Performance: Balance between throughput and latency
4. Consumer
- Role: Message consumption and processing
- Consumer Group: Parallel processing and load distribution
- Offset Management: Tracks message processing status
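To see how topics, partitions, and replication fit together in practice, here is a minimal sketch that creates a topic programmatically with the AdminClient API (the topic name and settings are illustrative):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(adminProps)) {
    // 3 partitions for parallelism, replication factor 2 for fault tolerance
    NewTopic topic = new NewTopic("orders", 3, (short) 2);
    admin.createTopics(Collections.singletonList(topic)).all().get();
}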
Message Delivery Guarantees
# Message delivery guarantee levels (weakest to strongest)
at-most-once: # At most once delivery (loss possible, no duplicates)
at-least-once: # At least once delivery (no loss, duplicates possible)
exactly-once: # Exactly once delivery (no loss, no duplicates)
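Each level maps to concrete client settings. A sketch of representative producer configurations, assuming the string producer used throughout this guide (the transactional.id value is illustrative):

// at-most-once: fire-and-forget
props.put("acks", "0");
props.put("retries", 0);

// at-least-once: wait for all in-sync replicas and retry on failure
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);

// exactly-once: idempotent, transactional producer
props.put("enable.idempotence", true);
props.put("transactional.id", "my-tx-producer");
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
producer.commitTransaction();
// Consumers must also set isolation.level=read_committed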
⚡ Producer Optimization Strategies
1. Batch Processing Optimization
// Producer configuration optimization
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Batch processing settings
props.put("batch.size", 16384); // Batch size (16KB)
props.put("linger.ms", 5); // Batch wait time (5ms)
props.put("compression.type", "snappy"); // Compression type
props.put("acks", "all"); // All replicas confirmation
props.put("retries", 3); // Retry count
props.put("retry.backoff.ms", 100); // Retry interval
// High performance settings
props.put("buffer.memory", 33554432); // Buffer memory (32MB)
props.put("max.in.flight.requests.per.connection", 5); // Concurrent requests
props.put("enable.idempotence", true); // Idempotence guarantee
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
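With the producer configured, sending is asynchronous. A brief usage sketch with a completion callback:

producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // delivery failed after all retries
    } else {
        System.out.printf("Delivered to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});
producer.flush(); // block until buffered records have been sent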
2. Partitioning Strategy
// Custom partitioner implementation
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
        // Key-based partitioning (mask the sign bit; Math.abs(Integer.MIN_VALUE) is negative)
        if (key != null) {
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
        // No key: pick a random partition (random, not strict round-robin)
        return ThreadLocalRandom.current().nextInt(numPartitions);
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
// Partitioner configuration
props.put("partitioner.class", "com.example.CustomPartitioner");
3. Compression and Serialization
// Compression configuration comparison
public class CompressionComparison {
// Snappy compression (fast compression/decompression)
public void setupSnappyCompression() {
props.put("compression.type", "snappy");
// Pros: Low CPU usage, fast processing
// Cons: Low compression ratio
}
// Gzip compression (high compression ratio)
public void setupGzipCompression() {
props.put("compression.type", "gzip");
// Pros: High compression ratio, network bandwidth saving
// Cons: High CPU usage
}
// LZ4 compression (balanced performance)
public void setupLZ4Compression() {
props.put("compression.type", "lz4");
        // Pros: Very fast compression/decompression, often faster than Snappy
        // Cons: Compression ratio below gzip
}
}
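Kafka 2.1+ also supports zstd (compression.type=zstd), which typically reaches gzip-like compression ratios at noticeably lower CPU cost and is worth benchmarking for new deployments.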
🔄 Consumer Groups and Partitioning
1. Consumer Group Management
// Consumer group configuration
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-consumer-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Offset management settings
consumerProps.put("auto.offset.reset", "earliest"); // earliest, latest, none
consumerProps.put("enable.auto.commit", false); // Manual offset commit
consumerProps.put("max.poll.records", 500); // Records to process at once
consumerProps.put("session.timeout.ms", 30000); // Session timeout
consumerProps.put("heartbeat.interval.ms", 3000); // Heartbeat interval
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
2. Manual Offset Management
// Manual offset commit example
public class ManualOffsetCommit {
private KafkaConsumer<String, String> consumer;
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
public void processMessages() {
consumer.subscribe(Arrays.asList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// Process message
processMessage(record);
// Store offset (next offset to commit)
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offset = new OffsetAndMetadata(record.offset() + 1);
offsets.put(partition, offset);
}
// Commit offsets in batches
if (!offsets.isEmpty()) {
consumer.commitSync(offsets);
offsets.clear();
}
}
        } catch (Exception e) {
            // Log the failure; seeking back to the beginning would silently reprocess the entire log
            System.err.println("Error processing messages: " + e.getMessage());
        } finally {
            consumer.close();
        }
}
private void processMessage(ConsumerRecord<String, String> record) {
// Message processing logic
System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
3. Rebalancing Handling
// Rebalancing listener implementation
public class RebalanceListener implements ConsumerRebalanceListener {
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
private KafkaConsumer<String, String> consumer;
public RebalanceListener(KafkaConsumer<String, String> consumer) {
this.consumer = consumer;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Commit current offset before partition revocation
System.out.println("Partitions revoked: " + partitions);
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // By default the consumer resumes from each partition's committed offset,
        // so no explicit seek is needed here
        System.out.println("Partitions assigned: " + partitions);
    }
public void addOffset(TopicPartition partition, long offset) {
currentOffsets.put(partition, new OffsetAndMetadata(offset));
}
}
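To activate the listener, pass it when subscribing and record offsets as records are processed; a brief usage sketch:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
RebalanceListener listener = new RebalanceListener(consumer);
consumer.subscribe(Arrays.asList("my-topic"), listener);
// inside the poll loop, after processing each record:
// listener.addOffset(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);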
🌊 Stream Processing and KStreams
1. KStreams Basic Configuration
// KStreams application configuration
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// State store configuration
streamsProps.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// Performance optimization settings
streamsProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024); // 10MB
StreamsBuilder builder = new StreamsBuilder();
2. Stream Processing Example
// Real-time data transformation and aggregation
public class StreamProcessingExample {
public void setupStreamProcessing() {
StreamsBuilder builder = new StreamsBuilder();
// Create source stream
KStream<String, String> sourceStream = builder.stream("input-topic");
// Data transformation
KStream<String, String> transformedStream = sourceStream
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> value.toUpperCase())
.map((key, value) -> KeyValue.pair(key, "PROCESSED: " + value));
// Window aggregation
KTable<Windowed<String>, Long> windowedCounts = sourceStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
        // Send results to output topics; unwrap the windowed key so the default String serde applies
        transformedStream.to("output-topic");
        windowedCounts.toStream()
            .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count.toString()))
            .to("aggregated-topic");
// Start stream
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
streams.start();
// Handle shutdown signal
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
3. State Store Utilization
// Aggregation using state store
public class StatefulStreamProcessing {
public void setupStatefulProcessing() {
StreamsBuilder builder = new StreamsBuilder();
        // Note: for DSL aggregations, Materialized.as("user-counts") below creates and
        // manages the state store itself; registering a second store with the same name
        // via builder.addStateStore(...) would fail with a TopologyException
// Aggregation using state store
KStream<String, String> sourceStream = builder.stream("user-events");
sourceStream
.groupByKey()
.aggregate(
() -> 0L,
(key, value, aggregate) -> aggregate + 1,
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-counts")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
)
.toStream()
.to("user-counts-topic");
}
}
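Once the application is running, the materialized store can also be read directly through interactive queries. A minimal sketch, assuming streams is the running KafkaStreams instance and "user-123" is a hypothetical key:

ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("user-counts", QueryableStoreTypes.keyValueStore()));
Long count = store.get("user-123");
System.out.println("Events for user-123: " + count);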
📊 Monitoring and Operations Management
1. JMX Metrics Collection
// JMX metrics monitoring
public class KafkaMetricsMonitor {
    public void setupJMXMonitoring() throws Exception {
        // JMX configuration (note: these remote-access properties only take effect
        // when set at JVM startup, e.g. as -D flags; shown here for reference)
        System.setProperty("com.sun.management.jmxremote", "true");
        System.setProperty("com.sun.management.jmxremote.port", "9999");
        System.setProperty("com.sun.management.jmxremote.authenticate", "false");
        System.setProperty("com.sun.management.jmxremote.ssl", "false");
// Metrics collection
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
// Producer metrics
ObjectName producerMetrics = new ObjectName("kafka.producer:type=producer-metrics,client-id=my-producer");
ObjectName consumerMetrics = new ObjectName("kafka.consumer:type=consumer-metrics,client-id=my-consumer");
// Get metric values
Double recordSendRate = (Double) mBeanServer.getAttribute(producerMetrics, "record-send-rate");
Double recordSendTotal = (Double) mBeanServer.getAttribute(producerMetrics, "record-send-total");
System.out.println("Record Send Rate: " + recordSendRate);
System.out.println("Record Send Total: " + recordSendTotal);
}
}
2. Kafka Management Tools
#!/bin/bash
# Kafka management script
# Create topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 3 \
--replication-factor 2
# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092
# Describe topic
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic
# Check consumer group status
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --describe
# Reset offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --reset-offsets \
--to-earliest --topic my-topic --execute
# Send message test
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic my-topic
# Receive message test
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic my-topic --from-beginning
3. Alert System Implementation
// Kafka monitoring alert system
public class KafkaAlertSystem {
public void setupAlerting() {
// Metrics collection thread
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
                // Consumer lag monitoring
double consumerLag = getConsumerLag();
if (consumerLag > 1000) {
sendAlert("High consumer lag detected: " + consumerLag);
}
// Throughput monitoring
double throughput = getThroughput();
if (throughput < 100) {
sendAlert("Low throughput detected: " + throughput);
}
// Error rate monitoring
double errorRate = getErrorRate();
if (errorRate > 0.01) {
sendAlert("High error rate detected: " + errorRate);
}
} catch (Exception e) {
System.err.println("Error in monitoring: " + e.getMessage());
}
}, 0, 30, TimeUnit.SECONDS);
}
private void sendAlert(String message) {
// Send alerts via Slack, email, SMS, etc.
System.out.println("ALERT: " + message);
// Implement actual alerting
}
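    // Placeholder metric readers: getConsumerLag() is sketched after this class;
    // getThroughput() and getErrorRate() would read JMX metrics such as
    // record-send-rate and record-error-rate
    private double getConsumerLag() { return 0.0; }
    private double getThroughput() { return 0.0; }
    private double getErrorRate() { return 0.0; }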
}
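A practical way to implement getConsumerLag() is to compare the group's committed offsets against the log-end offsets via the AdminClient. A hedged sketch, assuming adminProps points at the cluster and the group id matches the consumer above:

private double getConsumerLag() throws Exception {
    try (AdminClient admin = AdminClient.create(adminProps)) {
        Map<TopicPartition, OffsetAndMetadata> committed = admin
                .listConsumerGroupOffsets("my-consumer-group")
                .partitionsToOffsetAndMetadata().get();
        Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
        committed.keySet().forEach(tp -> latest.put(tp, OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
                admin.listOffsets(latest).all().get();
        long totalLag = 0;
        for (TopicPartition tp : committed.keySet()) {
            totalLag += ends.get(tp).offset() - committed.get(tp).offset();
        }
        return totalLag;
    }
}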
🛠️ Hands-on: Building Production-Grade Streaming System
1. Environment Setup
# Docker Compose Kafka cluster configuration
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise separate listeners so both containers (kafka:29092) and the host (localhost:9092) can connect
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
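Save the file as docker-compose.yml, then bring the cluster up and verify it:

docker compose up -d
# Kafka UI is then reachable at http://localhost:8080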
2. Real-time Log Processing System
// Real-time log processing system
public class LogProcessingSystem {
public static void main(String[] args) {
// Producer configuration
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("compression.type", "snappy");
producerProps.put("batch.size", 16384);
producerProps.put("linger.ms", 5);
// Consumer configuration
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "log-processor");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("auto.offset.reset", "earliest");
consumerProps.put("enable.auto.commit", false);
// Log collection thread
Thread logCollector = new Thread(() -> {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                while (!Thread.currentThread().isInterrupted()) { // exit when the shutdown hook interrupts this thread
// Read from log file
String logLine = readLogLine();
if (logLine != null) {
ProducerRecord<String, String> record = new ProducerRecord<>("raw-logs", logLine);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error sending log: " + exception.getMessage());
}
});
}
Thread.sleep(100); // 100ms interval
}
} catch (Exception e) {
System.err.println("Error in log collector: " + e.getMessage());
}
});
// Log processing thread
Thread logProcessor = new Thread(() -> {
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
consumer.subscribe(Arrays.asList("raw-logs"));
                while (!Thread.currentThread().isInterrupted()) { // exit when the shutdown hook interrupts this thread
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// Parse and process log
LogEntry logEntry = parseLog(record.value());
if (logEntry != null) {
processLogEntry(logEntry);
}
}
// Commit offset
consumer.commitSync();
}
} catch (Exception e) {
System.err.println("Error in log processor: " + e.getMessage());
}
});
// Start threads
logCollector.start();
logProcessor.start();
// Wait for shutdown signal
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logCollector.interrupt();
logProcessor.interrupt();
}));
}
private static String readLogLine() {
// Implement reading from actual log file
return "2025-09-09 10:30:45 INFO User login successful user_id=12345";
}
private static LogEntry parseLog(String logLine) {
// Implement log parsing logic
        // Split with limit 4 so the message portion (parts[3]) keeps its internal spaces
        String[] parts = logLine.split(" ", 4);
if (parts.length >= 4) {
return new LogEntry(parts[0] + " " + parts[1], parts[2], parts[3]);
}
return null;
}
private static void processLogEntry(LogEntry logEntry) {
// Implement log processing logic
System.out.println("Processing log: " + logEntry);
}
static class LogEntry {
String timestamp;
String level;
String message;
LogEntry(String timestamp, String level, String message) {
this.timestamp = timestamp;
this.level = level;
this.message = message;
}
@Override
public String toString() {
return String.format("[%s] %s: %s", timestamp, level, message);
}
}
}
3. Event-driven Microservices
// Event-driven microservices example
public class EventDrivenMicroservice {
    public static void main(String[] args) {
        // Each service blocks in its own poll loop, so run them on separate threads
        new Thread(() -> new EventProcessor().start(), "event-processor").start();
        // Order service (an analogous producer/consumer; a sketch follows this class)
        new Thread(() -> new OrderService().start(), "order-service").start();
        // Inventory service (analogous, omitted for brevity)
        new Thread(() -> new InventoryService().start(), "inventory-service").start();
    }
// Event processor
static class EventProcessor {
private KafkaConsumer<String, String> consumer;
public void start() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "event-processor");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders", "inventory", "payments"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
processEvent(record.topic(), record.key(), record.value());
}
}
}
private void processEvent(String topic, String key, String value) {
switch (topic) {
case "orders":
handleOrderEvent(key, value);
break;
case "inventory":
handleInventoryEvent(key, value);
break;
case "payments":
handlePaymentEvent(key, value);
break;
}
}
private void handleOrderEvent(String key, String value) {
System.out.println("Processing order event: " + key + " -> " + value);
// Order event processing logic
}
private void handleInventoryEvent(String key, String value) {
System.out.println("Processing inventory event: " + key + " -> " + value);
// Inventory event processing logic
}
private void handlePaymentEvent(String key, String value) {
System.out.println("Processing payment event: " + key + " -> " + value);
// Payment event processing logic
}
}
}
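The OrderService and InventoryService referenced in main are not shown above. As a hedged sketch, an order service might publish domain events like this (the topic name and payload shape are assumptions):

class OrderService {
    public void start() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Publish an order-created event keyed by order id so all events
            // for one order land on the same partition
            producer.send(new ProducerRecord<>("orders", "order-1001",
                    "{\"event\":\"ORDER_CREATED\",\"amount\":59.99}"));
            producer.flush();
        }
    }
}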
📚 Learning Summary
What We Learned in This Post
- Deep Understanding of Kafka Architecture
  - Core component analysis
  - Message delivery guarantee levels
  - Partitioning and replication strategies
- Producer Optimization Strategies
  - Batch processing optimization
  - Partitioning strategies
  - Compression and serialization
- Consumer Groups and Partitioning
  - Consumer group management
  - Manual offset management
  - Rebalancing handling
- Stream Processing and KStreams
  - KStreams basic configuration
  - Stream processing examples
  - State store utilization
- Monitoring and Operations Management
  - JMX metrics collection
  - Kafka management tools
  - Alert system implementation
- Building Production-Grade Streaming System
  - Real-time log processing system
  - Event-driven microservices
  - Docker environment configuration
Key Concepts Summary
| Concept | Description | Importance |
| --- | --- | --- |
| Producer Optimization | Performance and stability improvement | ⭐⭐⭐⭐⭐ |
| Consumer Groups | Parallel processing and load distribution | ⭐⭐⭐⭐⭐ |
| Stream Processing | Real-time data transformation | ⭐⭐⭐⭐⭐ |
| Monitoring | System status tracking | ⭐⭐⭐⭐ |
Practical Application Considerations
- Performance Optimization: Batch processing, compression, partitioning
- Stability: Replication, offset management, error handling
- Monitoring: Metrics collection, alerts, log analysis
- Scalability: Cluster configuration, load distribution
With this guide, you can master Apache Kafka's advanced features and build stable, efficient streaming systems in production! 🚀