JSON ETL Pipelines — Extract, Transform & Load Structured Data

Moving JSON between systems — from APIs to databases, from files to warehouses — is one of the most common tasks in modern development. This guide covers the end-to-end pattern: extracting from sources, transforming with real tools, loading into destinations, and handling the failures that inevitably occur in production.

The ETL Pattern for JSON

JSON ETL Pipeline Architecture
Stage | Purpose | Key Concerns
Extract | Pull JSON from source systems | Pagination, rate limiting, authentication, retries
Transform | Reshape JSON into the target schema | Flattening, type coercion, deduplication, validation
Load | Write transformed data to destination | Idempotency, batch size, schema migrations, error handling

ETL vs. ELT: When to Transform

Approach | Transform Location | Best For | Example
ETL | Before loading | Relational databases, strict schemas | Flatten nested JSON, then INSERT into PostgreSQL
ELT | After loading | Data warehouses with strong SQL engines | Load raw JSON into BigQuery, transform with dbt

Modern Default

If your destination supports JSON natively (PostgreSQL JSONB, BigQuery, Snowflake), prefer ELT: load raw JSON first, then transform with SQL. This preserves the raw data for reprocessing and lets you iterate on transforms without re-extracting.

Extract: Fetching JSON from Sources

Paginated API Extraction

Extract all pages from a REST API (Python)

import json
import time

import requests


def extract_all_pages(base_url, headers=None, per_page=100):
    """Extract all records from a paginated API endpoint."""
    all_records = []
    page = 1

    while True:
        response = requests.get(
            base_url,
            headers=headers,
            params={"page": page, "per_page": per_page},
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()

        # Accept either a "results" or a "data" envelope key
        records = data.get("results", data.get("data", []))
        if not records:
            break  # an empty page marks the end of the result set

        all_records.extend(records)
        page += 1
        time.sleep(0.5)  # respect rate limits

    return all_records


# Save as NDJSON for streaming downstream
records = extract_all_pages("https://api.example.com/users")
with open("users.ndjson", "w", encoding="utf-8") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")
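The Key Concerns table lists retries for the extract stage, but the function above fails on the first network error. A minimal sketch of retrying with exponential backoff follows. The helper name `with_retries` and its parameters are hypothetical, and the delays are illustrative defaults rather than recommended values.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(
    fetch: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    retry_on: tuple = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fetch() until it succeeds or max_attempts is reached."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff (1x, 2x, 4x, ...) plus up to 10% jitter
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 10))
    raise ValueError("max_attempts must be at least 1")
```

One way to use it would be to wrap the `requests.get(...)` call in a lambda and pass `retry_on=(requests.ConnectionError, requests.Timeout)`, so that only transient failures are retried and HTTP errors raised by `raise_for_status()` still fail fast.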

NDJSON as Pipeline Format

NDJSON (one JSON object per line) is the ideal interchange format for pipelines because it supports streaming, appending, and parallel processing:

users.ndjson: one record per line (JSON)

{"id":1,"name":"Alice","email":"[email protected]","plan":"pro"}
{"id":2,"name":"Bob","email":"[email protected]","plan":"free"}
{"id":3,"name":"Carol","email":"[email protected]","plan":"pro"}
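The streaming property can be illustrated with a pair of generator-based helpers that handle one line at a time, so memory use stays flat regardless of file size. This is a sketch using only the standard library; the function names `read_ndjson` and `write_ndjson` are hypothetical.

```python
import json
from typing import IO, Iterable, Iterator


def read_ndjson(stream: IO[str]) -> Iterator[dict]:
    """Yield one parsed record per line without loading the whole file."""
    for line in stream:
        line = line.strip()
        if line:  # tolerate blank lines
            yield json.loads(line)


def write_ndjson(records: Iterable[dict], stream: IO[str]) -> int:
    """Write each record as one compact JSON line; return the count."""
    count = 0
    for record in records:
        stream.write(json.dumps(record, separators=(",", ":")) + "\n")
        count += 1
    return count
```

Because both functions work on any text stream, the same code can read from a file, standard input, or an in-memory buffer, and a filter such as `(r for r in read_ndjson(f) if r["plan"] == "pro")` can sit between them without buffering.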

Transform: Reshaping JSON

CLI Transforms with jq

Transform NDJSON with jq (bash)

# Flatten nested address, filter pro users, rename fields
jq -c '
  select(.plan == "pro") |
  {
    user_id: .id,
    full_name: .name,
    email: .email,
    city: .address.city,
    country: .address.country,
    signup_date: .created_at[:10]
  }
' users.ndjson > pro_users_flat.ndjson

Python Transforms with Polars

High-performance transforms with Polars (Python)

import polars as pl

# Read NDJSON directly into a DataFrame
df = pl.read_ndjson("users.ndjson")

# Transform: flatten, filter, compute new columns
result = (
    df
    .filter(pl.col("plan") == "pro")
    .with_columns([
        pl.col("address").struct.field("city").alias("city"),
        pl.col("address").struct.field("country").alias("country"),
        pl.col("created_at").str.slice(0, 10).alias("signup_date"),
    ])
    .select(["id", "name", "email", "city", "country", "signup_date"])
    .rename({"id": "user_id", "name": "full_name"})
)

# Write to Parquet for warehouse loading
result.write_parquet("pro_users.parquet")

# Or write back to NDJSON
result.write_ndjson("pro_users.ndjson")

Node.js Stream Transforms

Streaming transform for large files (JavaScript)

import { appendFileSync, createReadStream, createWriteStream } from 'fs';
import { createInterface } from 'readline';

// Top-level await requires an ES module (for example, a .mjs file)
const input = createReadStream('users.ndjson');
const output = createWriteStream('transformed.ndjson');
const rl = createInterface({ input, crlfDelay: Infinity });

let processed = 0;
let errors = 0;

for await (const line of rl) {
  if (!line.trim()) continue; // skip blank lines
  try {
    const record = JSON.parse(line);
    const transformed = {
      user_id: record.id,
      full_name: record.name,
      email: record.email,
      city: record.address?.city ?? null,
      signup_date: record.created_at?.slice(0, 10) ?? null,
    };
    output.write(JSON.stringify(transformed) + '\n');
    processed++;
  } catch (err) {
    errors++;
    // Write the raw line to the dead letter queue
    appendFileSync('dlq.ndjson', line + '\n');
  }
}

output.end(); // flush and close the output file
console.log(`Processed: ${processed}, Errors: ${errors}`);

Common Transform Patterns

Pattern | Input | Output | Tool
Flatten nested objects | {"user":{"name":"Alice"}} | {"user_name":"Alice"} | jq paths(scalars) with getpath (jq's flatten only handles arrays), JSON Flattener tool
Explode arrays to rows | {"tags":["a","b"]} | {"tag":"a"}, {"tag":"b"} | jq .tags[], pandas explode()
Type coercion | {"price":"19.99"} | {"price":19.99} | jq tonumber, Polars cast()
Deduplication | Duplicate records by ID | Unique records | jq unique_by(.id) with --slurp, Polars unique()
Key renaming | {"user_name":"Alice"} | {"name":"Alice"} | jq with_entries, Polars rename()
Null handling | {"city":null} | {"city":"Unknown"} | jq .city // "Unknown"
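For pipelines that do not use jq or a DataFrame library, most of the patterns in the table can be sketched in plain Python. The helper names below are hypothetical and the functions are deliberately simple: they handle the table's example inputs, not every edge case (for instance, `flatten` leaves lists untouched).

```python
from typing import Any, Iterable, Iterator


def flatten(obj: dict, sep: str = "_", prefix: str = "") -> dict:
    """Flatten nested dicts: {"user": {"name": "A"}} -> {"user_name": "A"}."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}{sep}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, sep, name))
        else:
            flat[name] = value
    return flat


def explode(record: dict, field: str, as_field: str) -> Iterator[dict]:
    """Emit one row per element of record[field], stored under as_field."""
    rest = {k: v for k, v in record.items() if k != field}
    for item in record.get(field) or []:
        yield {**rest, as_field: item}


def dedupe(records: Iterable[dict], key: str) -> Iterator[dict]:
    """Keep the first record seen for each key value, preserving order."""
    seen = set()
    for record in records:
        if record[key] not in seen:
            seen.add(record[key])
            yield record


def coerce_float(record: dict, field: str) -> dict:
    """Return a copy with record[field] converted to float."""
    return {**record, field: float(record[field])}


def fill_null(record: dict, field: str, default: Any) -> dict:
    """Return a copy with a None or missing field replaced by default."""
    value = record.get(field)
    return {**record, field: default if value is None else value}
```

Each helper returns new records instead of mutating its input, which keeps the original record available for a dead letter queue if a later step fails.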

Load: Writing to Destinations

PostgreSQL with JSONB

Load NDJSON into PostgreSQL (Python)

import json

import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect("postgresql://user:pass@localhost/mydb")
cur = conn.cursor()

# Option 1: Load into a typed table
with open("pro_users.ndjson") as f:
    rows = [json.loads(line) for line in f if line.strip()]

execute_values(
    cur,
    """INSERT INTO users (user_id, full_name, email, city, signup_date)
       VALUES %s ON CONFLICT (user_id) DO UPDATE
       SET full_name = EXCLUDED.full_name, email = EXCLUDED.email""",
    [(r["user_id"], r["full_name"], r["email"], r["city"], r["signup_date"])
     for r in rows],
)

# Option 2: Load raw JSON into a JSONB column (ELT pattern)
execute_values(
    cur,
    "INSERT INTO raw_events (payload) VALUES %s",
    [(json.dumps(r),) for r in rows],
    template="(%s::jsonb)",  # positional placeholder, because each row is a tuple
)

conn.commit()
cur.close()
conn.close()
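The example above reads every row into memory before loading. For larger files, batch size (one of the load-stage concerns listed earlier) can be controlled by grouping rows into fixed-size chunks. The sketch below uses a hypothetical helper named `batched`; the right chunk size depends on row width and the database, so any number used with it is an assumption to tune.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return  # input exhausted
        yield batch
```

Each chunk could then be passed to `execute_values` and committed separately, so a failure partway through only requires retrying the current chunk. Note that `execute_values` also accepts a `page_size` argument that controls how many rows go into each generated statement; chunking as shown here additionally bounds memory use and transaction size.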

Data Warehouse (BigQuery)

Load NDJSON into BigQuery (bash)

# BigQuery natively supports NDJSON
bq load \
  --source_format=NEWLINE_DELIMITED_JSON \
  --autodetect \
  mydataset.users \
  gs://mybucket/pro_users.ndjson

Error Handling: Dead Letter Queues

DLQ Pattern for Pipeline Failures

Never Crash on Bad Records

A single malformed record should never stop your entire pipeline. Catch parse errors per-record, write failures to a dead letter queue with the raw input and error message, and continue processing. Review the DLQ periodically to fix systematic issues.
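A minimal Python sketch of this pattern follows. It assumes line-oriented NDJSON input and a writable text stream as the dead letter queue; the function name `process_with_dlq` and the DLQ entry fields (`line`, `error`, `raw`) are hypothetical choices, not a standard format.

```python
import json
from typing import IO, Callable, Iterable, Iterator


def process_with_dlq(
    lines: Iterable[str],
    transform: Callable[[dict], dict],
    dlq: IO[str],
) -> Iterator[dict]:
    """Yield transformed records; route failures to the DLQ and continue."""
    for line_number, line in enumerate(lines, start=1):
        raw = line.rstrip("\n")
        if not raw.strip():
            continue  # ignore blank lines
        try:
            result = transform(json.loads(raw))
        except (ValueError, KeyError, TypeError) as err:
            # Keep the raw input and the error so the record can be replayed
            entry = {
                "line": line_number,
                "error": f"{type(err).__name__}: {err}",
                "raw": raw,
            }
            dlq.write(json.dumps(entry) + "\n")
            continue
        yield result
```

Catching `ValueError` covers `json.JSONDecodeError`, while `KeyError` and `TypeError` cover records that parse but do not have the expected shape. Other exception types are left uncaught on purpose, since they more likely indicate a bug in the pipeline than a bad record.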

Schema Drift Detection

Detect new or missing fields (Python)

import json

EXPECTED_FIELDS = {"user_id", "full_name", "email", "city", "signup_date"}

def check_schema_drift(record: dict) -> list[str]:
    warnings = []
    record_fields = set(record.keys())

    new_fields = record_fields - EXPECTED_FIELDS
    if new_fields:
        warnings.append(f"New fields detected: {new_fields}")

    missing_fields = EXPECTED_FIELDS - record_fields
    if missing_fields:
        warnings.append(f"Missing fields: {missing_fields}")

    return warnings

# Run against a sample of incoming records
with open("users.ndjson") as f:
    for i, line in enumerate(f):
        if i >= 100:  # sample first 100 records
            break
        record = json.loads(line)
        drift = check_schema_drift(record)
        if drift:
            print(f"Record {i}: {drift}")
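The check above compares field names only. Drift can also show up as a type change, such as an ID that switches from a number to a string. The sketch below extends the idea to types; the `EXPECTED_TYPES` mapping is an assumption about the target schema made for illustration, not something defined by the source data.

```python
# Assumed target schema: city and signup_date are allowed to be null
EXPECTED_TYPES = {
    "user_id": int,
    "full_name": str,
    "email": str,
    "city": (str, type(None)),
    "signup_date": (str, type(None)),
}


def check_type_drift(record: dict) -> list:
    """Return a warning for each present field whose type is unexpected."""
    warnings = []
    for field, expected in EXPECTED_TYPES.items():
        if field not in record:
            continue  # missing fields are reported by the key check
        value = record[field]
        # bool is a subclass of int in Python; no field here expects a bool
        if isinstance(value, bool) or not isinstance(value, expected):
            warnings.append(f"{field}: unexpected type {type(value).__name__}")
    return warnings
```

For anything beyond a handful of fields, a declarative validator such as JSON Schema is likely easier to maintain than a hand-written mapping.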

Pipeline Monitoring Checklist

  • Track records extracted, transformed, and loaded per run
  • Monitor DLQ size — alert if error rate exceeds threshold (e.g., >1%)
  • Log pipeline duration and set alerts for slowdowns
  • Validate row counts: source count should match destination count (minus DLQ)
  • Run schema drift checks on a sample of each batch
  • Use idempotent loads (upsert by primary key) so retries are safe
  • Never silently drop records — every input must be accounted for
  • Never hard-code API keys in pipeline scripts — use environment variables or secrets managers
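
Two of the checks above, row-count reconciliation and the DLQ error-rate alert, can be sketched as a single end-of-run function. The name `check_run` and its parameters are hypothetical, and the 1% default mirrors the example threshold in the checklist.

```python
def check_run(
    extracted: int,
    loaded: int,
    dead_lettered: int,
    max_error_rate: float = 0.01,
) -> list:
    """Return a list of problems found in one pipeline run (empty if none)."""
    problems = []

    # Every extracted record must be either loaded or in the DLQ
    unaccounted = extracted - loaded - dead_lettered
    if unaccounted != 0:
        problems.append(f"{unaccounted} records unaccounted for")

    # Alert when too large a share of the input ended up in the DLQ
    error_rate = dead_lettered / extracted if extracted else 0.0
    if error_rate > max_error_rate:
        problems.append(
            f"DLQ error rate {error_rate:.1%} exceeds {max_error_rate:.1%}"
        )

    return problems
```

If the transform step filters records intentionally (for example, keeping only pro users), those records would need their own counter so that they are not reported as unaccounted for.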

Frequently Asked Questions

What is a JSON ETL pipeline?
A JSON ETL pipeline is a data workflow that Extracts JSON from sources (APIs, files, message queues), Transforms it (flattening, filtering, renaming, joining), and Loads the result into a destination (database, warehouse, file). It automates the process of moving structured data between systems.

What is the difference between ETL and ELT?
In ETL, data is transformed before loading into the destination. In ELT, raw data is loaded first (often into a data lake or warehouse), then transformed using SQL or the destination's query engine. ELT is preferred when the destination has strong query capabilities (BigQuery, Snowflake) and you want to preserve raw data.

What is NDJSON and why is it used in pipelines?
NDJSON (Newline-Delimited JSON, also called JSON Lines) is a format where each line is a separate, valid JSON object. It is ideal for pipelines because it can be streamed line-by-line without loading the entire dataset into memory, supports append operations, and is trivially splittable for parallel processing.

How do I handle malformed JSON in a pipeline?
Use a dead letter queue (DLQ) pattern: attempt to parse each record, and route failures to a separate storage location with the raw input and error details. Process the DLQ separately for investigation. Never let one bad record crash the entire pipeline.

How do I detect schema drift in JSON pipelines?
Compare the set of keys and types in incoming records against a known schema. Tools like Great Expectations, dbt tests, or custom validation with JSON Schema can flag new fields, missing fields, or type changes. Alert on drift but do not necessarily reject records, since additive changes (new fields) are usually safe.

What tools are best for JSON ETL pipelines?
For CLI: jq for ad-hoc transforms. For Python: pandas or Polars for batch, aiohttp for async extraction. For Node.js: stream transforms with JSONStream. For SQL-based transforms: dbt. For managed pipelines: Airbyte, Fivetran, or AWS Glue. Choose based on team skills and scale.