Apache Kafka 실시간 스트리밍 가이드: 프로듀서부터 컨슈머까지

대용량 실시간 데이터를 처리하는 Apache Kafka의 핵심 개념과 실무 활용 방법을 학습하고 실제 프로젝트에 적용해봅니다.

📋 목차

  1. Kafka 아키텍처 심화 이해
  2. 프로듀서 최적화 전략
  3. 컨슈머 그룹과 파티셔닝
  4. 스트리밍 처리와 KStreams
  5. 모니터링과 운영 관리
  6. 실습: 실무급 스트리밍 시스템 구축
  7. 성능 튜닝과 확장성
  8. 보안과 권한 관리
  9. 학습 요약

🏗 ️ Kafka 아키텍처 심화 이해

핵심 컴포넌트 분석

Apache Kafka는 다음과 같은 핵심 컴포넌트들로 구성됩니다:

1. Broker

  • 역할: 메시지 저장 및 전달의 핵심 엔진
  • 동작 원리: 파티션별로 메시지를 순차적으로 저장
  • 성능 최적화: 배치 처리, 압축, 인덱싱

2. Topic & Partition

  • Topic: 메시지의 논리적 분류
  • Partition: Topic의 물리적 분할 단위
  • Replication: 데이터 안정성 보장

3. Producer

  • 역할: 메시지 생산 및 전송
  • 특징: 비동기 전송, 배치 처리, 압축
  • 성능: Throughput과 Latency의 균형

4. Consumer

  • 역할: 메시지 소비 및 처리
  • Consumer Group: 병렬 처리 및 부하 분산
  • Offset Management: 메시지 처리 상태 추적

메시지 전달 보장 (Delivery Guarantees)

# 메시지 전달 보장 레벨
at-least-once:  # 최소 한 번 전달 (중복 가능)
exactly-once:   # 정확히 한 번 전달 (중복 없음)
at-most-once:   # 최대 한 번 전달 (손실 가능)

⚡ 프로듀서 최적화 전략

1. 배치 처리 최적화

// Producer 설정 최적화
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// 배치 처리 설정
props.put("batch.size", 16384);                    // 배치 크기 (16KB)
props.put("linger.ms", 5);                         // 배치 대기 시간 (5ms)
props.put("compression.type", "snappy");           // 압축 타입
props.put("acks", "all");                          // 모든 복제본 확인
props.put("retries", 3);                           // 재시도 횟수
props.put("retry.backoff.ms", 100);               // 재시도 간격

// 고성능 설정
props.put("buffer.memory", 33554432);              // 버퍼 메모리 (32MB)
props.put("max.in.flight.requests.per.connection", 5); // 동시 요청 수
props.put("enable.idempotence", true);             // 멱등성 보장

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

2. 파티셔닝 전략

// 커스텀 파티셔너 구현
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                        Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        // 키 기반 파티셔닝
        if (key != null) {
            return Math.abs(key.hashCode()) % numPartitions;
        }
        
        // 라운드 로빈 파티셔닝
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }
    
    @Override
    public void close() {}
    
    @Override
    public void configure(Map<String, ?> configs) {}
}

// 파티셔너 설정
props.put("partitioner.class", "com.example.CustomPartitioner");

3. 압축과 압축

// 압축 설정 비교
public class CompressionComparison {
    
    // Snappy 압축 (빠른 압축/해제)
    public void setupSnappyCompression() {
        props.put("compression.type", "snappy");
        // 장점: 낮은 CPU 사용량, 빠른 처리
        // 단점: 압축률이 낮음
    }
    
    // Gzip 압축 (높은 압축률)
    public void setupGzipCompression() {
        props.put("compression.type", "gzip");
        // 장점: 높은 압축률, 네트워크 대역폭 절약
        // 단점: 높은 CPU 사용량
    }
    
    // LZ4 압축 (균형잡힌 성능)
    public void setupLZ4Compression() {
        props.put("compression.type", "lz4");
        // 장점: 빠른 압축/해제, 적당한 압축률
        // 단점: Snappy보다 약간 느림
    }
}

🔄 컨슈머 그룹과 파티셔닝

1. 컨슈머 그룹 관리

// 컨슈머 그룹 설정
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-consumer-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 오프셋 관리 설정
consumerProps.put("auto.offset.reset", "earliest");  // earliest, latest, none
consumerProps.put("enable.auto.commit", false);      // 수동 오프셋 커밋
consumerProps.put("max.poll.records", 500);          // 한 번에 처리할 레코드 수
consumerProps.put("session.timeout.ms", 30000);      // 세션 타임아웃
consumerProps.put("heartbeat.interval.ms", 3000);    // 하트비트 간격

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

