Part 1: Apache Flink Basics and Core Concepts - The Beginning of True Streaming Processing
📚 Apache Flink Complete Guide Series
Part 1
⏱️ 35 min
📊 Intermediate
Learn Apache Flink’s basic structure and core concepts including DataStream API, state management, and time processing through hands-on practice.
📋 Table of Contents
- What is Apache Flink?
- Flink Architecture and Core Concepts
- DataStream API Basics
- Time Processing and Watermarking
- State Management Basics
- Real-world Project: Real-time Log Analysis
- Learning Summary
🚀 What is Apache Flink?
Flink’s Origins
Apache Flink grew out of the Stratosphere research project, started at TU Berlin in 2009. It became a top-level project of the Apache Software Foundation in 2014 and was designed from the start for true streaming processing.
Core Design Philosophy
- True Streaming: Processes records one at a time, not in micro-batches
- Low Latency: Achieves millisecond-level latency
- High Throughput: Sustains high processing throughput
- Exactly-Once: Guarantees exactly-once state consistency
- Fault Tolerance: Strong failure recovery through distributed checkpoints
Fundamental Differences from Spark
# Spark: Micro-batch approach
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
spark = SparkSession.builder.appName("SparkStreaming").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 5) # 5-second batch interval
# Flink: True streaming approach
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
# No batch interval - processes events immediately upon arrival
🏗️ Flink Architecture and Core Concepts
Cluster Architecture
┌─────────────────────────────────────────────┐
│                Flink Cluster                │
├─────────────────────────────────────────────┤
│             JobManager (Master)             │
│  ┌──────────────────┐  ┌──────────────────┐ │
│  │    Dispatcher    │  │ ResourceManager  │ │
│  └──────────────────┘  └──────────────────┘ │
│  ┌──────────────────┐  ┌──────────────────┐ │
│  │    JobMaster     │  │ CheckpointCoord. │ │
│  └──────────────────┘  └──────────────────┘ │
├─────────────────────────────────────────────┤
│            TaskManager (Worker)             │
│  ┌──────────────────┐  ┌──────────────────┐ │
│  │      Task 1      │  │      Task 2      │ │
│  └──────────────────┘  └──────────────────┘ │
│  ┌──────────────────┐  ┌──────────────────┐ │
│  │      Task 3      │  │      Task 4      │ │
│  └──────────────────┘  └──────────────────┘ │
└─────────────────────────────────────────────┘
Core Components
1. JobManager
- Role: Job scheduling and resource management
- Components:
- Dispatcher: Job submission interface
- ResourceManager: Resource allocation management
- JobMaster: Individual job management
- CheckpointCoordinator: Checkpoint coordination
2. TaskManager
- Role: Actual data processing execution
- Features:
- Parallel processing through Task Slots (see the configuration sketch below)
- State storage provision
- Network communication handling
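To make the relationship between task slots and parallelism concrete, here is a minimal sketch of configuring both from PyFlink. The slot count and parallelism values are illustrative assumptions for a local setup, not values from this guide.
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment
# Illustrative values only: 4 slots per TaskManager, default parallelism of 2
config = Configuration()
config.set_integer("taskmanager.numberOfTaskSlots", 4)
env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_parallelism(2)  # can also be set per operator, e.g. stream.map(f).set_parallelism(1)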
Flink Program Structure
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
# 1. Create execution environment
env = StreamExecutionEnvironment.get_execution_environment()
# 2. Create data source
data_stream = env.from_collection([1, 2, 3, 4, 5])
# 3. Apply transformation operations
result_stream = data_stream.map(lambda x: x * 2)
# 4. Specify data sink
result_stream.print()
# 5. Execute program
env.execute("Basic Flink Program")
📊 DataStream API Basics
Basic Data Types
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
# Basic data types
basic_types = [
"Hello", "World", 123, 45.67, True
]
data_stream = env.from_collection(basic_types)
# Explicit type information specification
typed_stream = data_stream.map(
lambda x: str(x).upper(),
output_type=Types.STRING()
)
Core Transformation Operations
1. Map Operation
# Single element transformation
def multiply_by_two(x):
return x * 2
data_stream = env.from_collection([1, 2, 3, 4, 5])
result = data_stream.map(multiply_by_two)
2. Filter Operation
# Filtering based on conditions
def is_even(x):
return x % 2 == 0
numbers = env.from_collection([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
even_numbers = numbers.filter(is_even)
3. FlatMap Operation
# 1:N transformation
def split_words(sentence):
return sentence.split()
text_stream = env.from_collection([
"Hello World",
"Apache Flink",
"Stream Processing"
])
words = text_stream.flat_map(split_words)
4. KeyBy Operation
from pyflink.common.typeinfo import Types
# Grouping by key
class LogEntry:
def __init__(self, user_id, action, timestamp):
self.user_id = user_id
self.action = action
self.timestamp = timestamp
# Sample log data
log_entries = [
LogEntry("user1", "login", "2025-01-01 10:00:00"),
LogEntry("user2", "view", "2025-01-01 10:01:00"),
LogEntry("user1", "logout", "2025-01-01 10:02:00")
]
log_stream = env.from_collection(log_entries)
keyed_stream = log_stream.key_by(lambda log: log.user_id)
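Putting these operators together, here is a small sketch (not in the original post) that counts words from the text_stream defined in the FlatMap example by chaining flat_map, map, key_by, and reduce:
# Sketch: streaming word count built from the operators introduced above
word_counts = text_stream \
    .flat_map(split_words, output_type=Types.STRING()) \
    .map(lambda w: (w, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
    .key_by(lambda pair: pair[0]) \
    .reduce(lambda a, b: (a[0], a[1] + b[1]))
word_counts.print()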
Window Operations
1. Tumbling Window
from pyflink.datastream.window import TumblingProcessingTimeWindows
from pyflink.common.time import Time
# Fixed-size window: count events per user in 10-second windows.
# Map each log entry to (user_id, 1), key by user, then reduce inside the window.
counts_per_user = log_stream.map(
    lambda log: (log.user_id, 1),
    output_type=Types.TUPLE([Types.STRING(), Types.INT()])
).key_by(lambda pair: pair[0])
tumbling_counts = counts_per_user.window(
    TumblingProcessingTimeWindows.of(Time.seconds(10))
).reduce(lambda a, b: (a[0], a[1] + b[1]))
2. Sliding Window
from pyflink.datastream.window import SlidingProcessingTimeWindows
# Sliding window (10-second size, 5-second slide)
sliding_counts = counts_per_user.window(
    SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))
).reduce(lambda a, b: (a[0], a[1] + b[1]))
3. Session Window
from pyflink.datastream.window import ProcessingTimeSessionWindows
# Session window (a session closes after 5 seconds of inactivity)
session_counts = counts_per_user.window(
    ProcessingTimeSessionWindows.with_gap(Time.seconds(5))
).reduce(lambda a, b: (a[0], a[1] + b[1]))
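Beyond simple reductions, a window can also be evaluated with a ProcessWindowFunction, which exposes window metadata such as the window's end timestamp. A minimal sketch (not from the original), reusing the counts_per_user stream defined above:
from pyflink.datastream.functions import ProcessWindowFunction
class CountPerWindow(ProcessWindowFunction):
    def process(self, key, context, elements):
        # elements are the (user_id, 1) tuples collected for this key and window
        yield (key, context.window().end, sum(e[1] for e in elements))
windowed_counts = counts_per_user.window(
    TumblingProcessingTimeWindows.of(Time.seconds(10))
).process(CountPerWindow(), output_type=Types.TUPLE([Types.STRING(), Types.LONG(), Types.INT()]))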
⏰ Time Processing and Watermarking
Types of Time
1. Event Time
from pyflink.common.time import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.typeinfo import Types
env = StreamExecutionEnvironment.get_execution_environment()
# Event time-based processing configuration
# (optional since Flink 1.12 -- event time is now the default; this setter is deprecated)
from pyflink.datastream import TimeCharacteristic
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
# Timestamp and watermark assignment
class TimestampedEvent:
    def __init__(self, value, timestamp):
        self.value = value
        self.timestamp = timestamp
events = env.from_collection([
    TimestampedEvent("event1", 1640995200),  # 2022-01-01 00:00:00
    TimestampedEvent("event2", 1640995260),  # 2022-01-01 00:01:00
    TimestampedEvent("event3", 1640995320),  # 2022-01-01 00:02:00
])
# Timestamp assigner: tells Flink where the event time lives in each record
from pyflink.common import Duration
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
class EventTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, element, record_timestamp):
        return element.timestamp * 1000  # convert seconds to milliseconds
# Watermark assignment (tolerate up to 10 seconds of out-of-order data)
watermarked_stream = events.assign_timestamps_and_watermarks(
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(10))
    .with_timestamp_assigner(EventTimestampAssigner())
)
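As a quick illustration (not from the original), the watermarked stream can now feed event-time windows, which close based on watermarks rather than the wall clock:
from pyflink.datastream.window import TumblingEventTimeWindows
# Count events per value in 60-second event-time windows
per_minute = watermarked_stream \
    .map(lambda e: (e.value, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
    .key_by(lambda t: t[0]) \
    .window(TumblingEventTimeWindows.of(Time.seconds(60))) \
    .reduce(lambda a, b: (a[0], a[1] + b[1]))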
2. Processing Time
# Processing time-based processing configuration (deprecated since Flink 1.12;
# in practice, simply use processing-time windows on the relevant operator)
env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
# Processing time is automatically assigned
processing_time_stream = env.from_collection([1, 2, 3, 4, 5])
3. Ingestion Time
# Ingestion time-based processing configuration (deprecated since Flink 1.12)
env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
# Ingestion time is assigned when data arrives at Flink
ingestion_time_stream = env.from_collection([1, 2, 3, 4, 5])
Watermarking Strategies
1. Ascending Timestamps
# For sources whose timestamps are strictly ascending, the watermark can follow them directly.
ascending_watermark = events.assign_timestamps_and_watermarks(
    WatermarkStrategy
    .for_monotonous_timestamps()
    .with_timestamp_assigner(EventTimestampAssigner())
)
2. Bounded Out-of-Orderness
# Tolerate events that arrive up to 10 seconds out of order.
bounded_watermark = events.assign_timestamps_and_watermarks(
    WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(10))
    .with_timestamp_assigner(EventTimestampAssigner())
)
🗃️ State Management Basics
Types of State
1. ValueState
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedProcessFunction
class CounterFunction(KeyedProcessFunction):
    def __init__(self):
        self.count_state = None
    def open(self, runtime_context):
        # Initialize ValueState
        self.count_state = runtime_context.get_state(
            ValueStateDescriptor("count", Types.INT())
        )
    def process_element(self, value, ctx):
        # Get current state value
        current_count = self.count_state.value() or 0
        # Update state
        new_count = current_count + 1
        self.count_state.update(new_count)
        # Emit result (PyFlink process functions yield their output)
        yield f"Key: {ctx.get_current_key()}, Count: {new_count}"
# Usage example
counter_stream = keyed_stream.process(CounterFunction())
2. ListState
from pyflink.datastream.state import ListStateDescriptor
class ListCollector(KeyedProcessFunction):
    def __init__(self):
        self.list_state = None
    def open(self, runtime_context):
        self.list_state = runtime_context.get_list_state(
            ListStateDescriptor("items", Types.STRING())
        )
    def process_element(self, value, ctx):
        # Add this entry's action to the per-key list
        self.list_state.add(value.action)
        # Read back the list contents
        items = list(self.list_state.get())
        yield f"Collected items: {items}"
list_stream = keyed_stream.process(ListCollector())
3. MapState
from pyflink.datastream.state import MapStateDescriptor
class MapAggregator(KeyedProcessFunction):
    def __init__(self):
        self.map_state = None
    def open(self, runtime_context):
        self.map_state = runtime_context.get_map_state(
            MapStateDescriptor("counts", Types.STRING(), Types.INT())
        )
    def process_element(self, value, ctx):
        # Get the current count for this action
        current_count = self.map_state.get(value.action) or 0
        # Increment count
        self.map_state.put(value.action, current_count + 1)
        # Emit result
        yield f"Action: {value.action}, Count: {current_count + 1}"
map_stream = keyed_stream.process(MapAggregator())
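State does not have to grow forever: a time-to-live can be attached to a state descriptor so old entries expire automatically. A hedged sketch, assuming PyFlink's StateTtlConfig builder API (the one-hour TTL is an illustrative value):
from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
# Expire per-key state one hour after the last write (illustrative value)
ttl_config = StateTtlConfig \
    .new_builder(Time.hours(1)) \
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
    .build()
descriptor = ValueStateDescriptor("count_with_ttl", Types.INT())
descriptor.enable_time_to_live(ttl_config)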
Checkpointing
from pyflink.datastream import CheckpointingMode
# Enable checkpointing (every 1 second); enable_checkpointing is a method of the environment itself
env.enable_checkpointing(1000)
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)
# Checkpoint storage directory
env.get_checkpoint_config().set_checkpoint_storage_dir("file:///tmp/flink-checkpoints")
# Maximum number of concurrent checkpoints
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
# Minimum pause between checkpoints (milliseconds)
env.get_checkpoint_config().set_min_pause_between_checkpoints(500)
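Checkpointing pairs with a state backend, which decides where the working state lives. A brief sketch, assuming the RocksDB backend shipped with the Flink distribution is available:
from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend
# Keep large keyed state on local disk instead of the JVM heap
env.set_state_backend(EmbeddedRocksDBStateBackend())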
🚀 Real-world Project: Real-time Log Analysis
Project Overview
Build a real-time log analysis system that implements the following features:
- Real-time log collection
- User activity analysis
- Anomaly pattern detection
- Real-time alerts
1. Log Data Modeling
from dataclasses import dataclass
from typing import Optional
import json
@dataclass
class LogEvent:
user_id: str
session_id: str
action: str
timestamp: int
ip_address: str
user_agent: str
metadata: Optional[dict] = None
def to_json(self):
return json.dumps({
'user_id': self.user_id,
'session_id': self.session_id,
'action': self.action,
'timestamp': self.timestamp,
'ip_address': self.ip_address,
'user_agent': self.user_agent,
'metadata': self.metadata or {}
})
@classmethod
def from_json(cls, json_str):
data = json.loads(json_str)
return cls(**data)
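A quick round trip through the helpers above (the values are illustrative):
# Illustration only: serialize one event and parse it back
sample = LogEvent("user1", "session_1234", "login", 1735689600, "192.168.1.1", "Mozilla/5.0")
print(sample.to_json())
print(LogEvent.from_json(sample.to_json()))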
2. Real-time Log Analysis System
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Duration
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream.state import ValueStateDescriptor, ListStateDescriptor, MapStateDescriptor
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.connectors import FlinkKafkaConsumer  # requires the Kafka connector JAR
from pyflink.common.serialization import SimpleStringSchema
class LogAnalyzer:
def __init__(self):
self.env = StreamExecutionEnvironment.get_execution_environment()
self.setup_environment()
    def setup_environment(self):
        # Enable checkpointing every second (enable_checkpointing is a method of the environment)
        self.env.enable_checkpointing(1000)
        # Event time is the default time characteristic since Flink 1.12,
        # so no further configuration is needed here.
def create_kafka_source(self, topic, bootstrap_servers):
"""Create Kafka source"""
kafka_properties = {
'bootstrap.servers': bootstrap_servers,
'group.id': 'log-analyzer-group'
}
kafka_source = FlinkKafkaConsumer(
topic,
SimpleStringSchema(),
kafka_properties
)
return self.env.add_source(kafka_source)
def parse_log_events(self, log_stream):
"""Parse log events"""
def parse_log(log_str):
try:
return LogEvent.from_json(log_str)
except Exception as e:
print(f"Failed to parse log: {log_str}, Error: {e}")
return None
return log_stream.map(parse_log, output_type=Types.PICKLED_BYTE_ARRAY()) \
.filter(lambda x: x is not None)
def detect_anomalies(self, log_stream):
"""Anomaly pattern detection"""
class AnomalyDetector(KeyedProcessFunction):
def __init__(self):
self.failed_login_count = None
self.last_login_time = None
self.suspicious_actions = None
def open(self, runtime_context):
self.failed_login_count = runtime_context.get_state(
ValueStateDescriptor("failed_logins", Types.INT())
)
self.last_login_time = runtime_context.get_state(
ValueStateDescriptor("last_login", Types.LONG())
)
self.suspicious_actions = runtime_context.get_list_state(
ListStateDescriptor("suspicious", Types.STRING())
)
            def process_element(self, log_event, ctx):
                current_time = log_event.timestamp
                # Track failed login attempts
                if log_event.action == "login_failed":
                    current_count = self.failed_login_count.value() or 0
                    self.failed_login_count.update(current_count + 1)
                    # Alert on the 5th consecutive failure (PyFlink process functions yield output)
                    if current_count >= 4:  # the state held 4 before this event, so this is the 5th
                        yield {
                            'type': 'security_alert',
                            'user_id': log_event.user_id,
                            'message': f'Multiple failed login attempts: {current_count + 1}',
                            'timestamp': current_time
                        }
                        self.failed_login_count.clear()
                # Clear failure count on successful login
                elif log_event.action == "login_success":
                    self.failed_login_count.clear()
                    self.last_login_time.update(current_time)
                # Detect activity after a long period of inactivity
                elif log_event.action in ["view", "purchase", "download"]:
                    last_login = self.last_login_time.value()
                    if last_login and (current_time - last_login) > 86400:  # 24 hours
                        yield {
                            'type': 'anomaly_alert',
                            'user_id': log_event.user_id,
                            'message': f'Activity after long inactivity: {log_event.action}',
                            'timestamp': current_time
                        }
# Key by user and detect anomalies
return log_stream.key_by(lambda log: log.user_id) \
.process(AnomalyDetector())
def calculate_user_metrics(self, log_stream):
"""Calculate user metrics"""
class UserMetricsCalculator(KeyedProcessFunction):
def __init__(self):
self.action_counts = None
self.session_duration = None
self.last_action_time = None
def open(self, runtime_context):
self.action_counts = runtime_context.get_map_state(
MapStateDescriptor("action_counts", Types.STRING(), Types.INT())
)
self.session_duration = runtime_context.get_state(
ValueStateDescriptor("session_duration", Types.LONG())
)
self.last_action_time = runtime_context.get_state(
ValueStateDescriptor("last_action_time", Types.LONG())
)
            def process_element(self, log_event, ctx):
                current_time = log_event.timestamp
                # Update action counts
                current_count = self.action_counts.get(log_event.action) or 0
                self.action_counts.put(log_event.action, current_count + 1)
                # Accumulate the time between consecutive actions as session duration
                last_time = self.last_action_time.value()
                if last_time:
                    duration = current_time - last_time
                    current_duration = self.session_duration.value() or 0
                    self.session_duration.update(current_duration + duration)
                self.last_action_time.update(current_time)
                # Emit metrics when the event timestamp falls exactly on a full minute
                if current_time % 60 == 0:
                    yield {
                        'user_id': log_event.user_id,
                        'timestamp': current_time,
                        'session_duration': self.session_duration.value() or 0,
                        'action_counts': dict(self.action_counts.items())
                    }
return log_stream.key_by(lambda log: log.user_id) \
.process(UserMetricsCalculator())
def run_analysis(self):
"""Run analysis pipeline"""
# Read logs from Kafka
log_stream = self.create_kafka_source("user-logs", "localhost:9092")
# Parse logs
parsed_logs = self.parse_log_events(log_stream)
        # Assign timestamps and watermarks (tolerate 10 seconds of out-of-orderness)
        watermarked_logs = parsed_logs.assign_timestamps_and_watermarks(
            WatermarkStrategy
            .for_bounded_out_of_orderness(Duration.of_seconds(10))
            .with_timestamp_assigner(LogTimestampAssigner())
        )
# Detect anomalies
anomalies = self.detect_anomalies(watermarked_logs)
# Calculate user metrics
metrics = self.calculate_user_metrics(watermarked_logs)
# Output results
anomalies.print("Security Alerts")
metrics.print("User Metrics")
# Execute
self.env.execute("Real-time Log Analysis")
# Timestamp assigner: extract the event time (in milliseconds) from each log event
class LogTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, element, record_timestamp):
        return element.timestamp * 1000
if __name__ == "__main__":
analyzer = LogAnalyzer()
analyzer.run_analysis()
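One practical note before running this: FlinkKafkaConsumer is loaded from the Kafka connector JAR, which is not bundled with PyFlink. A minimal sketch of registering it (the path below is a placeholder, not a value from this guide):
# Placeholder path -- point this at the Kafka connector JAR matching your Flink version
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///path/to/flink-sql-connector-kafka.jar")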
3. Execution and Testing
# Test data generator
import random
import time
class LogGenerator:
    def __init__(self):
        self.users = ["user1", "user2", "user3", "user4", "user5"]
        self.actions = ["login", "logout", "view", "purchase", "download", "login_failed"]
        self.ip_addresses = ["192.168.1.1", "192.168.1.2", "10.0.0.1", "10.0.0.2"]
    def generate_log(self):
return LogEvent(
user_id=random.choice(self.users),
session_id=f"session_{random.randint(1000, 9999)}",
action=random.choice(self.actions),
timestamp=int(time.time()),
ip_address=random.choice(self.ip_addresses),
user_agent="Mozilla/5.0 (Test Browser)"
).to_json()
# Test execution
def test_log_analysis():
env = StreamExecutionEnvironment.get_execution_environment()
# Generate test data
generator = LogGenerator()
test_logs = [generator.generate_log() for _ in range(100)]
log_stream = env.from_collection(test_logs)
# Parse logs
parsed_logs = log_stream.map(
lambda x: LogEvent.from_json(x),
output_type=Types.PICKLED_BYTE_ARRAY()
).filter(lambda x: x is not None)
# Calculate simple statistics
    user_actions = parsed_logs.map(
        lambda log: (log.user_id, log.action, 1),
        output_type=Types.TUPLE([Types.STRING(), Types.STRING(), Types.INT()])
    ).key_by(lambda x: x[0]) \
     .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) \
     .reduce(lambda a, b: (a[0], a[1], a[2] + b[2]))  # count events per user in each window
user_actions.print()
env.execute("Log Analysis Test")
# Test execution
if __name__ == "__main__":
test_log_analysis()
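To exercise the full Kafka pipeline rather than the in-memory test, the generated logs can be published to the user-logs topic, for example with the kafka-python client (an external package, not used elsewhere in this guide):
from kafka import KafkaProducer  # pip install kafka-python
producer = KafkaProducer(bootstrap_servers="localhost:9092")
generator = LogGenerator()
for _ in range(100):
    producer.send("user-logs", generator.generate_log().encode("utf-8"))
producer.flush()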
📚 Learning Summary
What We Learned in This Part
- Apache Flink Introduction
- Flink’s origins and design philosophy
- Fundamental differences from Spark
- Concept of true streaming processing
- Flink Architecture
- Roles of JobManager and TaskManager
- Cluster configuration and resource management
- Program execution structure
- DataStream API
- Basic data types and transformation operations
- Map, Filter, FlatMap, KeyBy operations
- Window operations (Tumbling, Sliding, Session)
- Time Processing
- Event Time, Processing Time, Ingestion Time
- Watermarking strategies and late data processing
- Timestamp assignment methods
- State Management
- ValueState, ListState, MapState
- Checkpointing and failure recovery
- State-based processing logic
- Real-world Project
- Real-time log analysis system
- Anomaly pattern detection
- User metrics calculation
Core Technology Stack
| Technology | Purpose | Importance |
|---|---|---|
| DataStream API | Streaming data processing | ⭐⭐⭐⭐⭐ |
| State Management | State-based processing | ⭐⭐⭐⭐⭐ |
| Time Processing | Event time-based analysis | ⭐⭐⭐⭐⭐ |
| Window Operations | Time-based aggregation | ⭐⭐⭐⭐ |
| Checkpointing | Failure recovery | ⭐⭐⭐⭐ |
Next Part Preview
Part 2: Advanced Streaming Processing and State Management will cover:
- Advanced state management patterns
- Deep dive into checkpointing and savepoints
- Complex time processing strategies
- Performance optimization techniques
Next Part: Part 2: Advanced Streaming Processing and State Management
Now you’ve mastered the basics of Flink! In the next part, we’ll explore more advanced features. 🚀