Building Custom API Pipelines with dlt: From REST to BigQuery

Every data team eventually hits the same wall: an API that matters to the business, but no connector exists for it. Your options have traditionally been building everything from scratch or waiting months for your vendor to add support.

dlt (data load tool) fills this gap. It’s a Python library (pip-installable, no containers or orchestration required) that turns API data into warehouse tables. For custom APIs, dlt provides two approaches: RESTClient for granular control and REST API Source for speed. Knowing when to use each determines whether your pipeline takes hours or days to build.

Two Approaches to Building API Sources

dlt gives you an imperative option and a declarative option for API sources. They solve the same problem differently.

RESTClient is the lower-level approach. You write Python code that explicitly handles requests, processes responses, and yields data. It gives you fine-grained control over every aspect of the API interaction.

REST API Source is configuration-driven. You define a dictionary describing the API’s structure (endpoints, authentication, pagination), and dlt handles the rest. Less code, faster development, but less flexibility for edge cases.

Aspect           RESTClient                    REST API Source
Style            Imperative Python             Declarative config
Code volume      More                          Less
Flexibility      High                          Medium
Learning curve   Steeper                       Gentler
Best for         Complex auth, custom logic    Standard REST patterns

Choose REST API Source when the API follows common patterns: JSON responses, standard pagination, straightforward authentication. Choose RESTClient when you need custom pagination logic, complex authentication flows, or fine-grained control over request/response handling.

Building with RESTClient

RESTClient wraps Python’s requests library with pagination and authentication handling built in. You instantiate it with configuration, then call paginate() to iterate through pages automatically.

import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import OffsetPaginator

client = RESTClient(
    base_url="https://api.example.com/v1",
    headers={"X-API-Version": "2024-01"},
    paginator=OffsetPaginator(limit=100)
)

@dlt.resource(write_disposition="merge", primary_key="id")
def customers():
    for page in client.paginate("/customers"):
        yield page

The key parameters:

  • base_url: API root URL, shared across all endpoints
  • headers: Default headers sent with every request
  • auth: Authentication strategy (covered below)
  • paginator: How to handle multi-page responses
  • data_selector: JSONPath to the actual data in responses

The paginate() method does the heavy lifting. Pass it an endpoint path, and it yields pages until the API signals there’s no more data. Each page contains the parsed JSON response.
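
For example, if the API wraps each page in an envelope (an assumed shape of {"results": [...], "total": 1234} here), data_selector picks the record list out of the response, and params are forwarded as query parameters on every request:

# Assumed envelope: {"results": [...], "total": 1234}
@dlt.resource(write_disposition="merge", primary_key="id")
def active_customers():
    for page in client.paginate(
        "/customers",
        params={"status": "active"},
        data_selector="results",  # JSONPath to the records inside each response
    ):
        yield page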

Mastering Pagination

Production APIs paginate their responses. Fetching the first page is easy; handling the next 500 pages reliably is where pipelines break.

dlt includes paginators for common patterns:

  • JSONLinkPaginator: Next page URL in the JSON body (common in modern APIs)
  • HeaderLinkPaginator: Next page URL in response headers (GitHub style)
  • OffsetPaginator: Classic offset/limit query parameters
  • PageNumberPaginator: Page number increments (page=1, page=2, …)
  • JSONResponseCursorPaginator: Cursor token in the response body

from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator

# For APIs that return {"data": [...], "next": "https://api.example.com/v1/items?cursor=abc123"}
client = RESTClient(
    base_url="https://api.example.com/v1",
    paginator=JSONLinkPaginator(next_url_path="next")
)

When APIs don’t follow standard patterns, extend BasePaginator and implement two methods: update_state() to parse the current response and update_request() to modify the next request. I’ve built custom paginators for APIs that encode pagination state in response headers using proprietary formats. dlt’s architecture handles this without fighting the framework.
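
As a rough sketch of that pattern, here is a paginator for a hypothetical API that returns the next cursor in an X-Next-Cursor response header; the header and query parameter names are assumptions, not a real API:

from typing import Any, List, Optional
from requests import Request, Response
from dlt.sources.helpers.rest_client.paginators import BasePaginator

class HeaderCursorPaginator(BasePaginator):
    def __init__(self):
        super().__init__()
        self.cursor = None

    def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
        # Parse the current response: a missing header means there are no more pages
        self.cursor = response.headers.get("X-Next-Cursor")
        if not self.cursor:
            self._has_next_page = False

    def update_request(self, request: Request) -> None:
        # Modify the next request: send the cursor back as a query parameter
        if self.cursor:
            if request.params is None:
                request.params = {}
            request.params["cursor"] = self.cursor

Pass an instance as paginator=HeaderCursorPaginator() when you construct the RESTClient, and paginate() drives it the same way as the built-in paginators.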

Authentication Patterns

dlt supports the authentication methods you’ll encounter in practice:

import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import (
    BearerTokenAuth,
    APIKeyAuth,
    HttpBasicAuth,
    OAuth2ClientCredentials,
)

# Bearer token (most common for modern APIs)
client = RESTClient(
    base_url="https://api.example.com",
    auth=BearerTokenAuth(token=dlt.secrets.value)
)

# API key in header
client = RESTClient(
    base_url="https://api.example.com",
    auth=APIKeyAuth(name="X-API-Key", api_key=dlt.secrets.value, location="header")
)

# OAuth2 client credentials flow
client = RESTClient(
    base_url="https://api.example.com",
    auth=OAuth2ClientCredentials(
        access_token_url="https://api.example.com/oauth/token",
        client_id=dlt.secrets["client_id"],
        client_secret=dlt.secrets["client_secret"]
    )
)

For APIs with non-standard OAuth flows (refresh token rotation, custom grant types), extend these base classes. Implement the methods that differ from standard OAuth and inherit everything else.
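
dlt’s auth classes follow the standard requests auth interface: an object that is called on every outgoing request. A non-standard flow can therefore be sketched roughly as below; the refresh endpoint and field names are assumptions for illustration, not a real API:

import requests
from requests.auth import AuthBase

class RotatingTokenAuth(AuthBase):
    """Sketch of an auth class for a hypothetical refresh-token-rotation flow."""

    def __init__(self, refresh_url: str, refresh_token: str):
        self.refresh_url = refresh_url
        self.refresh_token = refresh_token
        self.access_token = None

    def _refresh(self) -> None:
        # Exchange the refresh token for a new access token (and a rotated refresh token)
        resp = requests.post(self.refresh_url, json={"refresh_token": self.refresh_token})
        resp.raise_for_status()
        payload = resp.json()
        self.access_token = payload["access_token"]
        self.refresh_token = payload.get("refresh_token", self.refresh_token)

    def __call__(self, request):
        # Called by requests for every outgoing request
        if self.access_token is None:
            self._refresh()
        request.headers["Authorization"] = f"Bearer {self.access_token}"
        return request

Pass an instance as auth= when constructing the RESTClient; since dlt hands the auth object through to requests, anything implementing this interface should work.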

The Declarative Approach: REST API Source

For standard APIs, REST API Source eliminates boilerplate. You describe the API’s structure in a configuration dictionary, and dlt generates the pipeline.

import dlt
from dlt.sources.rest_api import rest_api_source

config = {
    "client": {
        "base_url": "https://api.example.com/v1",
        "auth": {
            "type": "bearer",
            "token": dlt.secrets["api_token"],
        },
    },
    "resource_defaults": {
        "primary_key": "id",
        "write_disposition": "merge",
    },
    "resources": [
        {
            "name": "customers",
            "endpoint": {
                "path": "customers",
                "params": {"status": "active"},
            },
        },
        {
            "name": "orders",
            "endpoint": {
                "path": "orders",
                "paginator": {
                    "type": "offset",
                    "limit": 100,
                },
            },
        },
    ],
}

source = rest_api_source(config)
pipeline = dlt.pipeline(destination="bigquery", dataset_name="api_data")
pipeline.run(source)

The configuration structure has three parts:

  • client: Base URL, authentication, default headers
  • resource_defaults: Shared settings for all resources (primary key, write disposition)
  • resources: List of endpoints with their specific configuration

REST API Source handles pagination and authentication automatically based on your configuration. As with any dlt source, nested JSON is unnested into relational child tables during normalization, so you get flat, queryable tables without extra code. For a step-by-step walkthrough of the REST API Source, see the hands-on dlt guide.

Incremental Loading

Full table refreshes don’t scale. Once you’re past a few thousand records, you need incremental loading: fetch only what changed since the last run.

dlt’s incremental() function tracks cursor values between runs:

@dlt.resource(write_disposition="merge", primary_key="id")
def orders(updated_since=dlt.sources.incremental("updated_at", initial_value="2024-01-01")):
    params = {"updated_after": updated_since.last_value}
    for page in client.paginate("/orders", params=params):
        yield page

On first run, dlt fetches records updated after the initial value. On subsequent runs, it uses the maximum updated_at value from the previous run. State is stored automatically, with no external database required.

Key parameters to understand (the sketch after this list puts several of them together):

  • cursor_path: JSONPath to the cursor field in each record
  • initial_value: Starting point for first run
  • last_value_func: Function to determine which cursor value to keep (default: max)
  • on_cursor_value_missing: What to do when a record lacks the cursor field (raise, include, or exclude)
  • lag: Seconds to subtract from cursor for attribution windows (handles late-arriving data)
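
A sketch combining several of these parameters, using an assumed /events endpoint with an occurred_at timestamp cursor:

@dlt.resource(write_disposition="merge", primary_key="id")
def events(
    cursor=dlt.sources.incremental(
        "occurred_at",                      # cursor_path
        initial_value="2024-01-01T00:00:00Z",
        lag=3600,                           # re-read the last hour to catch late-arriving data
        on_cursor_value_missing="exclude",  # drop records that lack occurred_at
    )
):
    params = {"occurred_after": cursor.last_value}
    for page in client.paginate("/events", params=params):
        yield page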

For REST API Source, configure incremental loading in the endpoint definition:

{
    "name": "orders",
    "endpoint": {
        "path": "orders",
        "incremental": {
            "cursor_path": "updated_at",
            "initial_value": "2024-01-01T00:00:00Z"
        }
    }
}

Error Handling and Resilience

APIs fail, rate limits hit, and networks hiccup, but dlt handles common failure modes automatically.

For HTTP 429 (rate limit) responses, dlt respects Retry-After headers and implements exponential backoff. By default, the underlying requests helper retries up to 5 times; the retry behavior is tuned through dlt’s runtime settings:

config.toml
[runtime]
request_max_attempts = 5         # Attempts before giving up
request_backoff_factor = 2       # Exponential backoff multiplier
request_max_retry_delay = 300    # Max seconds between retries

REST API Source lets you configure behavior for specific HTTP status codes:

{
    "endpoint": {
        "path": "items",
        "response_actions": [
            {"status_code": 404, "action": "ignore"},  # Skip missing resources
            {"status_code": 429, "action": "retry"}    # Retry rate limits
        ]
    }
}

Secrets Management

Hard-coding credentials is a security incident waiting to happen. dlt provides a configuration hierarchy that keeps secrets out of code.

Priority order (highest to lowest):

  1. Environment variables
  2. secrets.toml
  3. config.toml
  4. Vault integrations
  5. Default argument values

Environment variable naming uses double underscores as separators:

export SOURCES__MY_API__API_KEY="your-secret-key"
export DESTINATION__BIGQUERY__PROJECT_ID="your-project"

For local development, use secrets.toml:

[sources.my_api]
api_key = "your-secret-key"

[destination.bigquery]
project_id = "your-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."

Never commit secrets.toml to version control. In CI/CD pipelines, use environment variables or integrate with cloud secret managers like Google Cloud Secret Manager through dlt’s vault configuration.

Testing Your Pipelines

Test pipelines before they hit production. A testing strategy is as essential for dlt pipelines as it is for dbt projects, and dlt’s design makes pipeline testing straightforward.

For unit tests, run the pipeline against DuckDB instead of your production warehouse:

import dlt
import pytest

def test_customers_pipeline():
    pipeline = dlt.pipeline(
        destination="duckdb",
        dataset_name="test_data"
    )
    # Run with limited data
    source = my_api_source()
    source.customers.add_limit(10)  # Only fetch 10 records
    load_info = pipeline.run(source)
    # Query results
    with pipeline.sql_client() as client:
        result = client.execute_sql("SELECT COUNT(*) FROM customers")
        assert result[0][0] == 10

For integration tests, extract and normalize without loading:

def test_schema_structure():
    pipeline = dlt.pipeline(destination="duckdb", dataset_name="test")
    source = my_api_source()
    pipeline.extract(source)
    pipeline.normalize()
    schema = pipeline.default_schema
    assert "customers" in schema.tables
    assert "id" in schema.tables["customers"]["columns"]

Deployment Options

dlt runs anywhere Python runs. The dlt deploy command generates deployment configurations for common platforms.

GitHub Actions:

dlt deploy my_pipeline.py github-action --schedule "0 6 * * *"

This creates a workflow file that installs dependencies, configures secrets from GitHub repository secrets, and runs your pipeline on schedule.

Airflow/Google Composer:

dlt deploy my_pipeline.py airflow-composer --secrets-format env

The generated DAG uses dlt’s PipelineTasksGroup helper to create separate tasks per resource when you need parallelism.
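
The generated file differs across dlt versions, but the DAG body looks roughly like the sketch below; the pipeline name, source, and schedule are placeholders:

import dlt
import pendulum
from airflow.decorators import dag
from dlt.helpers.airflow_helper import PipelineTasksGroup

@dag(schedule="0 6 * * *", start_date=pendulum.datetime(2024, 1, 1), catchup=False, max_active_runs=1)
def load_api_data():
    # Wraps the pipeline run in an Airflow task group and cleans up local files afterwards
    tasks = PipelineTasksGroup("api_pipeline", use_data_folder=False, wipe_local_data=True)

    pipeline = dlt.pipeline(
        pipeline_name="api_pipeline",
        destination="bigquery",
        dataset_name="api_data",
    )
    # decompose="parallel" creates a task per resource; "serialize" chains them one by one
    tasks.add_run(pipeline, my_api_source(), decompose="parallel", trigger_rule="all_done")

load_api_data()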

Modal works well for serverless deployments:

import modal
import dlt

app = modal.App("my-dlt-pipeline")

@app.function(schedule=modal.Period(days=1))
def run_pipeline():
    pipeline = dlt.pipeline(destination="bigquery", dataset_name="api_data")
    pipeline.run(my_api_source())

Other options include Google Cloud Functions, Cloud Run, Dagster (which has native @dlt_assets integration), and Prefect.

Common Pitfalls and Debugging

Five issues I see repeatedly:

  1. Not testing locally first: Always run python my_pipeline.py before deploying. Most configuration errors surface immediately.

  2. Missing secrets configuration: dlt error messages tell you exactly which key is missing. Check the expected environment variable name.

  3. Incorrect pagination setup: Test with a small limit first. If you get only one page when you expect many, your paginator configuration is wrong.

  4. Schema conflicts with incremental loading: Use unique pipeline names when testing variations. State is stored per pipeline name.

  5. Memory issues with large datasets: Yield pages as you receive them instead of accumulating in memory. Generators are your friend (see the sketch below).
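
For pitfall 5, the fix is usually structural: keep the resource as a generator so each page is handed to dlt as it arrives (the /events endpoint here is just a placeholder):

@dlt.resource
def events():
    # Good: stream pages one at a time; dlt buffers and writes them incrementally
    for page in client.paginate("/events"):
        yield page

# Avoid: materializing every page in memory before yielding
# all_rows = [row for page in client.paginate("/events") for row in page]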

For debugging, enable detailed logging:

config.toml
[runtime]
log_level = "INFO"

Set progress="log" on your pipeline to emit progress updates to the logs, and run dlt pipeline <pipeline_name> info to inspect load packages and state.
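
A minimal sketch of both, assuming the pipeline is named api_data:

import dlt

# progress="log" writes periodic progress summaries to the logs
pipeline = dlt.pipeline(
    pipeline_name="api_data",
    destination="bigquery",
    dataset_name="api_data",
    progress="log",
)

# Then inspect state and load packages from the command line:
#   dlt pipeline api_data info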

Wrapping Up

dlt gives you two paths to custom API pipelines. REST API Source gets you to production fast when APIs follow standard patterns. RESTClient provides the control you need when they don’t.

The library handles the infrastructure concerns (pagination, retries, schema evolution, incremental state) so you can focus on the API-specific logic. It’s the middle ground between expensive managed connectors and building everything yourself. Once your data lands in BigQuery, you can layer dbt transformations on top to build your analytics models.

For most new integrations, REST API Source is the faster path. RESTClient is there when you need it. Either way, you end up with a Python script you can version control, test, and deploy anywhere.