2. 수동 오프셋 관리

// 수동 오프셋 커밋 예제
public class ManualOffsetCommit {
    private KafkaConsumer<String, String> consumer;
    private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    
    public void processMessages() {
        consumer.subscribe(Arrays.asList("my-topic"));
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    // 메시지 처리
                    processMessage(record);
                    
                    // 오프셋 저장 (다음에 커밋할 오프셋)
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    OffsetAndMetadata offset = new OffsetAndMetadata(record.offset() + 1);
                    offsets.put(partition, offset);
                }
                
                // 배치 단위로 오프셋 커밋
                if (!offsets.isEmpty()) {
                    consumer.commitSync(offsets);
                    offsets.clear();
                }
            }
        } catch (Exception e) {
            // 오류 발생 시 오프셋 롤백
            consumer.seekToBeginning(consumer.assignment());
        }
    }
    
    private void processMessage(ConsumerRecord<String, String> record) {
        // 메시지 처리 로직
        System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                record.topic(), record.partition(), record.offset(), 
                record.key(), record.value());
    }
}

3. 리밸런싱 처리

// 리밸런싱 리스너 구현
public class RebalanceListener implements ConsumerRebalanceListener {
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    private KafkaConsumer<String, String> consumer;
    
    public RebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }
    
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 파티션 해제 전 현재 오프셋 커밋
        System.out.println("Partitions revoked: " + partitions);
        consumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }
    
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 파티션 할당 후 오프셋 복원
        System.out.println("Partitions assigned: " + partitions);
        for (TopicPartition partition : partitions) {
            consumer.seekToBeginning(Arrays.asList(partition));
        }
    }
    
    public void addOffset(TopicPartition partition, long offset) {
        currentOffsets.put(partition, new OffsetAndMetadata(offset));
    }
}

🌊 스트리밍 처리와 KStreams

1. KStreams 기본 설정

// KStreams 애플리케이션 설정
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// 상태 저장소 설정
streamsProps.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

// 성능 최적화 설정
streamsProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024); // 10MB

StreamsBuilder builder = new StreamsBuilder();

2. 스트림 처리 예제

// 실시간 데이터 변환 및 집계
public class StreamProcessingExample {
    
    public void setupStreamProcessing() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // 소스 스트림 생성
        KStream<String, String> sourceStream = builder.stream("input-topic");
        
        // 데이터 변환
        KStream<String, String> transformedStream = sourceStream
            .filter((key, value) -> value != null && !value.isEmpty())
            .mapValues(value -> value.toUpperCase())
            .map((key, value) -> KeyValue.pair(key, "PROCESSED: " + value));
        
        // 윈도우 집계
        KTable<Windowed<String>, Long> windowedCounts = sourceStream
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            .count();
        
        // 결과를 출력 토픽으로 전송
        transformedStream.to("output-topic");
        windowedCounts.toStream().to("aggregated-topic");
        
        // 스트림 시작
        KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
        streams.start();
        
        // 종료 시그널 처리
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

3. 상태 저장소 활용

// 상태 저장소를 활용한 집계
public class StatefulStreamProcessing {
    
    public void setupStatefulProcessing() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // 상태 저장소 생성
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("user-counts"),
            Serdes.String(),
            Serdes.Long()
        );
        builder.addStateStore(storeBuilder);
        
        // 상태 저장소를 사용한 집계
        KStream<String, String> sourceStream = builder.stream("user-events");
        
        sourceStream
            .groupByKey()
            .aggregate(
                () -> 0L,
                (key, value, aggregate) -> aggregate + 1,
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-counts")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long())
            )
            .toStream()
            .to("user-counts-topic");
    }
}

📊 모니터링과 운영 관리

1. JMX 메트릭 수집

// JMX 메트릭 모니터링
public class KafkaMetricsMonitor {
    
