Part 1: Apache Flink Basics and Core Concepts - The Beginning of True Streaming Processing

Learn Apache Flink’s basic architecture and core concepts, including the DataStream API, state management, and time processing, through hands-on practice.

📋 Table of Contents

  1. What is Apache Flink?
  2. Flink Architecture and Core Concepts
  3. DataStream API Basics
  4. Time Processing and Watermarking
  5. State Management Basics
  6. Real-world Project: Real-time Log Analysis
  7. Learning Summary

What is Apache Flink?

Apache Flink grew out of the Stratosphere research project started at TU Berlin in 2009. It became a top-level Apache Software Foundation project in 2014 and was designed from the start for true streaming processing.

Core Design Philosophy

  1. True Streaming: True streaming processing, not micro-batch
  2. Low Latency: Achieving millisecond-level latency
  3. High Throughput: Maintaining high processing throughput
  4. Exactly-Once: Guaranteeing exactly-once processing
  5. Fault Tolerance: Strong failure recovery capabilities

Fundamental Differences from Spark

# Spark: micro-batch approach (classic DStream API shown for contrast;
# Structured Streaming also processes data in micro-batches by default)
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

spark = SparkSession.builder.appName("SparkStreaming").getOrCreate()
ssc = StreamingContext(spark.sparkContext, 5)  # 5-second batch interval

# Flink: True streaming approach
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# No batch interval - processes events immediately upon arrival

Flink Architecture and Core Concepts

Cluster Architecture

┌─────────────────────────────────────────────────────────────┐
│                    Flink Cluster                           │
├─────────────────────────────────────────────────────────────┤
│  JobManager (Master)                                       │
│  ┌─────────────────┐  ┌─────────────────┐                │
│  │   Dispatcher    │  │  ResourceManager│                │
│  └─────────────────┘  └─────────────────┘                │
│  ┌─────────────────┐  ┌─────────────────┐                │
│  │   JobMaster     │  │  CheckpointCoord│                │
│  └─────────────────┘  └─────────────────┘                │
├─────────────────────────────────────────────────────────────┤
│  TaskManager (Worker)                                      │
│  ┌─────────────────┐  ┌─────────────────┐                │
│  │     Task 1      │  │     Task 2      │                │
│  └─────────────────┘  └─────────────────┘                │
│  ┌─────────────────┐  ┌─────────────────┐                │
│  │     Task 3      │  │     Task 4      │                │
│  └─────────────────┘  └─────────────────┘                │
└─────────────────────────────────────────────────────────────┘

Core Components

1. JobManager

  • Role: Job scheduling and resource management
  • Components:
    • Dispatcher: Job submission interface
    • ResourceManager: Resource allocation management
    • JobMaster: Individual job management
    • CheckpointCoordinator: Checkpoint coordination

2. TaskManager

  • Role: Actual data processing execution
  • Features:
    • Parallel processing through Task Slots (see the parallelism sketch after this list)
    • State storage provision
    • Network communication handling
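
Each TaskManager provides a fixed number of task slots, and a job’s parallelism decides how many of those slots its subtasks occupy. The snippet below is a minimal sketch of controlling parallelism from PyFlink; the slot count itself comes from the cluster configuration (for example taskmanager.numberOfTaskSlots), and the values used here are purely illustrative.

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Default parallelism for every operator in this job; each parallel subtask
# runs in a task slot provided by a TaskManager
env.set_parallelism(4)

# Individual operators can override the job-wide default, e.g.:
# some_stream.map(fn).set_parallelism(2)

With an execution environment in hand, a minimal PyFlink program follows the five-step structure below: create the environment, define a source, apply transformations, attach a sink, and execute.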
from pyflink.datastream import StreamExecutionEnvironment

# 1. Create execution environment
env = StreamExecutionEnvironment.get_execution_environment()

# 2. Create data source
data_stream = env.from_collection([1, 2, 3, 4, 5])

# 3. Apply transformation operations
result_stream = data_stream.map(lambda x: x * 2)

# 4. Specify data sink
result_stream.print()

# 5. Execute program
env.execute("Basic Flink Program")

📊 DataStream API Basics

Basic Data Types

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Basic data types
basic_types = [
    "Hello", "World", 123, 45.67, True
]

data_stream = env.from_collection(basic_types)

# Explicit type information specification
typed_stream = data_stream.map(
    lambda x: str(x).upper(),
    output_type=Types.STRING()
)
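
For composite records, type information can be spelled out explicitly as well. A small sketch using tuple type info (the field values are illustrative):

# Explicit tuple type info: (word, count) pairs typed as (STRING, INT)
pairs = env.from_collection(
    [("hello", 1), ("world", 2)],
    type_info=Types.TUPLE([Types.STRING(), Types.INT()])
)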

Core Transformation Operations

1. Map Operation

# Single element transformation
def multiply_by_two(x):
    return x * 2

data_stream = env.from_collection([1, 2, 3, 4, 5])
result = data_stream.map(multiply_by_two)

2. Filter Operation

# Filtering based on conditions
def is_even(x):
    return x % 2 == 0

numbers = env.from_collection([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
even_numbers = numbers.filter(is_even)

3. FlatMap Operation

# 1:N transformation
def split_words(sentence):
    return sentence.split()

text_stream = env.from_collection([
    "Hello World",
    "Apache Flink",
    "Stream Processing"
])

words = text_stream.flat_map(split_words)

4. KeyBy Operation

from pyflink.common.typeinfo import Types

# Grouping by key
class LogEntry:
    def __init__(self, user_id, action, timestamp):
        self.user_id = user_id
        self.action = action
        self.timestamp = timestamp

# Sample log data
log_entries = [
    LogEntry("user1", "login", "2025-01-01 10:00:00"),
    LogEntry("user2", "view", "2025-01-01 10:01:00"),
    LogEntry("user1", "logout", "2025-01-01 10:02:00")
]

log_stream = env.from_collection(log_entries)
keyed_stream = log_stream.key_by(lambda log: log.user_id)
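
Once a stream is keyed, Flink can maintain per-key aggregates across all elements sharing that key. As a small sketch built on the log_stream above, a rolling per-user event count could look like this (the built-in keyed aggregations operate on tuple fields, hence the map to (user_id, 1) pairs):

# Rolling count of events per user: emits an updated (user_id, count)
# tuple every time a new event for that user arrives
rolling_counts = log_stream \
    .map(lambda log: (log.user_id, 1)) \
    .key_by(lambda t: t[0]) \
    .sum(1)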

Window Operations

1. Tumbling Window

from pyflink.datastream.window import TumblingProcessingTimeWindows
from pyflink.common.time import Time

# The built-in window aggregations work on tuple fields, so first map each
# log entry to a (user_id, 1) pair and key by the user id
keyed_counts = log_stream \
    .map(lambda log: (log.user_id, 1)) \
    .key_by(lambda t: t[0])

# Fixed-size (tumbling) window: count events per user every 10 seconds
windowed_stream = keyed_counts.window(
    TumblingProcessingTimeWindows.of(Time.seconds(10))
).reduce(lambda a, b: (a[0], a[1] + b[1]))

2. Sliding Window

from pyflink.datastream.window import SlidingProcessingTimeWindows

# Sliding window (10-second size, 5-second slide)
sliding_window = keyed_counts.window(
    SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))
).reduce(lambda a, b: (a[0], a[1] + b[1]))

3. Session Window

from pyflink.datastream.window import ProcessingTimeSessionWindows

# Session window (closed after a 5-second inactivity gap)
session_window = keyed_counts.window(
    ProcessingTimeSessionWindows.with_gap(Time.seconds(5))
).reduce(lambda a, b: (a[0], a[1] + b[1]))

⏰ Time Processing and Watermarking

Types of Time

1. Event Time

from pyflink.common import Duration
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Since Flink 1.12, event time is the default time characteristic,
# so no explicit configuration call is needed

# Event that carries its own timestamp
class TimestampedEvent:
    def __init__(self, value, timestamp):
        self.value = value
        self.timestamp = timestamp

# Timestamp assigner: tells Flink where the event time lives
class EventTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        return value.timestamp * 1000  # seconds -> milliseconds

events = env.from_collection([
    TimestampedEvent("event1", 1640995200),  # 2022-01-01 00:00:00
    TimestampedEvent("event2", 1640995260),  # 2022-01-01 00:01:00
    TimestampedEvent("event3", 1640995320),  # 2022-01-01 00:02:00
])

# Watermark assignment (strategies are covered in detail below)
watermarked_stream = events.assign_timestamps_and_watermarks(
    WatermarkStrategy
        .for_monotonous_timestamps()
        .with_timestamp_assigner(EventTimestampAssigner())
)

2. Processing Time

# Processing time needs no configuration, timestamps, or watermarks: each
# operator simply uses its local wall-clock time, which is what the
# processing-time window assigners shown earlier rely on
processing_time_stream = env.from_collection([1, 2, 3, 4, 5])

3. Ingestion Time

# Ingestion time (the timestamp assigned when an event first enters Flink)
# is no longer a separate mode since Flink 1.12; it can be emulated by a
# TimestampAssigner that returns the current wall-clock time on arrival
ingestion_time_stream = env.from_collection([1, 2, 3, 4, 5])

Watermarking Strategies

1. Ascending Timestamps

from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner

class EventTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp):
        return value.timestamp * 1000

# Suitable when timestamps are strictly ascending within each source partition
ascending_watermark = events.assign_timestamps_and_watermarks(
    WatermarkStrategy
        .for_monotonous_timestamps()
        .with_timestamp_assigner(EventTimestampAssigner())
)

2. Bounded Out-of-Orderness

from pyflink.common import Duration
from pyflink.common.watermark_strategy import WatermarkStrategy

# Tolerates events that arrive up to 10 seconds out of order
bounded_watermark = events.assign_timestamps_and_watermarks(
    WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(10))
        .with_timestamp_assigner(EventTimestampAssigner())
)
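
With timestamps and watermarks in place, windows can be evaluated on event time rather than wall-clock time. The following is a rough sketch that reuses the bounded_watermark stream above and keys by the event value purely for illustration:

from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time

# Event-time tumbling window: each 60-second window is emitted once the
# watermark passes its end, so late events within the 10-second bound
# are still counted in the correct window
event_time_counts = bounded_watermark \
    .map(lambda e: (e.value, 1)) \
    .key_by(lambda t: t[0]) \
    .window(TumblingEventTimeWindows.of(Time.seconds(60))) \
    .reduce(lambda a, b: (a[0], a[1] + b[1]))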

🗃️ State Management Basics

Types of State

1. ValueState

from pyflink.common.state import ValueStateDescriptor
from pyflink.common.typeinfo import Types
from pyflink.datastream import KeyedProcessFunction

class CounterFunction(KeyedProcessFunction):
    def __init__(self):
        self.count_state = None
    
    def open(self, runtime_context):
        # Initialize ValueState
        self.count_state = runtime_context.get_state(
            ValueStateDescriptor("count", Types.INT())
        )
    
    def process_element(self, value, ctx):
        # Get current state value (None on first access for this key)
        current_count = self.count_state.value() or 0
        
        # Update state
        new_count = current_count + 1
        self.count_state.update(new_count)
        
        # Emit result (PyFlink process functions yield their output)
        yield f"Key: {ctx.get_current_key()}, Count: {new_count}"

# Usage example
counter_stream = keyed_stream.process(CounterFunction())

2. ListState

from pyflink.common.state import ListStateDescriptor

class ListCollector(KeyedProcessFunction):
    def __init__(self):
        self.list_state = None
    
    def open(self, runtime_context):
        self.list_state = runtime_context.get_list_state(
            ListStateDescriptor("items", Types.STRING())
        )
    
    def process_element(self, value, ctx):
        # Add value to the list state
        self.list_state.add(value)
        
        # Materialize the list contents accumulated so far for this key
        items = list(self.list_state.get())
        
        yield f"Collected items: {items}"

list_stream = keyed_stream.process(ListCollector())

3. MapState

from pyflink.common.state import MapStateDescriptor

class MapAggregator(KeyedProcessFunction):
    def __init__(self):
        self.map_state = None
    
    def open(self, runtime_context):
        self.map_state = runtime_context.get_map_state(
            MapStateDescriptor("counts", Types.STRING(), Types.INT())
        )
    
    def process_element(self, value, ctx):
        # Get the current count for this value from the map (None if absent)
        current_count = self.map_state.get(value) or 0
        
        # Increment count
        self.map_state.put(value, current_count + 1)
        
        # Emit result
        yield f"Value: {value}, Count: {current_count + 1}"

map_stream = keyed_stream.process(MapAggregator())

Checkpointing

from pyflink.datastream import CheckpointingMode

# Enable checkpointing (every 1 second)
env.enable_checkpointing(1000)
env.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)

# Checkpoint storage location
env.get_checkpoint_config().set_checkpoint_storage_dir("file:///tmp/flink-checkpoints")

# Maximum number of concurrent checkpoints
env.get_checkpoint_config().set_max_concurrent_checkpoints(1)

# Minimum pause between two checkpoints
env.get_checkpoint_config().set_min_pause_between_checkpoints(500)
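
Checkpointing works together with a restart strategy, which decides how often a failed job is re-executed before giving up; on each restart the job resumes from the most recent successful checkpoint. A minimal sketch (the attempt count and delay are illustrative values):

from pyflink.common.restart_strategy import RestartStrategies

# Restart up to 3 times, waiting 10 seconds (10000 ms) between attempts
env.set_restart_strategy(RestartStrategies.fixed_delay_restart(3, 10000))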

🚀 Real-world Project: Real-time Log Analysis

Project Overview

Build a real-time log analysis system that implements the following features:

  • Real-time log collection
  • User activity analysis
  • Anomaly pattern detection
  • Real-time alerts

1. Log Data Modeling

from dataclasses import dataclass
from typing import Optional
import json

@dataclass
class LogEvent:
    user_id: str
    session_id: str
    action: str
    timestamp: int
    ip_address: str
    user_agent: str
    metadata: Optional[dict] = None
    
    def to_json(self):
        return json.dumps({
            'user_id': self.user_id,
            'session_id': self.session_id,
            'action': self.action,
            'timestamp': self.timestamp,
            'ip_address': self.ip_address,
            'user_agent': self.user_agent,
            'metadata': self.metadata or {}
        })
    
    @classmethod
    def from_json(cls, json_str):
        data = json.loads(json_str)
        return cls(**data)
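
As a quick sanity check, an event can be serialized and parsed back, which is exactly what the Kafka pipeline below does with every message (the field values here are illustrative):

# Round-trip a sample event through JSON
sample = LogEvent(
    user_id="user1",
    session_id="session_1234",
    action="login",
    timestamp=1735689600,
    ip_address="192.168.1.1",
    user_agent="Mozilla/5.0 (Test Browser)",
    metadata={"device": "mobile"}
)

restored = LogEvent.from_json(sample.to_json())
assert restored == sample  # dataclasses compare field by field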

2. Real-time Log Analysis System

from pyflink.common import Duration
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.state import ValueStateDescriptor, ListStateDescriptor, MapStateDescriptor
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer  # requires the Kafka connector JAR
from pyflink.datastream.functions import KeyedProcessFunction

class LogAnalyzer:
    def __init__(self):
        self.env = StreamExecutionEnvironment.get_execution_environment()
        self.setup_environment()
    
    def setup_environment(self):
        # Enable checkpointing every second
        self.env.enable_checkpointing(1000)
        
        # Event time is the default since Flink 1.12; timestamps and
        # watermarks are assigned explicitly in run_analysis()
    
    def create_kafka_source(self, topic, bootstrap_servers):
        """Create Kafka source"""
        kafka_properties = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'log-analyzer-group'
        }
        
        kafka_source = FlinkKafkaConsumer(
            topic,
            SimpleStringSchema(),
            kafka_properties
        )
        
        return self.env.add_source(kafka_source)
    
    def parse_log_events(self, log_stream):
        """Parse log events"""
        def parse_log(log_str):
            try:
                return LogEvent.from_json(log_str)
            except Exception as e:
                print(f"Failed to parse log: {log_str}, Error: {e}")
                return None
        
        return log_stream.map(parse_log, output_type=Types.PICKLED_BYTE_ARRAY()) \
                        .filter(lambda x: x is not None)
    
    def detect_anomalies(self, log_stream):
        """Anomaly pattern detection"""
        class AnomalyDetector(KeyedProcessFunction):
            def __init__(self):
                self.failed_login_count = None
                self.last_login_time = None
                self.suspicious_actions = None
            
            def open(self, runtime_context):
                self.failed_login_count = runtime_context.get_state(
                    ValueStateDescriptor("failed_logins", Types.INT())
                )
                self.last_login_time = runtime_context.get_state(
                    ValueStateDescriptor("last_login", Types.LONG())
                )
                self.suspicious_actions = runtime_context.get_list_state(
                    ListStateDescriptor("suspicious", Types.STRING())
                )
            
            def process_element(self, log_event, ctx):
                current_time = log_event.timestamp
                
                # Track failed login attempts
                if log_event.action == "login_failed":
                    current_count = self.failed_login_count.value() or 0
                    self.failed_login_count.update(current_count + 1)
                    
                    # Alert once 5 consecutive failures have accumulated
                    if current_count >= 4:  # counting from 0, so 4 means the 5th failure
                        yield {
                            'type': 'security_alert',
                            'user_id': log_event.user_id,
                            'message': f'Multiple failed login attempts: {current_count + 1}',
                            'timestamp': current_time
                        }
                        self.failed_login_count.clear()
                
                # Reset the failure count on successful login
                elif log_event.action == "login_success":
                    self.failed_login_count.clear()
                    self.last_login_time.update(current_time)
                
                # Detect activity long after the last successful login
                elif log_event.action in ["view", "purchase", "download"]:
                    last_login = self.last_login_time.value()
                    if last_login and (current_time - last_login) > 86400:  # 24 hours
                        yield {
                            'type': 'anomaly_alert',
                            'user_id': log_event.user_id,
                            'message': f'Activity after long inactivity: {log_event.action}',
                            'timestamp': current_time
                        }
        
        # Key by user and detect anomalies
        return log_stream.key_by(lambda log: log.user_id) \
                        .process(AnomalyDetector())
    
    def calculate_user_metrics(self, log_stream):
        """Calculate user metrics"""
        class UserMetricsCalculator(KeyedProcessFunction):
            def __init__(self):
                self.action_counts = None
                self.session_duration = None
                self.last_action_time = None
            
            def open(self, runtime_context):
                self.action_counts = runtime_context.get_map_state(
                    MapStateDescriptor("action_counts", Types.STRING(), Types.INT())
                )
                self.session_duration = runtime_context.get_state(
                    ValueStateDescriptor("session_duration", Types.LONG())
                )
                self.last_action_time = runtime_context.get_state(
                    ValueStateDescriptor("last_action_time", Types.LONG())
                )
            
            def process_element(self, log_event, ctx):
                current_time = log_event.timestamp
                
                # Update action counts
                current_count = self.action_counts.get(log_event.action) or 0
                self.action_counts.put(log_event.action, current_count + 1)
                
                # Accumulate the time spent between consecutive actions
                last_time = self.last_action_time.value()
                if last_time:
                    duration = current_time - last_time
                    current_duration = self.session_duration.value() or 0
                    self.session_duration.update(current_duration + duration)
                
                self.last_action_time.update(current_time)
                
                # Emit metrics when the timestamp falls on a minute boundary
                # (a simplification; a timer or window would be more robust)
                if current_time % 60 == 0:
                    yield {
                        'user_id': log_event.user_id,
                        'timestamp': current_time,
                        'session_duration': self.session_duration.value() or 0,
                        'action_counts': dict(self.action_counts.items())
                    }
        
        return log_stream.key_by(lambda log: log.user_id) \
                        .process(UserMetricsCalculator())
    
    def run_analysis(self):
        """Run analysis pipeline"""
        # Read logs from Kafka
        log_stream = self.create_kafka_source("user-logs", "localhost:9092")
        
        # Parse logs
        parsed_logs = self.parse_log_events(log_stream)
        
        # Assign timestamps and watermarks (10-second out-of-orderness bound)
        watermarked_logs = parsed_logs.assign_timestamps_and_watermarks(
            WatermarkStrategy
                .for_bounded_out_of_orderness(Duration.of_seconds(10))
                .with_timestamp_assigner(LogTimestampAssigner())
        )
        
        # Detect anomalies
        anomalies = self.detect_anomalies(watermarked_logs)
        
        # Calculate user metrics
        metrics = self.calculate_user_metrics(watermarked_logs)
        
        # Output results
        anomalies.print("Security Alerts")
        metrics.print("User Metrics")
        
        # Execute
        self.env.execute("Real-time Log Analysis")

# Timestamp assigner used by the watermark strategy in run_analysis()
class LogTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, element, record_timestamp):
        return element.timestamp * 1000  # seconds -> milliseconds

if __name__ == "__main__":
    analyzer = LogAnalyzer()
    analyzer.run_analysis()

3. Execution and Testing

import random
import time

# Test data generator
class LogGenerator:
    def __init__(self):
        self.users = ["user1", "user2", "user3", "user4", "user5"]
        self.actions = ["login", "logout", "view", "purchase", "download", "login_failed"]
        self.ip_addresses = ["192.168.1.1", "192.168.1.2", "10.0.0.1", "10.0.0.2"]
    
    def generate_log(self):
        return LogEvent(
            user_id=random.choice(self.users),
            session_id=f"session_{random.randint(1000, 9999)}",
            action=random.choice(self.actions),
            timestamp=int(time.time()),
            ip_address=random.choice(self.ip_addresses),
            user_agent="Mozilla/5.0 (Test Browser)"
        ).to_json()

# Test execution
def test_log_analysis():
    env = StreamExecutionEnvironment.get_execution_environment()
    
    # Generate test data
    generator = LogGenerator()
    test_logs = [generator.generate_log() for _ in range(100)]
    
    log_stream = env.from_collection(test_logs)
    
    # Parse logs
    parsed_logs = log_stream.map(
        lambda x: LogEvent.from_json(x),
        output_type=Types.PICKLED_BYTE_ARRAY()
    ).filter(lambda x: x is not None)
    
    # Count actions per user in 5-second processing-time windows
    user_actions = parsed_logs.map(
        lambda log: (log.user_id, log.action, 1)
    ).key_by(lambda x: x[0]) \
     .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) \
     .reduce(lambda a, b: (a[0], a[1], a[2] + b[2]))
    
    user_actions.print()
    
    env.execute("Log Analysis Test")

# Test execution
if __name__ == "__main__":
    test_log_analysis()

📚 Learning Summary

What We Learned in This Part

  1. Apache Flink Introduction
    • Flink’s origins and design philosophy
    • Fundamental differences from Spark
    • Concept of true streaming processing
  2. Flink Architecture
    • Roles of JobManager and TaskManager
    • Cluster configuration and resource management
    • Program execution structure
  3. DataStream API
    • Basic data types and transformation operations
    • Map, Filter, FlatMap, KeyBy operations
    • Window operations (Tumbling, Sliding, Session)
  4. Time Processing
    • Event Time, Processing Time, Ingestion Time
    • Watermarking strategies and late data processing
    • Timestamp assignment methods
  5. State Management
    • ValueState, ListState, MapState
    • Checkpointing and failure recovery
    • State-based processing logic
  6. Real-world Project
    • Real-time log analysis system
    • Anomaly pattern detection
    • User metrics calculation

Core Technology Stack

Technology        | Purpose                    | Importance
DataStream API    | Streaming data processing  | ⭐⭐⭐⭐⭐
State Management  | State-based processing     | ⭐⭐⭐⭐⭐
Time Processing   | Event time-based analysis  | ⭐⭐⭐⭐⭐
Window Operations | Time-based aggregation     | ⭐⭐⭐⭐
Checkpointing     | Failure recovery           | ⭐⭐⭐⭐

Next Part Preview

Part 2: Advanced Streaming Processing and State Management will cover:

  • Advanced state management patterns
  • Deep dive into checkpointing and savepoints
  • Complex time processing strategies
  • Performance optimization techniques

Next Part: Part 2: Advanced Streaming Processing and State Management


Now you’ve mastered the basics of Flink! In the next part, we’ll explore more advanced features. 🚀