Every data team eventually hits the same wall: an API that matters to the business, but no connector exists for it. Your options have traditionally been building everything from scratch or waiting months for your vendor to add support.
dlt (data load tool) fills this gap. It’s a Python library (pip-installable, no containers or orchestration required) that turns API data into warehouse tables. For custom APIs, dlt provides two approaches: RESTClient for granular control and REST API Source for speed. Knowing when to use each determines whether your pipeline takes hours or days to build.
Two Approaches to Building API Sources
dlt gives you an imperative option and a declarative option for API sources. They solve the same problem differently.
RESTClient is the lower-level approach. You write Python code that explicitly handles requests, processes responses, and yields data. It gives you fine-grained control over every aspect of the API interaction.
REST API Source is configuration-driven. You define a dictionary describing the API’s structure (endpoints, authentication, pagination), and dlt handles the rest. Less code, faster development, but less flexibility for edge cases.
| Aspect | RESTClient | REST API Source |
|---|---|---|
| Style | Imperative Python | Declarative config |
| Code volume | More | Less |
| Flexibility | High | Medium |
| Learning curve | Steeper | Gentler |
| Best for | Complex auth, custom logic | Standard REST patterns |
Choose REST API Source when the API follows common patterns: JSON responses, standard pagination, straightforward authentication. Choose RESTClient when you need custom pagination logic, complex authentication flows, or fine-grained control over request/response handling.
Building with RESTClient
RESTClient wraps Python’s requests library with pagination and authentication handling built in. You instantiate it with configuration, then call paginate() to iterate through pages automatically.
```python
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import OffsetPaginator

client = RESTClient(
    base_url="https://api.example.com/v1",
    headers={"X-API-Version": "2024-01"},
    paginator=OffsetPaginator(limit=100)
)

@dlt.resource(write_disposition="merge", primary_key="id")
def customers():
    for page in client.paginate("/customers"):
        yield page
```

The key parameters:

- base_url: API root URL, shared across all endpoints
- headers: Default headers sent with every request
- auth: Authentication strategy (covered below)
- paginator: How to handle multi-page responses
- data_selector: JSONPath to the actual data in responses
The paginate() method does the heavy lifting. Pass it an endpoint path, and it yields pages until the API signals there’s no more data. Each page contains the parsed JSON response.
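To illustrate data_selector, here is a minimal sketch (reusing the RESTClient import from above) for a hypothetical API that wraps its records in a result.items envelope; the response shape is an assumption, not part of dlt:

```python
# Hypothetical response shape: {"result": {"items": [...], "total": 1234}}
client = RESTClient(
    base_url="https://api.example.com/v1",
    data_selector="result.items",  # JSONPath pointing paginate() at the records in each page
)
```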
Mastering Pagination
Production APIs paginate their responses. Fetching the first page is easy; handling the next 500 pages reliably is where pipelines break.
dlt includes paginators for common patterns:
- JSONLinkPaginator: Next page URL in the JSON body (common in modern APIs)
- HeaderLinkPaginator: Next page URL in response headers (GitHub style)
- OffsetPaginator: Classic offset/limit query parameters
- PageNumberPaginator: Page number increments (page=1, page=2, …)
- JSONResponseCursorPaginator: Cursor token in the response body
```python
from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator

# For APIs that return {"data": [...], "next": "https://api.example.com/v1/items?cursor=abc123"}
client = RESTClient(
    base_url="https://api.example.com/v1",
    paginator=JSONLinkPaginator(next_url_path="next")
)
```

When APIs don’t follow standard patterns, extend BasePaginator and implement two methods: update_state() to parse the current response and update_request() to modify the next request. I’ve built custom paginators for APIs that encode pagination state in response headers using proprietary formats. dlt’s architecture handles this without fighting the framework.
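As a sketch of that pattern, here is a paginator for a hypothetical API that returns the next cursor in an X-Next-Cursor response header; the header and parameter names are assumptions, but the two overridden methods are the ones BasePaginator expects:

```python
from dlt.sources.helpers.rest_client.paginators import BasePaginator


class HeaderCursorPaginator(BasePaginator):
    """Hypothetical paginator: the API sends the next cursor in an
    X-Next-Cursor header and expects it back as a query parameter."""

    def __init__(self, cursor_param="cursor"):
        super().__init__()
        self.cursor_param = cursor_param
        self._next_cursor = None

    def update_state(self, response, data=None):
        # Parse the current response; an empty header means this was the last page
        self._next_cursor = response.headers.get("X-Next-Cursor")
        if not self._next_cursor:
            self._has_next_page = False

    def update_request(self, request):
        # Attach the cursor to the next request's query string
        request.params = request.params or {}
        request.params[self.cursor_param] = self._next_cursor
```

You then pass paginator=HeaderCursorPaginator() to RESTClient exactly like the built-in paginators.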
Authentication Patterns
dlt supports the authentication methods you’ll encounter in practice:
```python
from dlt.sources.helpers.rest_client.auth import (
    BearerTokenAuth,
    ApiKeyAuth,
    HttpBasicAuth,
    OAuth2ClientCredentials
)

# Bearer token (most common for modern APIs)
client = RESTClient(
    base_url="https://api.example.com",
    auth=BearerTokenAuth(token=dlt.secrets.value)
)

# API key in header
client = RESTClient(
    base_url="https://api.example.com",
    auth=ApiKeyAuth(name="X-API-Key", api_key=dlt.secrets.value, location="header")
)

# OAuth2 client credentials flow
client = RESTClient(
    base_url="https://api.example.com",
    auth=OAuth2ClientCredentials(
        access_token_url="https://api.example.com/oauth/token",
        client_id=dlt.secrets["client_id"],
        client_secret=dlt.secrets["client_secret"]
    )
)
```

For APIs with non-standard OAuth flows (refresh token rotation, custom grant types), extend these base classes. Implement the methods that differ from standard OAuth and inherit everything else.
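For example, some token endpoints want the client id and secret as an HTTP Basic header instead of form fields. A minimal sketch, assuming a recent dlt version where OAuth2ClientCredentials exposes the build_access_token_request() hook:

```python
from base64 import b64encode
from typing import Any, Dict

from dlt.sources.helpers.rest_client.auth import OAuth2ClientCredentials


class OAuth2BasicClientCredentials(OAuth2ClientCredentials):
    """Sends client_id/client_secret as a Basic auth header when requesting the token."""

    def build_access_token_request(self) -> Dict[str, Any]:
        # Only the token request changes; posting to access_token_url and
        # caching/refreshing the token are inherited from the parent class.
        basic = b64encode(f"{self.client_id}:{self.client_secret}".encode()).decode()
        return {
            "headers": {
                "Authorization": f"Basic {basic}",
                "Content-Type": "application/x-www-form-urlencoded",
            },
            "data": self.access_token_request_data,
        }
```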
The Declarative Approach: REST API Source
For standard APIs, REST API Source eliminates boilerplate. You describe the API’s structure in a configuration dictionary, and dlt generates the pipeline.
```python
import dlt
from dlt.sources.rest_api import rest_api_source

config = {
    "client": {
        "base_url": "https://api.example.com/v1",
        "auth": {
            "type": "bearer",
            "token": dlt.secrets["api_token"]
        }
    },
    "resource_defaults": {
        "primary_key": "id",
        "write_disposition": "merge"
    },
    "resources": [
        {
            "name": "customers",
            "endpoint": {
                "path": "customers",
                "params": {"status": "active"}
            }
        },
        {
            "name": "orders",
            "endpoint": {
                "path": "orders",
                "paginator": {
                    "type": "offset",
                    "limit": 100
                }
            }
        }
    ]
}

source = rest_api_source(config)
pipeline = dlt.pipeline(destination="bigquery", dataset_name="api_data")
pipeline.run(source)
```

The configuration structure has three parts:
- client: Base URL, authentication, default headers
- resource_defaults: Shared settings for all resources (primary key, write disposition)
- resources: List of endpoints with their specific configuration
REST API Source handles pagination and authentication automatically based on your configuration, and dlt’s normalization step unnests nested JSON into relational child tables, just as it does for RESTClient-based resources. For a step-by-step walkthrough of the REST API Source, see the hands-on dlt guide.
Incremental Loading
Full table refreshes don’t scale. Once you’re past a few thousand records, you need incremental loading: fetch only what changed since the last run.
dlt’s incremental() function tracks cursor values between runs:
```python
@dlt.resource(write_disposition="merge", primary_key="id")
def orders(updated_since=dlt.sources.incremental("updated_at", initial_value="2024-01-01")):
    params = {"updated_after": updated_since.last_value}
    for page in client.paginate("/orders", params=params):
        yield page
```

On first run, dlt fetches records updated after the initial value. On subsequent runs, it uses the maximum updated_at value from the previous run. State is stored automatically, with no external database required.
Key parameters to understand:
- cursor_path: JSONPath to the cursor field in each record
- initial_value: Starting point for the first run
- last_value_func: Function to determine which cursor value to keep (default: max)
- on_cursor_value_missing: What to do when a record lacks the cursor field (raise, include, or exclude)
- lag: Seconds to subtract from the cursor for attribution windows (handles late-arriving data)
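To see a couple of these knobs together, here is a minimal sketch that reuses the client from earlier; the /events endpoint and occurred_at field are assumptions, while lag and on_cursor_value_missing are the parameters listed above:

```python
@dlt.resource(write_disposition="merge", primary_key="id")
def events(
    cursor=dlt.sources.incremental(
        "occurred_at",                      # cursor_path into each record
        initial_value="2024-01-01T00:00:00Z",
        lag=3600,                           # re-fetch the last hour to catch late-arriving events
        on_cursor_value_missing="exclude",  # drop records that lack occurred_at
    )
):
    for page in client.paginate("/events", params={"since": cursor.last_value}):
        yield page
```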
For REST API Source, configure incremental loading in the endpoint definition:
{ "name": "orders", "endpoint": { "path": "orders", "incremental": { "cursor_path": "updated_at", "initial_value": "2024-01-01T00:00:00Z" } }}Error Handling and Resilience
APIs fail, rate limits hit, and networks hiccup, but dlt handles common failure modes automatically.
For HTTP 429 (rate limit) responses, dlt respects Retry-After headers and implements exponential backoff. By default it retries up to 5 times, and the delays are configurable through dlt’s runtime settings (in config.toml or the matching environment variables):

```toml
[runtime]
request_backoff_factor = 2     # Exponential backoff multiplier
request_max_retry_delay = 300  # Max seconds between retries
```

REST API Source lets you configure behavior for specific HTTP status codes:
{ "endpoint": { "path": "items", "response_actions": [ {"status_code": 404, "action": "ignore"}, # Skip missing resources {"status_code": 429, "action": "retry"} # Retry rate limits ] }}Secrets Management
Hard-coding credentials is a security incident waiting to happen. dlt provides a configuration hierarchy that keeps secrets out of code.
Priority order (highest to lowest):
- Environment variables
- secrets.toml
- config.toml
- Vault integrations
- Default argument values
Environment variable naming uses double underscores as separators:
```bash
export SOURCES__MY_API__API_KEY="your-secret-key"
export DESTINATION__BIGQUERY__CREDENTIALS__PROJECT_ID="your-project"
```

For local development, use secrets.toml:
```toml
[sources.my_api]
api_key = "your-secret-key"

[destination.bigquery.credentials]
project_id = "your-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
```

Never commit secrets.toml to version control. In CI/CD pipelines, use environment variables or integrate with cloud secret managers like Google Cloud Secret Manager through dlt’s vault configuration.
Testing Your Pipelines
Test pipelines before they hit production. Just as a testing strategy is essential for dbt projects, dlt’s design makes pipeline testing straightforward.
For unit tests, run the pipeline against DuckDB instead of your production warehouse:
```python
import dlt
import pytest


def test_customers_pipeline():
    pipeline = dlt.pipeline(
        destination="duckdb",
        dataset_name="test_data"
    )

    # Run with limited data
    source = my_api_source()
    source.customers.add_limit(10)  # Only fetch 10 records

    load_info = pipeline.run(source)

    # Query results
    with pipeline.sql_client() as client:
        result = client.execute_sql("SELECT COUNT(*) FROM customers")
        assert result[0][0] == 10
```

For integration tests, extract and normalize without loading:
```python
def test_schema_structure():
    pipeline = dlt.pipeline(destination="duckdb", dataset_name="test")
    source = my_api_source()

    pipeline.extract(source)
    pipeline.normalize()

    schema = pipeline.default_schema
    assert "customers" in schema.tables
    assert "id" in schema.tables["customers"]["columns"]
```

Deployment Options
dlt runs anywhere Python runs. The dlt deploy command generates deployment configurations for common platforms.
GitHub Actions:
```bash
dlt deploy my_pipeline.py github-action --schedule "0 6 * * *"
```

This creates a workflow file that installs dependencies, configures secrets from GitHub repository secrets, and runs your pipeline on schedule.
Airflow/Google Composer:
```bash
dlt deploy my_pipeline.py airflow-composer --secrets-format env
```

The generated DAG uses dlt’s PipelineTasksGroup helper to create separate tasks per resource when you need parallelism.
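The generated file is longer, but the core of such a DAG looks roughly like this; a sketch assuming Airflow 2.x TaskFlow decorators and dlt’s airflow_helper, with hypothetical pipeline, module, and source names:

```python
import dlt
import pendulum
from airflow.decorators import dag
from dlt.helpers.airflow_helper import PipelineTasksGroup

from my_pipeline import my_api_source  # hypothetical module and source names


@dag(schedule="0 6 * * *", start_date=pendulum.datetime(2024, 1, 1), catchup=False, max_active_runs=1)
def load_api_data():
    # Wrap the dlt run in an Airflow task group
    tasks = PipelineTasksGroup("my_api_pipeline", use_data_folder=False, wipe_local_data=True)

    pipeline = dlt.pipeline(
        pipeline_name="my_api_pipeline",
        destination="bigquery",
        dataset_name="api_data",
    )
    # decompose="serialize" turns each dlt resource into its own task, run in order;
    # "parallel-isolated" runs resources as independent parallel tasks
    tasks.add_run(pipeline, my_api_source(), decompose="serialize", trigger_rule="all_done", retries=0)


load_api_data()
```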
Modal works well for serverless deployments:
```python
import modal
import dlt

app = modal.App("my-dlt-pipeline")


@app.function(schedule=modal.Period(days=1))
def run_pipeline():
    pipeline = dlt.pipeline(destination="bigquery", dataset_name="api_data")
    pipeline.run(my_api_source())
```

Other options include Google Cloud Functions, Cloud Run, Dagster (which has native @dlt_assets integration), and Prefect.
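For reference, the Dagster integration looks roughly like this; a sketch assuming the dagster-dlt package and hypothetical pipeline, module, and source names, so check the current Dagster docs for the exact import path:

```python
import dlt
from dagster import AssetExecutionContext
from dagster_dlt import DagsterDltResource, dlt_assets

from my_pipeline import my_api_source  # hypothetical module and source names


@dlt_assets(
    dlt_source=my_api_source(),
    dlt_pipeline=dlt.pipeline(
        pipeline_name="my_api_pipeline",
        destination="bigquery",
        dataset_name="api_data",
    ),
)
def api_assets(context: AssetExecutionContext, dlt: DagsterDltResource):
    # Each dlt resource becomes a Dagster asset; the DagsterDltResource
    # (registered under the "dlt" key in Definitions) runs the pipeline.
    yield from dlt.run(context=context)
```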
Common Pitfalls and Debugging
Five issues I see repeatedly:
1. Not testing locally first: Always run python my_pipeline.py before deploying. Most configuration errors surface immediately.
2. Missing secrets configuration: dlt error messages tell you exactly which key is missing. Check the expected environment variable name.
3. Incorrect pagination setup: Test with a small limit first. If you get only one page when you expect many, your paginator configuration is wrong.
4. Schema conflicts with incremental loading: Use unique pipeline names when testing variations. State is stored per pipeline name.
5. Memory issues with large datasets: Yield pages as you receive them instead of accumulating in memory. Generators are your friend (see the sketch after this list).
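To make that last point concrete, here is a minimal sketch contrasting the two shapes; the /events endpoint is a stand-in and the client is the RESTClient from earlier:

```python
@dlt.resource
def events():
    # Anti-pattern: build one giant list in memory, then yield it all at once
    # all_rows = []
    # for page in client.paginate("/events"):
    #     all_rows.extend(page)
    # yield all_rows

    # Streaming: each page is handed to dlt's buffers as soon as it arrives
    for page in client.paginate("/events"):
        yield page
```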
For debugging, enable detailed logging:
log_level = "INFO"Use progress="log" for progress bars in logs, and dlt pipeline info to inspect load packages and state.
Wrapping Up
dlt gives you two paths to custom API pipelines. REST API Source gets you to production fast when APIs follow standard patterns. RESTClient provides the control you need when they don’t.
The library handles the infrastructure concerns (pagination, retries, schema evolution, incremental state) so you can focus on the API-specific logic. It’s the middle ground between expensive managed connectors and building everything yourself. Once your data lands in BigQuery, you can layer dbt transformations on top to build your analytics models.
For most new integrations, REST API Source is the faster path. RESTClient is there when you need it. Either way, you end up with a Python script you can version control, test, and deploy anywhere.