    public void setupJMXMonitoring() {
        // JMX 설정
        System.setProperty("com.sun.management.jmxremote", "true");
        System.setProperty("com.sun.management.jmxremote.port", "9999");
        System.setProperty("com.sun.management.jmxremote.authenticate", "false");
        System.setProperty("com.sun.management.jmxremote.ssl", "false");
        
        // 메트릭 수집
        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        
        // Producer 메트릭
        ObjectName producerMetrics = new ObjectName("kafka.producer:type=producer-metrics,client-id=my-producer");
        ObjectName consumerMetrics = new ObjectName("kafka.consumer:type=consumer-metrics,client-id=my-consumer");
        
        // 메트릭 값 조회
        Double recordSendRate = (Double) mBeanServer.getAttribute(producerMetrics, "record-send-rate");
        Double recordSendTotal = (Double) mBeanServer.getAttribute(producerMetrics, "record-send-total");
        
        System.out.println("Record Send Rate: " + recordSendRate);
        System.out.println("Record Send Total: " + recordSendTotal);
    }
}

2. 카프카 관리 도구

#!/bin/bash
# Kafka 관리 스크립트

# 토픽 생성
kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 2

# 토픽 목록 조회
kafka-topics.sh --list --bootstrap-server localhost:9092

# 토픽 상세 정보
kafka-topics.sh --describe \
  --bootstrap-server localhost:9092 \
  --topic my-topic

# 컨슈머 그룹 상태 확인
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --describe

# 오프셋 리셋
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-consumer-group --reset-offsets \
  --to-earliest --topic my-topic --execute

# 메시지 전송 테스트
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic my-topic

# 메시지 수신 테스트
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-topic --from-beginning

3. 알림 시스템 구축

// Kafka 모니터링 알림 시스템
public class KafkaAlertSystem {
    
    public void setupAlerting() {
        // 메트릭 수집 스레드
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // 지연 시간 모니터링
                double consumerLag = getConsumerLag();
                if (consumerLag > 1000) {
                    sendAlert("High consumer lag detected: " + consumerLag);
                }
                
                // 처리량 모니터링
                double throughput = getThroughput();
                if (throughput < 100) {
                    sendAlert("Low throughput detected: " + throughput);
                }
                
                // 에러율 모니터링
                double errorRate = getErrorRate();
                if (errorRate > 0.01) {
                    sendAlert("High error rate detected: " + errorRate);
                }
                
            } catch (Exception e) {
                System.err.println("Error in monitoring: " + e.getMessage());
            }
        }, 0, 30, TimeUnit.SECONDS);
    }
    
    private void sendAlert(String message) {
        // Slack, 이메일, SMS 등으로 알림 전송
        System.out.println("ALERT: " + message);
        // 실제 알림 구현
    }
}

🛠 ️ 실습: 실무급 스트리밍 시스템 구축

1. 환경 설정

# Docker Compose로 Kafka 클러스터 구성
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

2. 실시간 로그 처리 시스템

// 실시간 로그 처리 시스템
public class LogProcessingSystem {
    
    public static void main(String[] args) {
        // Producer 설정
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("compression.type", "snappy");
        producerProps.put("batch.size", 16384);
        producerProps.put("linger.ms", 5);
        
        // Consumer 설정
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "log-processor");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", false);
        
        // 로그 수집 스레드
        Thread logCollector = new Thread(() -> {
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                while (true) {
                    // 로그 파일에서 읽기
                    String logLine = readLogLine();
                    if (logLine != null) {
                        ProducerRecord<String, String> record = new ProducerRecord<>("raw-logs", logLine);
                        producer.send(record, (metadata, exception) -> {
                            if (exception != null) {
                                System.err.println("Error sending log: " + exception.getMessage());
                            }
                        });
                    }
                    Thread.sleep(100); // 100ms 간격
                }
            } catch (Exception e) {
                System.err.println("Error in log collector: " + e.getMessage());
            }
        });
        
        // 로그 처리 스레드
        Thread logProcessor = new Thread(() -> {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Arrays.asList("raw-logs"));
                
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    
                    for (ConsumerRecord<String, String> record : records) {
                        // 로그 파싱 및 처리
                        LogEntry logEntry = parseLog(record.value());
                        if (logEntry != null) {
                            processLogEntry(logEntry);
                        }
                    }
                    
                    // 오프셋 커밋
                    consumer.commitSync();
                }
            } catch (Exception e) {
                System.err.println("Error in log processor: " + e.getMessage());
            }
        });
        
        // 스레드 시작
        logCollector.start();
        logProcessor.start();
        
        // 종료 시그널 대기
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logCollector.interrupt();
            logProcessor.interrupt();
        }));
    }
    
    private static String readLogLine() {
        // 실제 로그 파일에서 읽기 구현
        return "2025-09-09 10:30:45 INFO User login successful user_id=12345";
    }
    
    private static LogEntry parseLog(String logLine) {
        // 로그 파싱 로직 구현
        String[] parts = logLine.split(" ");
        if (parts.length >= 4) {
            return new LogEntry(parts[0] + " " + parts[1], parts[2], parts[3]);
        }
        return null;
    }
    
    private static void processLogEntry(LogEntry logEntry) {
        // 로그 처리 로직 구현
        System.out.println("Processing log: " + logEntry);
    }
    
    static class LogEntry {
        String timestamp;
        String level;
        String message;
        
        LogEntry(String timestamp, String level, String message) {
            this.timestamp = timestamp;
            this.level = level;
            this.message = message;
        }
        
        @Override
        public String toString() {
            return String.format("[%s] %s: %s", timestamp, level, message);
        }
    }
}

