Intermediate
Data Pipeline Patterns
Proven patterns and anti-patterns for robust data pipelines
⏱️ 40 min read
📅 Updated Jan 2025
👤 Based on Densmore's Pocket Reference
📚 Reference: Data Pipelines Pocket Reference
This chapter draws from James Densmore's practical guide to building production data pipelines.
Beginner Reading Mode
Think of a pipeline pattern as an "architecture template you can choose from". Focus your reading on:
- Which pattern fits batch vs. streaming needs
- Anti-patterns that often make pipelines fragile
- Error handling and idempotency patterns
Glossary: DE-GLOSSARY.md
Light Prerequisites
- Understand that a pipeline has extract, transform, and load stages
- Know that a job can be rerun when it fails
- Have seen problems with late, duplicate, or incomplete data
Key Terms (3 Layers)
Term: Fan-out / Fan-in
Plain definition: One flow is split into many, then merged back together.
Technical definition: A pattern that parallelizes tasks (fan-out) and then aggregates their results (fan-in) to increase throughput.
Practical example: Per-country data is processed in parallel, then combined into a global table.
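A minimal fan-out/fan-in sketch in Python; `process_country` is a hypothetical function returning a per-country DataFrame:

from concurrent.futures import ThreadPoolExecutor

import pandas as pd

def fan_out_fan_in(countries):
    # Fan-out: one parallel task per country
    # (process_country(country) -> DataFrame is assumed to exist)
    with ThreadPoolExecutor(max_workers=8) as pool:
        partials = list(pool.map(process_country, countries))
    # Fan-in: aggregate the partial results into one global table
    return pd.concat(partials, ignore_index=True)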
Term: Idempotency
Plain definition: Repeating the operation gives the same result every time.
Technical definition: A property of pipeline operations that makes them safe to retry or rerun without additional side effects.
Practical example: An upsert keyed on the primary key means a rerun does not duplicate data.
Pipeline Architecture Patterns
Pattern 1: Extract-Load-Transform (ELT)
When to use: Modern cloud warehouses with powerful compute
- Raw data loaded to warehouse first
- Transformations done in SQL
- Easy to reprocess without re-extracting
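A minimal ELT sketch, using SQLite as a stand-in warehouse (table and column names are illustrative): the raw extract is landed untouched, and the transformation happens afterwards in SQL.

import sqlite3

import pandas as pd

conn = sqlite3.connect("warehouse.db")

# Load: land the raw extract as-is
raw = pd.DataFrame({"customer_id": ["a1", "a2"], "amount_cents": [1000, 2500]})
raw.to_sql("raw_orders", conn, if_exists="replace", index=False)

# Transform: reshape in SQL inside the warehouse; rerunnable without re-extracting
conn.executescript("""
    DROP TABLE IF EXISTS orders_clean;
    CREATE TABLE orders_clean AS
    SELECT customer_id, amount_cents / 100.0 AS amount_usd
    FROM raw_orders;
""")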
Pattern 2: Lambda Architecture
Two parallel paths for batch and streaming:
# Batch layer: accurate but slow, recomputed over full history
batch_results = process_historical_data(all_data)
# Speed layer: fresh but approximate, computed from live events
real_time_results = process_stream(events)
# Serving layer: merge both views to answer queries
final = merge(batch_results, real_time_results)
Pattern 3: Kappa Architecture
Streaming-only approach - simpler than Lambda:
- Everything is a stream
- Reprocess by replaying events
- Single code path
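A toy sketch of "reprocess by replaying": the event log is the source of truth, and rebuilding a view means running the same code over the log from the start (the in-memory list stands in for Kafka or similar):

event_log = []  # append-only event log; stands in for Kafka, Kinesis, etc.

def apply_event(state, event):
    # Single code path: the same function handles live and replayed events
    key = event["customer_id"]
    state[key] = state.get(key, 0) + event["amount"]
    return state

def rebuild_view():
    # Reprocess by replaying every event from offset zero
    state = {}
    for event in event_log:
        state = apply_event(state, event)
    return state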
Data Quality Patterns
The "Check and Fail Fast" Pattern
def validate_input(df):
    # Every check must pass for the batch to proceed
    checks = [
        df['customer_id'].notnull().all(),  # no missing keys
        df['amount'].ge(0).all(),           # no negative amounts
        len(df) > 0                         # never accept an empty batch
    ]
    if not all(checks):
        raise ValueError("Data validation failed!")
    return df
Schema Validation Pattern
import pandera as pa

schema = pa.DataFrameSchema({
    "customer_id": pa.Column(str, nullable=False),
    "amount": pa.Column(float, checks=pa.Check.ge(0)),
    "order_date": pa.Column(pa.DateTime),
})

# Raises a SchemaError describing the violated rule
validated_df = schema.validate(df)
Pipeline Anti-Patterns
❌ Anti-Pattern: Hidden Dependencies
Tasks depend on external state that is not captured in the DAG
Fix: Declare every dependency explicitly in code
❌ Anti-Pattern: Silent Failures
Empty results treated as success
Fix: Row count checks, anomaly detection
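A minimal guard for the empty-result case (the threshold is illustrative):

def assert_not_empty(df, min_rows=1):
    # An empty frame usually means an upstream failure, not "no data today"
    if len(df) < min_rows:
        raise ValueError(f"Expected at least {min_rows} rows, got {len(df)}")
    return df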
❌ Anti-Pattern: Hardcoded Config
Connection strings in code
Fix: Environment variables, config management
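A minimal sketch of pulling the connection string from the environment (the variable name is illustrative):

import os

# os.environ[...] fails loudly at startup if the variable is missing,
# rather than silently using a hardcoded default
DB_URL = os.environ["PIPELINE_DB_URL"]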
❌ Anti-Pattern: Giant Monolithic Jobs
One job does everything
Fix: Break into smaller, composable tasks
Testing Data Pipelines
Types of Pipeline Tests
| Test Type | What It Tests | When to Run |
|---|---|---|
| Unit Tests | Individual functions | Pre-commit |
| Integration Tests | Component interactions | CI/CD |
| Data Diff | Output vs expected | Staging |
| Great Expectations | Data quality rules | Production |
import pandas as pd

def test_transform_orders():
    input_df = pd.DataFrame({
        'amount': [100, 200],
        'currency': ['USD', 'EUR']
    })
    result = transform_orders(input_df)
    # Assumes transform_orders converts EUR to USD at a 1.1 rate
    assert result['amount_usd'].iloc[1] == 220
    assert len(result) == 2
Error Handling Patterns
1. Dead Letter Queue (DLQ)
Failed records go to a separate queue for inspection:
- Main pipeline continues processing
- Failed items logged separately
- Manual or automated retry possible
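A minimal in-process sketch of the idea, assuming a hypothetical `process_record` function; in production the dead letters would land in a real queue or table rather than a list:

dead_letters = []

def process_with_dlq(records):
    results = []
    for record in records:
        try:
            results.append(process_record(record))  # process_record is assumed
        except Exception as exc:
            # Park the failed record with its error; the main pipeline keeps going
            dead_letters.append({"record": record, "error": str(exc)})
    return results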
2. Circuit Breaker
Stop calling a failing external service:
import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failures = 0
        self.threshold = failure_threshold
        self.timeout = timeout                # seconds to wait before a retry
        self.last_failure_time = None
        self.state = 'CLOSED'

    def call(self, func, *args):
        if self.state == 'OPEN':
            # After the timeout, let one trial call through
            if time.time() - self.last_failure_time > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        # Success closes the circuit and resets the failure count
        self.failures = 0
        self.state = 'CLOSED'

    def on_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.threshold:
            self.state = 'OPEN'
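The breaker moves between three states: CLOSED (calls pass through), OPEN (calls fail fast once `failure_threshold` consecutive errors have occurred), and HALF_OPEN (after `timeout` seconds one trial call is allowed; success closes the circuit again). A minimal usage sketch, with a hypothetical `fetch_exchange_rates` call:

breaker = CircuitBreaker(failure_threshold=3, timeout=30)

# Wrap every call to the flaky service; repeated failures trip the breaker
rates = breaker.call(fetch_exchange_rates, "EUR")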
Incremental Processing Patterns
High-Water Mark Pattern
def extract_incremental(source_table, watermark_column):
    # Read the highest value processed so far (the "high-water mark")
    last_value = get_last_watermark(source_table)
    # NOTE: use parameterized queries in production; f-strings shown for brevity
    query = f"""
        SELECT * FROM {source_table}
        WHERE {watermark_column} > {last_value}
        ORDER BY {watermark_column}
    """
    df = execute(query)
    if not df.empty:
        # Advance the mark only after a successful extract
        new_watermark = df[watermark_column].max()
        save_watermark(source_table, new_watermark)
    return df
Idempotency
Running a pipeline multiple times should produce the same result:
✅ Making Pipelines Idempotent
- Use deterministic IDs (not auto-increment)
- DELETE then INSERT instead of APPEND
- Upsert (MERGE) for updates
- Partition overwrite for date-based data
def load_idempotent(df, table, partition_date, engine):
    # Ideally both statements run in one transaction
    execute(f"DELETE FROM {table} WHERE date = '{partition_date}'")  # clear the partition
    df.to_sql(table, engine, if_exists='append', index=False)        # then reload it
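The list above also mentions upserts; a minimal sketch of the same guarantee using a standard DB-API cursor and PostgreSQL's ON CONFLICT (table and column names are illustrative):

def upsert_orders(rows, cursor):
    # Rerunning this load updates existing rows instead of duplicating them;
    # assumes an orders_fact(order_id PRIMARY KEY, amount) table
    sql = """
        INSERT INTO orders_fact (order_id, amount)
        VALUES (%(order_id)s, %(amount)s)
        ON CONFLICT (order_id) DO UPDATE SET amount = EXCLUDED.amount
    """
    cursor.executemany(sql, rows)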
Decision Framework: Pipeline Pattern Selection
| Decision Point | Choose Option A If... | Choose Option B If... |
|---|---|---|
| Fan-in vs Fan-out | Many sources need consolidating into one model | One source serves many downstream use cases |
| Micro-batch vs Streaming | Minute-level latency is enough and simpler operations matter | Second-level responses and event-driven processing are required |
| Single pipeline vs Layered pipelines | Small scope with minimal dependencies | Large scale with reuse needed across domains |
Failure Modes & Anti-Patterns
Pipeline Design Anti-Patterns
- God pipeline: one DAG/job handles too many concerns.
- Tight coupling: a small change in a source breaks many downstream consumers.
- No backpressure handling: traffic spikes trigger cascading failures.
- Missing replay capability: recovering historical data is hard once a bug is detected.
Production Readiness Checklist
Pipeline Patterns Checklist
- The chosen pattern matches real latency and volume requirements.
- The dependency graph is documented and easy to trace.
- Every pipeline has a clear owner and SLA.
- Retry, DLQ, and replay strategies are in place.
- A testing strategy covers unit, integration, and data quality tests.
- Cost visibility per pipeline is available.
✏️ Exercise: Refactor a Pipeline
Given this problematic pipeline:
def pipeline():
    data = extract("SELECT * FROM orders")  # Full extract every run!
    transformed = transform(data)
    transformed.to_sql('orders_fact', if_exists='append')  # Duplicates!
Fix these issues:
- Make it incremental (not full extract)
- Make it idempotent (no duplicates)
- Add validation
- Add error handling
🎯 Quick Quiz
1. What is the main benefit of ELT over ETL?
A. Faster extraction
B. Raw data preserved, can re-transform
C. Less storage needed
D. Better for real-time
2. What does idempotency mean in data pipelines?
A. Pipeline runs faster each time
B. Same input produces same output, safe to retry
C. Pipeline can handle any data size
D. No errors ever occur
3. When should Circuit Breaker pattern be used?
A. When processing small files
B. When calling external services that might fail
C. When doing data validation
D. When reading from database
Conclusion
Following proven patterns and avoiding anti-patterns makes your pipelines more reliable, testable, and maintainable.
🎯 Key Takeaways
- Choose architecture pattern based on latency needs
- Always validate data quality
- Test pipelines like you test software
- Design for failure with proper error handling
- Make pipelines idempotent for safe retries
📚 References & Resources
Primary Sources
- Data Pipelines Pocket Reference - James Densmore (O'Reilly, 2021)
Chapters 3-6: Ingestion Patterns, Error Handling, Testing, Idempotency
- Fundamentals of Data Engineering - Joe Reis & Matt Housley (O'Reilly, 2022)
Chapter 7: Pipeline Patterns, Chapter 12: Orchestration Patterns
- Building Microservices - Sam Newman (O'Reilly, 2021)
Chapter 12: Resiliency Patterns (Circuit Breaker)