Intermediate

Data Pipeline Patterns

Proven patterns and anti-patterns for robust data pipelines

⏱️ 40 min read 📅 Updated Jan 2025 👤 Based on Densmore's Pocket Reference

📚 Reference: Data Pipelines Pocket Reference

This chapter draws from James Densmore's practical guide to building production data pipelines.

Beginner Reading Mode

Think of pipeline patterns as "architecture templates to choose from". Focus your reading on:

  1. Which pattern fits batch vs. streaming needs
  2. Anti-patterns that often make pipelines fragile
  3. Error handling and idempotency patterns

Glossary: DE-GLOSSARY.md

Light Prerequisites

Key Terms (3 Layers)

Term: Fan-out / Fan-in

Layman's definition: One flow is split into many and then merged back together.

Technical definition: A pattern of parallelizing tasks (fan-out) followed by aggregating their results (fan-in) for throughput efficiency.

Practical example: Per-country data is processed in parallel and then consolidated into a global table.
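The split-then-merge flow can be sketched with Python's standard library; `process_country` is a hypothetical per-partition step, not an API from any particular framework:

```python
from concurrent.futures import ThreadPoolExecutor

def process_country(records):
    # Hypothetical per-country transform: sum order amounts
    return sum(records)

def fan_out_fan_in(data_by_country):
    # Fan-out: process each country's partition in parallel
    with ThreadPoolExecutor() as pool:
        partials = list(pool.map(process_country, data_by_country.values()))
    # Fan-in: aggregate the partial results into one global figure
    return sum(partials)

totals = fan_out_fan_in({'ID': [10, 20], 'SG': [5], 'MY': [7, 3]})
print(totals)  # 45
```

In a real orchestrator (e.g. Airflow) the fan-out would be parallel tasks in the DAG rather than threads, but the shape is the same.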

Term: Idempotency

Layman's definition: Running it repeatedly yields the same result.

Technical definition: A property of pipeline operations that makes them safe to retry/rerun without additional side effects.

Practical example: Upserting on a primary key means a rerun does not duplicate data.
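A minimal sketch of that keyed-upsert idea using SQLite (table and column names here are illustrative):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, amount REAL)")

def upsert_order(order_id, amount):
    # Keyed upsert: re-running the same load never duplicates rows
    conn.execute(
        "INSERT INTO orders (order_id, amount) VALUES (?, ?) "
        "ON CONFLICT(order_id) DO UPDATE SET amount = excluded.amount",
        (order_id, amount),
    )

# Run the same "load" twice
for _ in range(2):
    upsert_order("A-1", 100.0)

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
print(count)  # 1 row, despite two runs
```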

Pipeline Architecture Patterns

Pattern 1: Extract-Load-Transform (ELT)

When to use: modern cloud warehouses with cheap storage and powerful compute. Load raw data first and transform it inside the warehouse; because the raw data is preserved, you can re-transform it later.
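A minimal sketch of ELT, using SQLite as a stand-in for a cloud warehouse: raw data is landed untouched, then the transform runs as SQL inside the "warehouse":

```python
import sqlite3

# Stand-in "warehouse"; in practice this would be Snowflake, BigQuery, etc.
wh = sqlite3.connect(":memory:")

# Extract + Load: land the raw data untouched
wh.execute("CREATE TABLE raw_orders (customer_id TEXT, amount REAL)")
wh.executemany("INSERT INTO raw_orders VALUES (?, ?)",
               [("c1", 100.0), ("c1", 50.0), ("c2", 30.0)])

# Transform: push the heavy lifting down to the warehouse's SQL engine
wh.execute("""
    CREATE TABLE customer_totals AS
    SELECT customer_id, SUM(amount) AS total
    FROM raw_orders
    GROUP BY customer_id
""")

rows = wh.execute("SELECT * FROM customer_totals ORDER BY customer_id").fetchall()
print(rows)  # [('c1', 150.0), ('c2', 30.0)]
```

Because `raw_orders` is kept, a new `customer_totals` definition can be rebuilt from it at any time without re-extracting from the source.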

Pattern 2: Lambda Architecture

Two parallel paths for batch and streaming:

```python
# Batch Layer: complete recomputation over all historical data
batch_results = process_historical_data(all_data)

# Speed Layer: low-latency, approximate results from the stream
real_time_results = process_stream(events)

# Serving Layer: merge both views for queries
final = merge(batch_results, real_time_results)
```

Pattern 3: Kappa Architecture

A streaming-only approach that is simpler than Lambda: one code path handles both real-time and historical data, and reprocessing is done by replaying the event log.
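A minimal sketch of the Kappa idea, assuming the event log is the source of truth: there is a single streaming code path, and "reprocessing" simply replays the log through it:

```python
def process_stream(events):
    # Single streaming code path: maintain running totals per key
    totals = {}
    for key, amount in events:
        totals[key] = totals.get(key, 0) + amount
        yield dict(totals)

# The event log is the source of truth
log = [("c1", 100), ("c2", 30), ("c1", 50)]

first_run = list(process_stream(log))[-1]
replay = list(process_stream(log))[-1]
print(first_run == replay)  # True: replaying the log reproduces the same state
```

In production the log would be a durable stream (e.g. Kafka with long retention) rather than a Python list, but the principle is identical: no separate batch layer to keep in sync.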

Data Quality Patterns

The "Check and Fail Fast" Pattern

```python
def validate_input(df):
    checks = [
        df['customer_id'].notnull().all(),
        df['amount'].ge(0).all(),
        len(df) > 0,
    ]
    if not all(checks):
        raise ValueError("Data validation failed!")
    return df
```

Schema Validation Pattern

```python
import pandera as pa

schema = pa.DataFrameSchema({
    "customer_id": pa.Column(str, nullable=False),
    "amount": pa.Column(float, checks=pa.Check.ge(0)),
    "order_date": pa.Column(pa.DateTime),
})

# Raises a SchemaError if the dataframe does not conform
validated_df = schema.validate(df)
```

Pipeline Anti-Patterns

❌ Anti-Pattern: Hidden Dependencies

Tasks that depend on external state not captured in DAG

Fix: Explicit dependencies in code

❌ Anti-Pattern: Silent Failures

Empty results treated as success

Fix: Row count checks, anomaly detection

❌ Anti-Pattern: Hardcoded Config

Connection strings in code

Fix: Environment variables, config management
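As a minimal sketch of the fix, connection details can come from environment variables instead of the source code (the variable names here are illustrative):

```python
import os

def get_db_url():
    # Read connection details from the environment, with safe local defaults
    host = os.environ.get("DB_HOST", "localhost")
    port = os.environ.get("DB_PORT", "5432")
    name = os.environ.get("DB_NAME", "analytics")
    return f"postgresql://{host}:{port}/{name}"

os.environ["DB_HOST"] = "warehouse.internal"
print(get_db_url())  # postgresql://warehouse.internal:5432/analytics
```

The same code then runs unchanged across dev, staging, and production; only the environment differs.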

❌ Anti-Pattern: Giant Monolithic Jobs

One job does everything

Fix: Break into smaller, composable tasks

Testing Data Pipelines

Types of Pipeline Tests

| Test Type | What It Tests | When to Run |
| --- | --- | --- |
| Unit Tests | Individual functions | Pre-commit |
| Integration Tests | Component interactions | CI/CD |
| Data Diff | Output vs expected | Staging |
| Great Expectations | Data quality rules | Production |
```python
# Example: pytest for data pipelines
import pandas as pd

def test_transform_orders():
    # Given
    input_df = pd.DataFrame({
        'amount': [100, 200],
        'currency': ['USD', 'EUR'],
    })

    # When
    result = transform_orders(input_df)

    # Then
    assert result['amount_usd'].iloc[1] == 220  # 200 EUR converted to USD
    assert len(result) == 2
```

Error Handling Patterns

1. Dead Letter Queue (DLQ)

Failed records go to a separate queue for inspection and reprocessing, so one bad record does not fail the whole batch.
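A minimal in-memory sketch of the pattern; a real DLQ would be a separate queue or topic (e.g. SQS or a Kafka topic) rather than a Python list:

```python
failed_records = []  # stand-in for a real dead letter queue

def process_with_dlq(records, transform):
    processed = []
    for record in records:
        try:
            processed.append(transform(record))
        except Exception as exc:
            # Route the bad record plus its error to the DLQ for inspection
            failed_records.append({"record": record, "error": str(exc)})
    return processed

good = process_with_dlq(["1", "2", "oops", "4"], int)
print(good)                 # [1, 2, 4]
print(len(failed_records))  # 1 record parked for later inspection
```

After the run, the DLQ contents can be inspected, fixed, and replayed through the same pipeline.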

2. Circuit Breaker

Stop calling a failing external service until it has had time to recover:

```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failures = 0
        self.threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args)
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise e

    def on_success(self):
        # A successful call closes the circuit and resets the failure count
        self.failures = 0
        self.state = 'CLOSED'

    def on_failure(self):
        # Too many consecutive failures open the circuit
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.threshold:
            self.state = 'OPEN'
```

Incremental Processing Patterns

High-Water Mark Pattern

```python
# Track the last processed ID/timestamp (the "high-water mark")
def extract_incremental(source_table, watermark_column):
    last_value = get_last_watermark(source_table)

    query = f"""
        SELECT * FROM {source_table}
        WHERE {watermark_column} > {last_value}
        ORDER BY {watermark_column}
    """
    df = execute(query)

    # Advance the watermark for the next run
    if not df.empty:
        new_watermark = df[watermark_column].max()
        save_watermark(source_table, new_watermark)

    return df
```

Idempotency

Running a pipeline multiple times should produce the same result:

✅ Making Pipelines Idempotent

```python
# Idempotent load pattern: delete-then-insert per partition
def load_idempotent(df, table, partition_date):
    # Clear any existing data for this partition
    execute(f"DELETE FROM {table} WHERE date = '{partition_date}'")

    # Insert the new data (engine: your database connection)
    df.to_sql(table, con=engine, if_exists='append')

    # Safe to re-run: the delete removes would-be duplicates
```

Decision Framework: Pipeline Pattern Selection

| Decision Point | Choose Option A If... | Choose Option B If... |
| --- | --- | --- |
| Fan-in vs Fan-out | Many sources need consolidating into one model | One source serves many downstream use cases |
| Micro-batch vs Streaming | Minute-level latency is enough and simpler operations are preferred | Second-level responses and event-driven processing are required |
| Single pipeline vs Layered pipelines | Scope is small with minimal dependencies | Scale is large and reuse across domains is needed |

Failure Modes & Anti-Patterns

Pipeline Design Anti-Patterns

Production Readiness Checklist

Pipeline Patterns Checklist

  1. The pattern is chosen to match real latency and volume requirements.
  2. The dependency graph is documented and easy to trace.
  3. Every pipeline has a clear owner and SLA.
  4. Retry, DLQ, and replay strategies are in place.
  5. A testing strategy covers unit, integration, and data quality tests.
  6. Per-pipeline cost visibility is available.

✏️ Exercise: Refactor a Pipeline

Given this problematic pipeline:

```python
def pipeline():
    data = extract("SELECT * FROM orders")  # Full extract every run!
    transformed = transform(data)
    transformed.to_sql('orders_fact', if_exists='append')  # Duplicates!
```

Fix these issues:

  1. Make it incremental (not full extract)
  2. Make it idempotent (no duplicates)
  3. Add validation
  4. Add error handling
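One possible shape of the refactor, sketched as pseudocode; helpers such as `get_last_watermark`, `validate_input`, and `engine` are assumptions that mirror the patterns shown earlier in this chapter:

```python
def pipeline():
    # 1. Incremental: only pull rows newer than the last watermark
    last_id = get_last_watermark("orders")
    data = extract(f"SELECT * FROM orders WHERE order_id > {last_id}")

    if data.empty:
        return  # explicit no-op; never let "no new rows" pass as silent success

    # 3. Validation: fail fast on bad input
    validate_input(data)

    transformed = transform(data)

    # 2 & 4. Idempotent load with a simple retry for transient failures
    for attempt in range(3):
        try:
            # Delete-then-insert the same window so reruns cannot duplicate
            execute(f"DELETE FROM orders_fact WHERE order_id > {last_id}")
            transformed.to_sql('orders_fact', con=engine, if_exists='append')
            break
        except ConnectionError:
            if attempt == 2:
                raise

    save_watermark("orders", transformed['order_id'].max())
```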

🎯 Quick Quiz

1. What is the main benefit of ELT over ETL?

A. Faster extraction
B. Raw data preserved, can re-transform
C. Less storage needed
D. Better for real-time

2. What does idempotency mean in data pipelines?

A. Pipeline runs faster each time
B. Same input produces same output, safe to retry
C. Pipeline can handle any data size
D. No errors ever occur

3. When should Circuit Breaker pattern be used?

A. When processing small files
B. When calling external services that might fail
C. When doing data validation
D. When reading from database

Conclusion

Following proven patterns and avoiding anti-patterns makes your pipelines more reliable, testable, and maintainable.

🎯 Key Takeaways

📚 References & Resources

Primary Sources

Official Documentation

Articles & Guides