3. 이벤트 기반 마이크로서비스

// 이벤트 기반 마이크로서비스 예제
public class EventDrivenMicroservice {
    
    public static void main(String[] args) {
        // 이벤트 처리 서비스
        EventProcessor eventProcessor = new EventProcessor();
        eventProcessor.start();
        
        // 주문 서비스
        OrderService orderService = new OrderService();
        orderService.start();
        
        // 재고 서비스
        InventoryService inventoryService = new InventoryService();
        inventoryService.start();
    }
    
    // 이벤트 처리기
    static class EventProcessor {
        private KafkaConsumer<String, String> consumer;
        
        public void start() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "event-processor");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("orders", "inventory", "payments"));
            
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    processEvent(record.topic(), record.key(), record.value());
                }
            }
        }
        
        private void processEvent(String topic, String key, String value) {
            switch (topic) {
                case "orders":
                    handleOrderEvent(key, value);
                    break;
                case "inventory":
                    handleInventoryEvent(key, value);
                    break;
                case "payments":
                    handlePaymentEvent(key, value);
                    break;
            }
        }
        
        private void handleOrderEvent(String key, String value) {
            System.out.println("Processing order event: " + key + " -> " + value);
            // 주문 이벤트 처리 로직
        }
        
        private void handleInventoryEvent(String key, String value) {
            System.out.println("Processing inventory event: " + key + " -> " + value);
            // 재고 이벤트 처리 로직
        }
        
        private void handlePaymentEvent(String key, String value) {
            System.out.println("Processing payment event: " + key + " -> " + value);
            // 결제 이벤트 처리 로직
        }
    }
}

📚 학습 요약

이번 포스트에서 학습한 내용

  1. Kafka 아키텍처 심화 이해
    • 핵심 컴포넌트 분석
    • 메시지 전달 보장 레벨
    • 파티셔닝과 복제 전략
  2. 프로듀서 최적화 전략
    • 배치 처리 최적화
    • 파티셔닝 전략
    • 압축과 압축
  3. 컨슈머 그룹과 파티셔닝
    • 컨슈머 그룹 관리
    • 수동 오프셋 관리
    • 리밸런싱 처리
  4. 스트리밍 처리와 KStreams
    • KStreams 기본 설정
    • 스트림 처리 예제
    • 상태 저장소 활용
  5. 모니터링과 운영 관리
    • JMX 메트릭 수집
    • 카프카 관리 도구
    • 알림 시스템 구축
  6. 실무급 스트리밍 시스템 구축
    • 실시간 로그 처리 시스템
    • 이벤트 기반 마이크로서비스
    • Docker 환경 구성

핵심 개념 정리

개념 설명 중요도
프로듀서 최적화 성능과 안정성 향상 ⭐⭐⭐⭐⭐
컨슈머 그룹 병렬 처리 및 부하 분산 ⭐⭐⭐⭐⭐
스트리밍 처리 실시간 데이터 변환 ⭐⭐⭐⭐⭐
모니터링 시스템 상태 추적 ⭐⭐⭐⭐

실무 적용 시 고려사항

  1. 성능 최적화: 배치 처리, 압축, 파티셔닝
  2. 안정성: 복제, 오프셋 관리, 에러 처리
  3. 모니터링: 메트릭 수집, 알림, 로그 분석
  4. 확장성: 클러스터 구성, 부하 분산

이 가이드를 통해 Apache Kafka의 고급 기능들을 마스터하고, 실무에서 안정적이고 효율적인 스트리밍 시스템을 구축할 수 있습니다! 🚀