The ETL Pattern for JSON
| Stage | Purpose | Key Concerns |
|---|---|---|
| Extract | Pull JSON from source systems | Pagination, rate limiting, authentication, retries |
| Transform | Reshape JSON into the target schema | Flattening, type coercion, deduplication, validation |
| Load | Write transformed data to destination | Idempotency, batch size, schema migrations, error handling |
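Retries are one of the Extract concerns listed above. The following is a minimal sketch of retrying with exponential backoff, not a definitive implementation: `call_with_retries`, `flaky_fetch`, and the delay values are hypothetical names and numbers chosen for illustration, and a real pipeline would usually catch only transient errors (timeouts, HTTP 429/5xx) rather than every exception.

```python
import random
import time


def call_with_retries(fetch, max_attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call fetch() until it succeeds or max_attempts is exhausted.

    Waits base_delay * 2**attempt seconds (plus up to 0.1s of jitter)
    between attempts. `sleep` is injectable so the wait can be stubbed out.
    """
    for attempt in range(max_attempts):
        try:
            return fetch()
        except Exception:  # assumption: narrow this to transient errors in real code
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            sleep(delay)


# Hypothetical stand-in for a flaky API call: fails twice, then succeeds.
attempts = {"count": 0}


def flaky_fetch():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"results": [{"id": 1}]}


delays = []  # record the waits instead of actually sleeping
page = call_with_retries(flaky_fetch, sleep=delays.append)
print(page, len(delays))
```

Passing `sleep` as a parameter keeps the helper easy to test; in production the default `time.sleep` applies.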
ETL vs. ELT: When to Transform
| Approach | Transform Location | Best For | Example |
|---|---|---|---|
| ETL | Before loading | Relational databases, strict schemas | Flatten nested JSON, then INSERT into PostgreSQL |
| ELT | After loading | Data warehouses with strong SQL engines | Load raw JSON into BigQuery, transform with dbt |
Modern Default
If your destination supports JSON natively (PostgreSQL JSONB, BigQuery, Snowflake), prefer ELT: load raw JSON first, then transform with SQL. This preserves the raw data for reprocessing and lets you iterate on transforms without re-extracting.
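The load-then-transform idea can be sketched locally. This example swaps in SQLite (through Python's built-in `sqlite3` module) for the warehouse, and assumes the SQLite build includes the JSON functions, which is the case for most recent Python distributions. The table name, view name, and sample records are hypothetical.

```python
import json
import sqlite3

# Hypothetical raw records, as they might arrive from an API.
raw_records = [
    {"id": 1, "name": "Alice", "plan": "pro", "address": {"city": "Lisbon"}},
    {"id": 2, "name": "Bob", "plan": "free", "address": {"city": "Oslo"}},
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE raw_users (payload TEXT NOT NULL)")

# Load: store each record untouched, one JSON document per row.
conn.executemany(
    "INSERT INTO raw_users (payload) VALUES (?)",
    [(json.dumps(r),) for r in raw_records],
)

# Transform: reshape with SQL after loading. The view can be redefined
# at any time without re-extracting, because the raw payload is preserved.
conn.execute(
    """
    CREATE VIEW pro_users AS
    SELECT
        json_extract(payload, '$.id')           AS user_id,
        json_extract(payload, '$.name')         AS full_name,
        json_extract(payload, '$.address.city') AS city
    FROM raw_users
    WHERE json_extract(payload, '$.plan') = 'pro'
    """
)

pro_rows = conn.execute("SELECT user_id, full_name, city FROM pro_users").fetchall()
print(pro_rows)
```

In PostgreSQL, BigQuery, or Snowflake the same shape applies, with each engine's own JSON operators in place of `json_extract`.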
Extract: Fetching JSON from Sources
Paginated API Extraction
Extract all pages from a REST API
```python
import requests
import json
import time

def extract_all_pages(base_url, headers=None, per_page=100):
    """Extract all records from a paginated API endpoint."""
    all_records = []
    page = 1

    while True:
        response = requests.get(
            base_url,
            headers=headers,
            params={"page": page, "per_page": per_page},
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()

        records = data.get("results", data.get("data", []))
        if not records:
            break

        all_records.extend(records)
        page += 1
        time.sleep(0.5)  # respect rate limits

    return all_records

# Save as NDJSON for streaming downstream
records = extract_all_pages("https://api.example.com/users")
with open("users.ndjson", "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")
```
NDJSON as Pipeline Format
NDJSON (one JSON object per line) works well as a pipeline interchange format because it supports streaming, appending, and parallel processing:
users.ndjson (one record per line)
```json
{"id":1,"name":"Alice","email":"[email protected]","plan":"pro"}
{"id":2,"name":"Bob","email":"[email protected]","plan":"free"}
{"id":3,"name":"Carol","email":"[email protected]","plan":"pro"}
```
Transform: Reshaping JSON
CLI Transforms with jq
Transform NDJSON with jq
```bash
# Flatten nested address, filter pro users, rename fields
cat users.ndjson | jq -c '
  select(.plan == "pro") |
  {
    user_id: .id,
    full_name: .name,
    email: .email,
    city: .address.city,
    country: .address.country,
    signup_date: .created_at[:10]
  }
' > pro_users_flat.ndjson
```
Python Transforms with Polars
High-performance transforms with Polars
```python
import polars as pl

# Read NDJSON directly into a DataFrame
df = pl.read_ndjson("users.ndjson")

# Transform: flatten, filter, compute new columns
result = (
    df
    .filter(pl.col("plan") == "pro")
    .with_columns([
        pl.col("address").struct.field("city").alias("city"),
        pl.col("address").struct.field("country").alias("country"),
        pl.col("created_at").str.slice(0, 10).alias("signup_date"),
    ])
    .select(["id", "name", "email", "city", "country", "signup_date"])
    .rename({"id": "user_id", "name": "full_name"})
)

# Write to Parquet for warehouse loading
result.write_parquet("pro_users.parquet")

# Or write back to NDJSON
result.write_ndjson("pro_users.ndjson")
```
Node.js Stream Transforms
Streaming transform for large files
```javascript
import { appendFileSync, createReadStream, createWriteStream } from 'fs';
import { createInterface } from 'readline';

const input = createReadStream('users.ndjson');
const output = createWriteStream('transformed.ndjson');
const rl = createInterface({ input, crlfDelay: Infinity });

let processed = 0;
let errors = 0;

// Read one line at a time so memory use stays flat for large files
for await (const line of rl) {
  try {
    const record = JSON.parse(line);
    const transformed = {
      user_id: record.id,
      full_name: record.name,
      email: record.email,
      city: record.address?.city ?? null,
      signup_date: record.created_at?.slice(0, 10) ?? null,
    };
    output.write(JSON.stringify(transformed) + '\n');
    processed++;
  } catch (err) {
    errors++;
    // Write the unparseable line to the dead letter queue
    appendFileSync('dlq.ndjson', line + '\n');
  }
}

output.end(); // flush and close the output file
console.log(`Processed: ${processed}, Errors: ${errors}`);
```
Common Transform Patterns
| Pattern | Input | Output | Tool |
|---|---|---|---|
| Flatten nested objects | {"user":{"name":"Alice"}} | {"user_name":"Alice"} | jq paths(scalars) with getpath (jq's flatten only handles arrays), JSON Flattener tool |
| Explode arrays to rows | {"tags":["a","b"]} | {"tag":"a"}, {"tag":"b"} | jq .tags[], pandas explode() |
| Type coercion | {"price":"19.99"} | {"price":19.99} | jq tonumber, Polars cast() |
| Deduplication | Duplicate records by ID | Unique records | jq -s unique_by(.id), Polars unique() |
| Key renaming | {"user_name":"Alice"} | {"name":"Alice"} | jq with_entries, Polars rename() |
| Null handling | {"city":null} | {"city":"Unknown"} | jq .city // "Unknown" |
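For pipelines that avoid extra dependencies, several of the patterns in the table can be sketched in plain Python. The helper names below (`flatten`, `explode`, `dedupe_by`) are hypothetical and not taken from any library; they illustrate the patterns and are not tuned for performance.

```python
def flatten(obj, parent_key="", sep="_"):
    """Flatten nested dicts by joining keys with `sep`."""
    flat = {}
    for key, value in obj.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat


def explode(record, array_key, item_key):
    """Emit one row per array element, copying the remaining fields."""
    rest = {k: v for k, v in record.items() if k != array_key}
    return [{**rest, item_key: item} for item in record.get(array_key, [])]


def dedupe_by(records, key):
    """Keep the first record seen for each key value, preserving order."""
    seen = set()
    unique = []
    for record in records:
        if record[key] not in seen:
            seen.add(record[key])
            unique.append(record)
    return unique


flat = flatten({"user": {"name": "Alice"}})
rows = explode({"tags": ["a", "b"]}, "tags", "tag")
unique = dedupe_by([{"id": 1, "v": "x"}, {"id": 1, "v": "y"}, {"id": 2, "v": "z"}], "id")

# Type coercion and null handling need no helper at all.
price = float({"price": "19.99"}["price"])
city = {"city": None}["city"] or "Unknown"

print(flat, rows, unique, price, city)
```

Note that `or "Unknown"` also replaces empty strings; use an explicit `is None` check if empty values must be kept.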
Load: Writing to Destinations
PostgreSQL with JSONB
Load NDJSON into PostgreSQL
```python
import json
import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect("postgresql://user:pass@localhost/mydb")
cur = conn.cursor()

# Option 1: Load into a typed table
with open("pro_users.ndjson") as f:
    rows = [json.loads(line) for line in f]

# Upsert on user_id so re-running the load is safe
execute_values(
    cur,
    """INSERT INTO users (user_id, full_name, email, city, signup_date)
       VALUES %s ON CONFLICT (user_id) DO UPDATE
       SET full_name = EXCLUDED.full_name, email = EXCLUDED.email""",
    [(r["user_id"], r["full_name"], r["email"], r["city"], r["signup_date"])
     for r in rows],
)

# Option 2: Load raw JSON into a JSONB column (ELT pattern)
# The rows are tuples, so the template uses a positional %s placeholder.
execute_values(
    cur,
    "INSERT INTO raw_events (payload) VALUES %s",
    [(json.dumps(r),) for r in rows],
    template="(%s::jsonb)",
)

conn.commit()
```
Data Warehouse (BigQuery)
Load NDJSON into BigQuery
```bash
# BigQuery natively supports NDJSON
bq load \
  --source_format=NEWLINE_DELIMITED_JSON \
  --autodetect \
  mydataset.users \
  gs://mybucket/pro_users.ndjson
```
Error Handling: Dead Letter Queues
Never Crash on Bad Records
A single malformed record should never stop your entire pipeline. Catch parse errors per-record, write failures to a dead letter queue with the raw input and error message, and continue processing. Review the DLQ periodically to fix systematic issues.
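A minimal sketch of that pattern in Python, assuming the input is an iterable of raw NDJSON lines. The function name `parse_with_dlq` and the DLQ entry fields (`line_number`, `raw`, `error`) are hypothetical choices for illustration.

```python
import json


def parse_with_dlq(lines):
    """Split raw NDJSON lines into parsed records and dead-letter entries."""
    records, dead_letters = [], []
    for line_number, line in enumerate(lines, start=1):
        line = line.strip()
        if not line:
            continue  # blank lines are not errors, just skip them
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as err:
            # Keep the raw input and the error message so the failure
            # can be investigated and replayed later.
            dead_letters.append(
                {"line_number": line_number, "raw": line, "error": str(err)}
            )
    return records, dead_letters


sample = ['{"id": 1}', '{"id": 2,', '{"id": 3}']  # second line is truncated
records, dead_letters = parse_with_dlq(sample)

# Serialize the DLQ as NDJSON; a real pipeline would append this to a file,
# bucket, or queue instead of keeping it in memory.
dlq_payload = "".join(json.dumps(entry) + "\n" for entry in dead_letters)
print(len(records), len(dead_letters))
```

Storing the DLQ in the same NDJSON format as the main data means the same tooling can read both.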
Schema Drift Detection
Detect new or missing fields
```python
import json

EXPECTED_FIELDS = {"user_id", "full_name", "email", "city", "signup_date"}

def check_schema_drift(record: dict) -> list[str]:
    warnings = []
    record_fields = set(record.keys())

    new_fields = record_fields - EXPECTED_FIELDS
    if new_fields:
        warnings.append(f"New fields detected: {new_fields}")

    missing_fields = EXPECTED_FIELDS - record_fields
    if missing_fields:
        warnings.append(f"Missing fields: {missing_fields}")

    return warnings

# Run against a sample of incoming records
with open("users.ndjson") as f:
    for i, line in enumerate(f):
        if i >= 100:  # sample first 100 records
            break
        record = json.loads(line)
        drift = check_schema_drift(record)
        if drift:
            print(f"Record {i}: {drift}")
```
Pipeline Monitoring Checklist
- ✓ Track records extracted, transformed, and loaded per run
- ✓ Monitor DLQ size and alert if the error rate exceeds a threshold (e.g., >1%)
- ✓ Log pipeline duration and set alerts for slowdowns
- ✓ Validate row counts: source count should equal destination count plus DLQ count
- ✓ Run schema drift checks on a sample of each batch
- ✓ Use idempotent loads (upsert by primary key) so retries are safe
- ✗ Never silently drop records: every input must be accounted for
- ✗ Never hard-code API keys in pipeline scripts; use environment variables or a secrets manager
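The count-based checks in this list can be sketched as a small post-run validation. `RunStats` and `check_run` are hypothetical names, and the 1% default simply mirrors the example threshold in the checklist.

```python
from dataclasses import dataclass


@dataclass
class RunStats:
    extracted: int
    loaded: int
    dead_lettered: int

    @property
    def error_rate(self) -> float:
        # Share of extracted records that ended up in the DLQ.
        return self.dead_lettered / self.extracted if self.extracted else 0.0

    @property
    def unaccounted(self) -> int:
        # Records that were neither loaded nor dead-lettered.
        return self.extracted - self.loaded - self.dead_lettered


def check_run(stats: RunStats, max_error_rate: float = 0.01) -> list[str]:
    """Return a list of human-readable problems; empty means the run is healthy."""
    problems = []
    if stats.unaccounted != 0:
        problems.append(f"{stats.unaccounted} records unaccounted for")
    if stats.error_rate > max_error_rate:
        problems.append(
            f"error rate {stats.error_rate:.1%} exceeds {max_error_rate:.1%}"
        )
    return problems


healthy = check_run(RunStats(extracted=1000, loaded=995, dead_lettered=5))
unhealthy = check_run(RunStats(extracted=1000, loaded=980, dead_lettered=15))
print(healthy, unhealthy)
```

A scheduler or alerting hook could then fail the run or page someone whenever the returned list is non-empty.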
Frequently Asked Questions
What is a JSON ETL pipeline?
A JSON ETL pipeline is a data workflow that Extracts JSON from sources (APIs, files, message queues), Transforms it (flattening, filtering, renaming, joining), and Loads the result into a destination (database, warehouse, file). It automates the process of moving structured data between systems.
What is the difference between ETL and ELT?
In ETL, data is transformed before loading into the destination. In ELT, raw data is loaded first (often into a data lake or warehouse), then transformed using SQL or the destination's query engine. ELT is preferred when the destination has strong query capabilities (BigQuery, Snowflake) and you want to preserve raw data.
What is NDJSON and why is it used in pipelines?
NDJSON (Newline-Delimited JSON, also called JSON Lines) is a format where each line is a separate, valid JSON object. It is ideal for pipelines because it can be streamed line-by-line without loading the entire dataset into memory, supports append operations, and is trivially splittable for parallel processing.
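As a rough illustration of the streaming and append properties, the sketch below reads and writes NDJSON one line at a time. `iter_ndjson` and `append_ndjson` are hypothetical helper names, and an in-memory `io.StringIO` buffer stands in for a real file.

```python
import io
import json


def iter_ndjson(stream):
    """Yield one parsed record per non-empty line, never holding the whole file."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


def append_ndjson(stream, record):
    """Append a single record as one compact JSON line."""
    stream.write(json.dumps(record, separators=(",", ":")) + "\n")


buffer = io.StringIO()  # stand-in for open("users.ndjson", "a")
append_ndjson(buffer, {"id": 1, "plan": "pro"})
append_ndjson(buffer, {"id": 2, "plan": "free"})

buffer.seek(0)
pro_ids = [record["id"] for record in iter_ndjson(buffer) if record["plan"] == "pro"]
print(pro_ids)
```

Because every line is a complete document, a large file can also be split at line boundaries and each chunk handed to a separate worker.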
How do I handle malformed JSON in a pipeline?
Use a dead letter queue (DLQ) pattern: attempt to parse each record, and route failures to a separate storage location with the raw input and error details. Process the DLQ separately for investigation. Never let one bad record crash the entire pipeline.
How do I detect schema drift in JSON pipelines?
Compare the set of keys and types in incoming records against a known schema. Tools like Great Expectations, dbt tests, or custom validation with JSON Schema can flag new fields, missing fields, or type changes. Alert on drift but do not necessarily reject records — additive changes (new fields) are usually safe.
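A minimal sketch of a custom check that covers types as well as keys. `EXPECTED_TYPES` and `check_types` are hypothetical, and the expected schema here is an assumption for illustration; JSON Schema or Great Expectations would express the same rules declaratively.

```python
# Hypothetical expected schema: field name -> Python type after json.loads
EXPECTED_TYPES = {"user_id": int, "full_name": str, "email": str}


def check_types(record):
    """Report missing fields, type changes, and new fields for one record."""
    issues = []
    for field, expected in EXPECTED_TYPES.items():
        if field not in record:
            issues.append(f"missing field: {field}")
            continue
        value = record[field]
        # Exact type comparison so that True/False does not pass as an int.
        if value is not None and type(value) is not expected:
            issues.append(
                f"{field}: expected {expected.__name__}, got {type(value).__name__}"
            )
    for field in sorted(set(record) - set(EXPECTED_TYPES)):
        issues.append(f"new field: {field}")
    return issues


issues = check_types({"user_id": "42", "full_name": "Alice", "plan": "pro"})
print(issues)
```

A pipeline might log the `new field` entries as warnings while treating type changes and missing fields as alert-worthy.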
What tools are best for JSON ETL pipelines?
For CLI: jq for ad-hoc transforms. For Python: pandas or polars for batch, aiohttp for async extraction. For Node.js: stream transforms with JSONStream. For SQL-based transforms: dbt. For managed pipelines: Airbyte, Fivetran, or AWS Glue. Choose based on team skills and scale.