Apache Kafka 실시간 스트리밍 가이드: 프로듀서부터 컨슈머까지
⏱️ 30분
📊 고급
Apache Kafka 실시간 스트리밍 가이드: 프로듀서부터 컨슈머까지
대용량 실시간 데이터를 처리하는 Apache Kafka의 핵심 개념과 실무 활용 방법을 학습하고 실제 프로젝트에 적용해봅니다.
📋 목차
- Kafka 아키텍처 심화 이해
- 프로듀서 최적화 전략
- 컨슈머 그룹과 파티셔닝
- 스트리밍 처리와 KStreams
- 모니터링과 운영 관리
- 실습: 실무급 스트리밍 시스템 구축
- 성능 튜닝과 확장성
- 보안과 권한 관리
- 학습 요약
🏗 ️ Kafka 아키텍처 심화 이해
핵심 컴포넌트 분석
Apache Kafka는 다음과 같은 핵심 컴포넌트들로 구성됩니다:
1. Broker
- 역할: 메시지 저장 및 전달의 핵심 엔진
- 동작 원리: 파티션별로 메시지를 순차적으로 저장
- 성능 최적화: 배치 처리, 압축, 인덱싱
2. Topic & Partition
- Topic: 메시지의 논리적 분류
- Partition: Topic의 물리적 분할 단위
- Replication: 데이터 안정성 보장
3. Producer
- 역할: 메시지 생산 및 전송
- 특징: 비동기 전송, 배치 처리, 압축
- 성능: Throughput과 Latency의 균형
4. Consumer
- 역할: 메시지 소비 및 처리
- Consumer Group: 병렬 처리 및 부하 분산
- Offset Management: 메시지 처리 상태 추적
메시지 전달 보장 (Delivery Guarantees)
# 메시지 전달 보장 레벨
at-least-once: # 최소 한 번 전달 (중복 가능)
exactly-once: # 정확히 한 번 전달 (중복 없음)
at-most-once: # 최대 한 번 전달 (손실 가능)
⚡ 프로듀서 최적화 전략
1. 배치 처리 최적화
// Producer 설정 최적화
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 배치 처리 설정
props.put("batch.size", 16384); // 배치 크기 (16KB)
props.put("linger.ms", 5); // 배치 대기 시간 (5ms)
props.put("compression.type", "snappy"); // 압축 타입
props.put("acks", "all"); // 모든 복제본 확인
props.put("retries", 3); // 재시도 횟수
props.put("retry.backoff.ms", 100); // 재시도 간격
// 고성능 설정
props.put("buffer.memory", 33554432); // 버퍼 메모리 (32MB)
props.put("max.in.flight.requests.per.connection", 5); // 동시 요청 수
props.put("enable.idempotence", true); // 멱등성 보장
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
2. 파티셔닝 전략
// 커스텀 파티셔너 구현
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 키 기반 파티셔닝
if (key != null) {
return Math.abs(key.hashCode()) % numPartitions;
}
// 라운드 로빈 파티셔닝
return ThreadLocalRandom.current().nextInt(numPartitions);
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
// 파티셔너 설정
props.put("partitioner.class", "com.example.CustomPartitioner");
3. 압축과 압축
// 압축 설정 비교
public class CompressionComparison {
// Snappy 압축 (빠른 압축/해제)
public void setupSnappyCompression() {
props.put("compression.type", "snappy");
// 장점: 낮은 CPU 사용량, 빠른 처리
// 단점: 압축률이 낮음
}
// Gzip 압축 (높은 압축률)
public void setupGzipCompression() {
props.put("compression.type", "gzip");
// 장점: 높은 압축률, 네트워크 대역폭 절약
// 단점: 높은 CPU 사용량
}
// LZ4 압축 (균형잡힌 성능)
public void setupLZ4Compression() {
props.put("compression.type", "lz4");
// 장점: 빠른 압축/해제, 적당한 압축률
// 단점: Snappy보다 약간 느림
}
}
🔄 컨슈머 그룹과 파티셔닝
1. 컨슈머 그룹 관리
// 컨슈머 그룹 설정
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-consumer-group");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 오프셋 관리 설정
consumerProps.put("auto.offset.reset", "earliest"); // earliest, latest, none
consumerProps.put("enable.auto.commit", false); // 수동 오프셋 커밋
consumerProps.put("max.poll.records", 500); // 한 번에 처리할 레코드 수
consumerProps.put("session.timeout.ms", 30000); // 세션 타임아웃
consumerProps.put("heartbeat.interval.ms", 3000); // 하트비트 간격
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
2. 수동 오프셋 관리
// 수동 오프셋 커밋 예제
public class ManualOffsetCommit {
private KafkaConsumer<String, String> consumer;
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
public void processMessages() {
consumer.subscribe(Arrays.asList("my-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// 메시지 처리
processMessage(record);
// 오프셋 저장 (다음에 커밋할 오프셋)
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offset = new OffsetAndMetadata(record.offset() + 1);
offsets.put(partition, offset);
}
// 배치 단위로 오프셋 커밋
if (!offsets.isEmpty()) {
consumer.commitSync(offsets);
offsets.clear();
}
}
} catch (Exception e) {
// 오류 발생 시 오프셋 롤백
consumer.seekToBeginning(consumer.assignment());
}
}
private void processMessage(ConsumerRecord<String, String> record) {
// 메시지 처리 로직
System.out.printf("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
3. 리밸런싱 처리
// 리밸런싱 리스너 구현
public class RebalanceListener implements ConsumerRebalanceListener {
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
private KafkaConsumer<String, String> consumer;
public RebalanceListener(KafkaConsumer<String, String> consumer) {
this.consumer = consumer;
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 파티션 해제 전 현재 오프셋 커밋
System.out.println("Partitions revoked: " + partitions);
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 파티션 할당 후 오프셋 복원
System.out.println("Partitions assigned: " + partitions);
for (TopicPartition partition : partitions) {
consumer.seekToBeginning(Arrays.asList(partition));
}
}
public void addOffset(TopicPartition partition, long offset) {
currentOffsets.put(partition, new OffsetAndMetadata(offset));
}
}
🌊 스트리밍 처리와 KStreams
1. KStreams 기본 설정
// KStreams 애플리케이션 설정
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 상태 저장소 설정
streamsProps.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
// 성능 최적화 설정
streamsProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024); // 10MB
StreamsBuilder builder = new StreamsBuilder();
2. 스트림 처리 예제
// 실시간 데이터 변환 및 집계
public class StreamProcessingExample {
public void setupStreamProcessing() {
StreamsBuilder builder = new StreamsBuilder();
// 소스 스트림 생성
KStream<String, String> sourceStream = builder.stream("input-topic");
// 데이터 변환
KStream<String, String> transformedStream = sourceStream
.filter((key, value) -> value != null && !value.isEmpty())
.mapValues(value -> value.toUpperCase())
.map((key, value) -> KeyValue.pair(key, "PROCESSED: " + value));
// 윈도우 집계
KTable<Windowed<String>, Long> windowedCounts = sourceStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
// 결과를 출력 토픽으로 전송
transformedStream.to("output-topic");
windowedCounts.toStream().to("aggregated-topic");
// 스트림 시작
KafkaStreams streams = new KafkaStreams(builder.build(), streamsProps);
streams.start();
// 종료 시그널 처리
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
3. 상태 저장소 활용
// 상태 저장소를 활용한 집계
public class StatefulStreamProcessing {
public void setupStatefulProcessing() {
StreamsBuilder builder = new StreamsBuilder();
// 상태 저장소 생성
StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("user-counts"),
Serdes.String(),
Serdes.Long()
);
builder.addStateStore(storeBuilder);
// 상태 저장소를 사용한 집계
KStream<String, String> sourceStream = builder.stream("user-events");
sourceStream
.groupByKey()
.aggregate(
() -> 0L,
(key, value, aggregate) -> aggregate + 1,
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-counts")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
)
.toStream()
.to("user-counts-topic");
}
}
📊 모니터링과 운영 관리
1. JMX 메트릭 수집
// JMX 메트릭 모니터링
public class KafkaMetricsMonitor {
public void setupJMXMonitoring() {
// JMX 설정
System.setProperty("com.sun.management.jmxremote", "true");
System.setProperty("com.sun.management.jmxremote.port", "9999");
System.setProperty("com.sun.management.jmxremote.authenticate", "false");
System.setProperty("com.sun.management.jmxremote.ssl", "false");
// 메트릭 수집
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
// Producer 메트릭
ObjectName producerMetrics = new ObjectName("kafka.producer:type=producer-metrics,client-id=my-producer");
ObjectName consumerMetrics = new ObjectName("kafka.consumer:type=consumer-metrics,client-id=my-consumer");
// 메트릭 값 조회
Double recordSendRate = (Double) mBeanServer.getAttribute(producerMetrics, "record-send-rate");
Double recordSendTotal = (Double) mBeanServer.getAttribute(producerMetrics, "record-send-total");
System.out.println("Record Send Rate: " + recordSendRate);
System.out.println("Record Send Total: " + recordSendTotal);
}
}
2. 카프카 관리 도구
#!/bin/bash
# Kafka 관리 스크립트
# 토픽 생성
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic my-topic \
--partitions 3 \
--replication-factor 2
# 토픽 목록 조회
kafka-topics.sh --list --bootstrap-server localhost:9092
# 토픽 상세 정보
kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic my-topic
# 컨슈머 그룹 상태 확인
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --describe
# 오프셋 리셋
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-consumer-group --reset-offsets \
--to-earliest --topic my-topic --execute
# 메시지 전송 테스트
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic my-topic
# 메시지 수신 테스트
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic my-topic --from-beginning
3. 알림 시스템 구축
// Kafka 모니터링 알림 시스템
public class KafkaAlertSystem {
public void setupAlerting() {
// 메트릭 수집 스레드
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
// 지연 시간 모니터링
double consumerLag = getConsumerLag();
if (consumerLag > 1000) {
sendAlert("High consumer lag detected: " + consumerLag);
}
// 처리량 모니터링
double throughput = getThroughput();
if (throughput < 100) {
sendAlert("Low throughput detected: " + throughput);
}
// 에러율 모니터링
double errorRate = getErrorRate();
if (errorRate > 0.01) {
sendAlert("High error rate detected: " + errorRate);
}
} catch (Exception e) {
System.err.println("Error in monitoring: " + e.getMessage());
}
}, 0, 30, TimeUnit.SECONDS);
}
private void sendAlert(String message) {
// Slack, 이메일, SMS 등으로 알림 전송
System.out.println("ALERT: " + message);
// 실제 알림 구현
}
}
🛠 ️ 실습: 실무급 스트리밍 시스템 구축
1. 환경 설정
# Docker Compose로 Kafka 클러스터 구성
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
KAFKA_NUM_PARTITIONS: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
kafka-ui:
image: provectuslabs/kafka-ui:latest
depends_on:
- kafka
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
2. 실시간 로그 처리 시스템
// 실시간 로그 처리 시스템
public class LogProcessingSystem {
public static void main(String[] args) {
// Producer 설정
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("compression.type", "snappy");
producerProps.put("batch.size", 16384);
producerProps.put("linger.ms", 5);
// Consumer 설정
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "log-processor");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("auto.offset.reset", "earliest");
consumerProps.put("enable.auto.commit", false);
// 로그 수집 스레드
Thread logCollector = new Thread(() -> {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
while (true) {
// 로그 파일에서 읽기
String logLine = readLogLine();
if (logLine != null) {
ProducerRecord<String, String> record = new ProducerRecord<>("raw-logs", logLine);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Error sending log: " + exception.getMessage());
}
});
}
Thread.sleep(100); // 100ms 간격
}
} catch (Exception e) {
System.err.println("Error in log collector: " + e.getMessage());
}
});
// 로그 처리 스레드
Thread logProcessor = new Thread(() -> {
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
consumer.subscribe(Arrays.asList("raw-logs"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// 로그 파싱 및 처리
LogEntry logEntry = parseLog(record.value());
if (logEntry != null) {
processLogEntry(logEntry);
}
}
// 오프셋 커밋
consumer.commitSync();
}
} catch (Exception e) {
System.err.println("Error in log processor: " + e.getMessage());
}
});
// 스레드 시작
logCollector.start();
logProcessor.start();
// 종료 시그널 대기
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logCollector.interrupt();
logProcessor.interrupt();
}));
}
private static String readLogLine() {
// 실제 로그 파일에서 읽기 구현
return "2025-09-09 10:30:45 INFO User login successful user_id=12345";
}
private static LogEntry parseLog(String logLine) {
// 로그 파싱 로직 구현
String[] parts = logLine.split(" ");
if (parts.length >= 4) {
return new LogEntry(parts[0] + " " + parts[1], parts[2], parts[3]);
}
return null;
}
private static void processLogEntry(LogEntry logEntry) {
// 로그 처리 로직 구현
System.out.println("Processing log: " + logEntry);
}
static class LogEntry {
String timestamp;
String level;
String message;
LogEntry(String timestamp, String level, String message) {
this.timestamp = timestamp;
this.level = level;
this.message = message;
}
@Override
public String toString() {
return String.format("[%s] %s: %s", timestamp, level, message);
}
}
}
3. 이벤트 기반 마이크로서비스
// 이벤트 기반 마이크로서비스 예제
public class EventDrivenMicroservice {
public static void main(String[] args) {
// 이벤트 처리 서비스
EventProcessor eventProcessor = new EventProcessor();
eventProcessor.start();
// 주문 서비스
OrderService orderService = new OrderService();
orderService.start();
// 재고 서비스
InventoryService inventoryService = new InventoryService();
inventoryService.start();
}
// 이벤트 처리기
static class EventProcessor {
private KafkaConsumer<String, String> consumer;
public void start() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "event-processor");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders", "inventory", "payments"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
processEvent(record.topic(), record.key(), record.value());
}
}
}
private void processEvent(String topic, String key, String value) {
switch (topic) {
case "orders":
handleOrderEvent(key, value);
break;
case "inventory":
handleInventoryEvent(key, value);
break;
case "payments":
handlePaymentEvent(key, value);
break;
}
}
private void handleOrderEvent(String key, String value) {
System.out.println("Processing order event: " + key + " -> " + value);
// 주문 이벤트 처리 로직
}
private void handleInventoryEvent(String key, String value) {
System.out.println("Processing inventory event: " + key + " -> " + value);
// 재고 이벤트 처리 로직
}
private void handlePaymentEvent(String key, String value) {
System.out.println("Processing payment event: " + key + " -> " + value);
// 결제 이벤트 처리 로직
}
}
}
📚 학습 요약
이번 포스트에서 학습한 내용
- Kafka 아키텍처 심화 이해
- 핵심 컴포넌트 분석
- 메시지 전달 보장 레벨
- 파티셔닝과 복제 전략
- 프로듀서 최적화 전략
- 배치 처리 최적화
- 파티셔닝 전략
- 압축과 압축
- 컨슈머 그룹과 파티셔닝
- 컨슈머 그룹 관리
- 수동 오프셋 관리
- 리밸런싱 처리
- 스트리밍 처리와 KStreams
- KStreams 기본 설정
- 스트림 처리 예제
- 상태 저장소 활용
- 모니터링과 운영 관리
- JMX 메트릭 수집
- 카프카 관리 도구
- 알림 시스템 구축
- 실무급 스트리밍 시스템 구축
- 실시간 로그 처리 시스템
- 이벤트 기반 마이크로서비스
- Docker 환경 구성
핵심 개념 정리
| 개념 | 설명 | 중요도 |
|---|---|---|
| 프로듀서 최적화 | 성능과 안정성 향상 | ⭐⭐⭐⭐⭐ |
| 컨슈머 그룹 | 병렬 처리 및 부하 분산 | ⭐⭐⭐⭐⭐ |
| 스트리밍 처리 | 실시간 데이터 변환 | ⭐⭐⭐⭐⭐ |
| 모니터링 | 시스템 상태 추적 | ⭐⭐⭐⭐ |
실무 적용 시 고려사항
- 성능 최적화: 배치 처리, 압축, 파티셔닝
- 안정성: 복제, 오프셋 관리, 에러 처리
- 모니터링: 메트릭 수집, 알림, 로그 분석
- 확장성: 클러스터 구성, 부하 분산
이 가이드를 통해 Apache Kafka의 고급 기능들을 마스터하고, 실무에서 안정적이고 효율적인 스트리밍 시스템을 구축할 수 있습니다! 🚀