Apache Kafka Real-time Streaming Guide: From Producer to Consumer

Learn the core concepts and practical applications of Apache Kafka for processing large-scale real-time data, and apply them to real projects.

📖 Table of Contents

  1. Deep Understanding of Kafka Architecture
  2. Producer Optimization Strategies
  3. Consumer Groups and Partitioning
  4. Stream Processing and KStreams
  5. Monitoring and Operations Management
  6. Hands-on: Building a Production-Grade Streaming System
  7. Performance Tuning and Scalability
  8. Security and Access Control
  9. Learning Summary

🏗️ Deep Understanding of Kafka Architecture

Core Component Analysis

Apache Kafka consists of the following core components:

1. Broker

  • Role: Core engine for message storage and delivery
  • Operation: Sequentially stores messages by partition
  • Performance Optimization: Batch processing, compression, indexing

2. Topic & Partition

  • Topic: Logical classification of messages
  • Partition: Physical division unit of topic
  • Replication: Ensures data stability

3. Producer

  • Role: Message production and transmission
  • Features: Asynchronous transmission, batch processing, compression
  • Performance: Balance between throughput and latency

4. Consumer

  • Role: Message consumption and processing
  • Consumer Group: Parallel processing and load distribution
  • Offset Management: Tracks message processing status
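
To make topics, partitions, and replication concrete, here is a minimal sketch (not from the original setup; the broker address and the topic name "demo-topic" are illustrative assumptions) that creates a topic with the Java AdminClient:

// Create a topic with 3 partitions, each replicated to 2 brokers
// (requires at least 2 brokers; run inside a method declared "throws Exception")
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(adminProps)) {
    NewTopic topic = new NewTopic("demo-topic", 3, (short) 2);   // name, partitions, replication factor
    admin.createTopics(Arrays.asList(topic)).all().get();
    System.out.println("Topics: " + admin.listTopics().names().get());
}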

Message Delivery Guarantees

# Message delivery guarantee levels
at-least-once:  # At least once delivery (duplicates possible)
exactly-once:   # Exactly once delivery (no duplicates)
at-most-once:   # At most once delivery (loss possible)
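
How these guarantees map to client settings, as a hedged sketch (the property values shown are typical choices, not taken from the original guide):

// at-most-once: fire-and-forget producer, auto-committing consumer (fast, may lose messages)
// at-least-once: acks=all with retries, consumer commits only after successful processing
// exactly-once: idempotent/transactional producer + consumer reading only committed data
Properties producerProps = new Properties();
producerProps.put("acks", "all");                        // wait for all in-sync replicas
producerProps.put("enable.idempotence", true);           // no duplicates on producer retries
producerProps.put("transactional.id", "orders-tx-1");    // assumption: id for transactional writes

Properties consumerProps = new Properties();
consumerProps.put("enable.auto.commit", false);          // commit offsets only after processing
consumerProps.put("isolation.level", "read_committed");  // skip aborted transactional messages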

⚡ Producer Optimization Strategies

1. Batch Processing Optimization

// Producer configuration optimization
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Batch processing settings
props.put("batch.size", 16384);                    // Batch size (16KB)
props.put("linger.ms", 5);                         // Batch wait time (5ms)
props.put("compression.type", "snappy");           // Compression type
props.put("acks", "all");                          // All replicas confirmation
props.put("retries", 3);                           // Retry count
props.put("retry.backoff.ms", 100);               // Retry interval

// High performance settings
props.put("buffer.memory", 33554432);              // Buffer memory (32MB)
props.put("max.in.flight.requests.per.connection", 5); // Concurrent requests
props.put("enable.idempotence", true);             // Idempotence guarantee

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
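
A brief usage sketch with the producer configured above (topic, key, and value are illustrative): send() is asynchronous, so records are buffered and flushed according to batch.size and linger.ms.

// Asynchronous send with a callback; batching happens transparently in the background
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "user-123", "login");
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();             // delivery failed after retries
    } else {
        System.out.printf("Sent to %s-%d@%d%n",  // topic-partition@offset
                metadata.topic(), metadata.partition(), metadata.offset());
    }
});
producer.flush();   // push out any buffered records (e.g. before shutdown)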

2. Partitioning Strategy

// Custom partitioner implementation
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                        Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        // Key-based partitioning (mask the sign bit so Integer.MIN_VALUE cannot yield a negative index)
        if (key != null) {
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
        
        // Records without a key are spread randomly across partitions
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }
    
    @Override
    public void close() {}
    
    @Override
    public void configure(Map<String, ?> configs) {}
}

// Partitioner configuration
props.put("partitioner.class", "com.example.CustomPartitioner");

3. Compression and Serialization

// Compression configuration comparison
public class CompressionComparison {
    
    private final Properties props = new Properties();  // shared producer configuration
    
    // Snappy compression (fast compression/decompression)
    public void setupSnappyCompression() {
        props.put("compression.type", "snappy");
        // Pros: Low CPU usage, fast processing
        // Cons: Low compression ratio
    }
    
    // Gzip compression (high compression ratio)
    public void setupGzipCompression() {
        props.put("compression.type", "gzip");
        // Pros: High compression ratio, network bandwidth saving
        // Cons: High CPU usage
    }
    
    // LZ4 compression (balanced performance)
    public void setupLZ4Compression() {
        props.put("compression.type", "lz4");
        // Pros: Very fast compression/decompression, moderate compression ratio
        // Cons: Compression ratio lower than gzip
    }
}

🔄 Consumer Groups and Partitioning

1. Consumer Group Management

// Consumer group configuration
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-consumer-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Offset management settings
consumerProps.put("auto.offset.reset", "earliest");  // earliest, latest, none
consumerProps.put("enable.auto.commit", false);      // Manual offset commit
consumerProps.put("max.poll.records", 500);          // Records to process at once
consumerProps.put("session.timeout.ms", 30000);      // Session timeout
consumerProps.put("heartbeat.interval.ms", 3000);    // Heartbeat interval

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

2. Manual Offset Management

// Manual offset commit example
public class ManualOffsetCommit {
    private final KafkaConsumer<String, String> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    
    public ManualOffsetCommit(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }
    
    public void processMessages() {
        consumer.subscribe(Arrays.asList("my-topic"));
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    // Process message
                    processMessage(record);
                    
                    // Store offset (next offset to commit)
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    OffsetAndMetadata offset = new OffsetAndMetadata(record.offset() + 1);
                    offsets.put(partition, offset);
                }
                
                // Commit offsets in batches
                if (!offsets.isEmpty()) {
                    consumer.commitSync(offsets);
                    offsets.clear();
                }
            }
        } catch (Exception e) {
            // On error, rewind to the start of the assigned partitions.
            // Note: this reprocesses everything; in practice you would usually
            // seek back only to the last committed offsets.
            consumer.seekToBeginning(consumer.assignment());
        }
    }
    
    private void processMessage(ConsumerRecord<String, String> record) {
        // Message processing logic
        System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                record.topic(), record.partition(), record.offset(), 
                record.key(), record.value());
    }
}

3. Rebalancing Handling

// Rebalancing listener implementation
public class RebalanceListener implements ConsumerRebalanceListener {
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    private KafkaConsumer<String, String> consumer;
    
    public RebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }
    
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit current offset before partition revocation
        System.out.println("Partitions revoked: " + partitions);
        consumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }
    
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Position the consumer after partition assignment. Seeking to the beginning
        // reprocesses each partition from the start; omit the seek to simply resume
        // from the last committed offsets instead.
        System.out.println("Partitions assigned: " + partitions);
        for (TopicPartition partition : partitions) {
            consumer.seekToBeginning(Arrays.asList(partition));
        }
    }
    
    public void addOffset(TopicPartition partition, long offset) {
        currentOffsets.put(partition, new OffsetAndMetadata(offset));
    }
}
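
Wiring the listener in is a one-liner on subscribe(); a brief usage sketch (the topic name is an assumption):

// Register the rebalance listener together with the subscription
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
RebalanceListener listener = new RebalanceListener(consumer);
consumer.subscribe(Arrays.asList("my-topic"), listener);

// Inside the poll loop, record progress so it can be committed on revocation:
// listener.addOffset(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);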

🌊 Stream Processing and KStreams

1. KStreams Basic Configuration

// KStreams application configuration
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// State store configuration
streamsProps.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

// Performance optimization settings
streamsProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024); // 10MB

StreamsBuilder builder = new StreamsBuilder();

2. Stream Processing Example

// Real-time data transformation and aggregation
public class StreamProcessingExample {
    
    public void setupStreamProcessing() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Create source stream
        KStream<String, String> sourceStream = builder.stream("input-topic");
        
        // Data transformation
        KStream<String, String> transformedStream = sourceStream
            .filter((key, value) -> value != null && !value.isEmpty())
            .mapValues(value -> value.toUpperCase())
            .map((key, value) -> KeyValue.pair(key, "PROCESSED: " + value));
        
        // Window aggregation (5-minute tumbling windows)
        KTable<Windowed<String>, Long> windowedCounts = sourceStream
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count();
        
        // Send results to output topics; the windowed key is flattened to "key@windowStart"
        // so plain String/Long serdes can be used on the aggregated topic
        transformedStream.to("output-topic");
        windowedCounts
            .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
            .to("aggregated-topic", Produced.with(Serdes.String(), Serdes.Long()));
        
        // Start the streams application (streamsProps: the configuration from the previous snippet)
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
        streams.start();
        
        // Handle shutdown signal
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

3. State Store Utilization

// Aggregation using state store
public class StatefulStreamProcessing {
    
    public void setupStatefulProcessing() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // The aggregation below materializes its own persistent state store named
        // "user-counts" (via Materialized.as), so there is no need to add one manually
        // with Stores.keyValueStoreBuilder()/builder.addStateStore(); registering a
        // second store with the same name would fail when the topology is built.
        
        // Aggregation backed by a state store
        KStream<String, String> sourceStream = builder.stream("user-events");
        
        sourceStream
            .groupByKey()
            .aggregate(
                () -> 0L,
                (key, value, aggregate) -> aggregate + 1,
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-counts")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long())
            )
            .toStream()
            .to("user-counts-topic");
    }
}
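
Once the application is running, the "user-counts" store can also be queried directly from the same JVM via interactive queries. A minimal sketch, assuming a running KafkaStreams instance named streams and a hypothetical key:

// Read current aggregate values straight out of the local state store
ReadOnlyKeyValueStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("user-counts", QueryableStoreTypes.keyValueStore()));
Long count = store.get("user-42");                      // current count for one key
System.out.println("user-42 events so far: " + count);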

📊 Monitoring and Operations Management

1. JMX Metrics Collection

// JMX metrics monitoring
public class KafkaMetricsMonitor {
    
    public void setupJMXMonitoring() throws Exception {
        // JMX configuration
        System.setProperty("com.sun.management.jmxremote", "true");
        System.setProperty("com.sun.management.jmxremote.port", "9999");
        System.setProperty("com.sun.management.jmxremote.authenticate", "false");
        System.setProperty("com.sun.management.jmxremote.ssl", "false");
        
        // Metrics collection
        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        
        // Producer metrics
        ObjectName producerMetrics = new ObjectName("kafka.producer:type=producer-metrics,client-id=my-producer");
        ObjectName consumerMetrics = new ObjectName("kafka.consumer:type=consumer-metrics,client-id=my-consumer");
        
        // Get metric values
        Double recordSendRate = (Double) mBeanServer.getAttribute(producerMetrics, "record-send-rate");
        Double recordSendTotal = (Double) mBeanServer.getAttribute(producerMetrics, "record-send-total");
        
        System.out.println("Record Send Rate: " + recordSendRate);
        System.out.println("Record Send Total: " + recordSendTotal);
    }
}

2. Kafka Management Tools

#!/bin/bash
# Kafka management script

# Create topic
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 2

# List topics
kafka-topics.sh --list --bootstrap-server localhost:9092

# Describe topic
kafka-topics.sh --describe \
  --bootstrap-server localhost:9092 \
  --topic my-topic

# Check consumer group status
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --describe

# Reset offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --reset-offsets \
  --to-earliest --topic my-topic --execute

# Send message test
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic my-topic

# Receive message test
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --from-beginning

3. Alert System Implementation

// Kafka monitoring alert system
public class KafkaAlertSystem {
    
    public void setupAlerting() {
        // Metrics collection thread
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // Consumer lag monitoring (getConsumerLag(), getThroughput(), and
                // getErrorRate() are placeholders to be backed by JMX or AdminClient
                // metrics; see the sketch after this class)
                double consumerLag = getConsumerLag();
                if (consumerLag > 1000) {
                    sendAlert("High consumer lag detected: " + consumerLag);
                }
                
                // Throughput monitoring
                double throughput = getThroughput();
                if (throughput < 100) {
                    sendAlert("Low throughput detected: " + throughput);
                }
                
                // Error rate monitoring
                double errorRate = getErrorRate();
                if (errorRate > 0.01) {
                    sendAlert("High error rate detected: " + errorRate);
                }
                
            } catch (Exception e) {
                System.err.println("Error in monitoring: " + e.getMessage());
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
    
    private void sendAlert(String message) {
        // Send alerts via Slack, email, SMS, etc.
        System.out.println("ALERT: " + message);
        // Implement actual alerting
    }
}
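
As a concrete starting point, here is a hedged sketch of how getConsumerLag() could be implemented with the AdminClient (the group id and bootstrap address are assumptions): committed offsets per partition are compared against the latest end offsets on the broker.

// Total lag for one consumer group = sum over partitions of (end offset - committed offset)
public class ConsumerLagChecker {
    
    public static long totalLag(String bootstrapServers, String groupId) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        
        try (AdminClient admin = AdminClient.create(props)) {
            // Offsets the group has committed, per partition
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
            
            // Latest (end) offsets on the broker for the same partitions
            Map<TopicPartition, OffsetSpec> request = committed.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                admin.listOffsets(request).all().get();
            
            // Sum the per-partition differences
            return committed.entrySet().stream()
                .mapToLong(e -> endOffsets.get(e.getKey()).offset() - e.getValue().offset())
                .sum();
        }
    }
}

// Example: long lag = ConsumerLagChecker.totalLag("localhost:9092", "my-consumer-group");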

🛠️ Hands-on: Building a Production-Grade Streaming System

1. Environment Setup

# Docker Compose Kafka cluster configuration
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: kafka:29092 for clients inside the Docker network (e.g. kafka-ui),
      # localhost:9092 for clients running on the host machine
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
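
To bring the cluster up and confirm the broker responds (assuming Docker Compose v2 and the Confluent images above, which ship the Kafka CLI tools without the .sh suffix):

# Start ZooKeeper, Kafka, and kafka-ui in the background
docker compose up -d

# List topics by running the CLI inside the broker container
docker compose exec kafka kafka-topics --bootstrap-server localhost:9092 --list

# The web UI is available at http://localhost:8080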

2. Real-time Log Processing System

// Real-time log processing system
public class LogProcessingSystem {
    
    public static void main(String[] args) {
        // Producer configuration
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("compression.type", "snappy");
        producerProps.put("batch.size", 16384);
        producerProps.put("linger.ms", 5);
        
        // Consumer configuration
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "log-processor");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", false);
        
        // Log collection thread
        Thread logCollector = new Thread(() -> {
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                while (true) {
                    // Read from log file
                    String logLine = readLogLine();
                    if (logLine != null) {
                        ProducerRecord<String, String> record = new ProducerRecord<>("raw-logs", logLine);
                        producer.send(record, (metadata, exception) -> {
                            if (exception != null) {
                                System.err.println("Error sending log: " + exception.getMessage());
                            }
                        });
                    }
                    Thread.sleep(100); // 100ms interval
                }
            } catch (Exception e) {
                System.err.println("Error in log collector: " + e.getMessage());
            }
        });
        
        // Log processing thread
        Thread logProcessor = new Thread(() -> {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Arrays.asList("raw-logs"));
                
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    
                    for (ConsumerRecord<String, String> record : records) {
                        // Parse and process log
                        LogEntry logEntry = parseLog(record.value());
                        if (logEntry != null) {
                            processLogEntry(logEntry);
                        }
                    }
                    
                    // Commit offset
                    consumer.commitSync();
                }
            } catch (Exception e) {
                System.err.println("Error in log processor: " + e.getMessage());
            }
        });
        
        // Start threads
        logCollector.start();
        logProcessor.start();
        
        // Wait for shutdown signal
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logCollector.interrupt();
            logProcessor.interrupt();
        }));
    }
    
    private static String readLogLine() {
        // Implement reading from actual log file
        return "2025-09-09 10:30:45 INFO User login successful user_id=12345";
    }
    
    private static LogEntry parseLog(String logLine) {
        // Parse "date time LEVEL message..."; the limit of 4 keeps the full message intact
        String[] parts = logLine.split(" ", 4);
        if (parts.length >= 4) {
            return new LogEntry(parts[0] + " " + parts[1], parts[2], parts[3]);
        }
        return null;
    }
    
    private static void processLogEntry(LogEntry logEntry) {
        // Implement log processing logic
        System.out.println("Processing log: " + logEntry);
    }
    
    static class LogEntry {
        String timestamp;
        String level;
        String message;
        
        LogEntry(String timestamp, String level, String message) {
            this.timestamp = timestamp;
            this.level = level;
            this.message = message;
        }
        
        @Override
        public String toString() {
            return String.format("[%s] %s: %s", timestamp, level, message);
        }
    }
}

3. Event-driven Microservices

// Event-driven microservices example
public class EventDrivenMicroservice {
    
    public static void main(String[] args) {
        // Each service runs a blocking consumer loop, so start them on separate threads
        // (calling start() directly would never return from the first service)
        new Thread(() -> new EventProcessor().start(), "event-processor").start();
        new Thread(() -> new OrderService().start(), "order-service").start();
        new Thread(() -> new InventoryService().start(), "inventory-service").start();
    }
    
    // Event processor
    static class EventProcessor {
        private KafkaConsumer<String, String> consumer;
        
        public void start() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "event-processor");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("orders", "inventory", "payments"));
            
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    processEvent(record.topic(), record.key(), record.value());
                }
            }
        }
        
        private void processEvent(String topic, String key, String value) {
            switch (topic) {
                case "orders":
                    handleOrderEvent(key, value);
                    break;
                case "inventory":
                    handleInventoryEvent(key, value);
                    break;
                case "payments":
                    handlePaymentEvent(key, value);
                    break;
            }
        }
        
        private void handleOrderEvent(String key, String value) {
            System.out.println("Processing order event: " + key + " -> " + value);
            // Order event processing logic
        }
        
        private void handleInventoryEvent(String key, String value) {
            System.out.println("Processing inventory event: " + key + " -> " + value);
            // Inventory event processing logic
        }
        
        private void handlePaymentEvent(String key, String value) {
            System.out.println("Processing payment event: " + key + " -> " + value);
            // Payment event processing logic
        }
    }
}

📚 Learning Summary

What We Learned in This Post

  1. Deep Understanding of Kafka Architecture
    • Core component analysis
    • Message delivery guarantee levels
    • Partitioning and replication strategies
  2. Producer Optimization Strategies
    • Batch processing optimization
    • Partitioning strategies
    • Compression and serialization
  3. Consumer Groups and Partitioning
    • Consumer group management
    • Manual offset management
    • Rebalancing handling
  4. Stream Processing and KStreams
    • KStreams basic configuration
    • Stream processing examples
    • State store utilization
  5. Monitoring and Operations Management
    • JMX metrics collection
    • Kafka management tools
    • Alert system implementation
  6. Building a Production-Grade Streaming System
    • Real-time log processing system
    • Event-driven microservices
    • Docker environment configuration

Key Concepts Summary

Concept                  Description                                  Importance
Producer Optimization    Performance and stability improvement        ⭐⭐⭐⭐⭐
Consumer Groups          Parallel processing and load distribution    ⭐⭐⭐⭐⭐
Stream Processing        Real-time data transformation                ⭐⭐⭐⭐⭐
Monitoring               System status tracking                       ⭐⭐⭐⭐

Practical Application Considerations

  1. Performance Optimization: Batch processing, compression, partitioning
  2. Stability: Replication, offset management, error handling
  3. Monitoring: Metrics collection, alerts, log analysis
  4. Scalability: Cluster configuration, load distribution

With this guide, you can master the advanced features of Apache Kafka and build stable and efficient streaming systems in production! 🚀