Run python my_pipeline.py before deploying. Most configuration errors — wrong endpoint path, mismatched paginator, missing secret — surface immediately on a local run. dlt’s design allows local testing without a production BigQuery project, a running Airflow instance, or cloud credentials.
Unit Tests with DuckDB
Run pipelines against DuckDB instead of your production warehouse. DuckDB is pip-installable, runs entirely in-process, and stores everything locally. Your pipeline code doesn’t change — only the destination:
    import dlt
    import pytest

    def test_customers_pipeline():
        pipeline = dlt.pipeline(
            destination="duckdb",
            dataset_name="test_data",
        )

        # Limit records to avoid hitting the real API for every test run.
        # my_api_source() is your own source function, imported from your pipeline module.
        source = my_api_source()
        source.customers.add_limit(10)

        load_info = pipeline.run(source)

        # Query results directly
        with pipeline.sql_client() as client:
            result = client.execute_sql("SELECT COUNT(*) FROM customers")
            assert result[0][0] == 10

The add_limit() method caps how many items a resource yields, which is essential for tests that call real APIs. The limit counts yields, so the assertion above holds only if the resource yields one record at a time; a resource that yields whole pages stops after 10 pages instead. Without a limit, your test suite will slowly page through your entire production dataset, hit rate limits, and run for minutes or hours.
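The difference between limiting yields and limiting records is easy to miss, so here is a plain-Python analogy. It does not use dlt at all: itertools.islice stands in for a yield-counting limit, and both generator functions are hypothetical stand-ins for resources.

```python
from itertools import islice


def paged_customers(total=50, page_size=10):
    """Hypothetical resource that yields one list (page) of records per iteration."""
    for start in range(0, total, page_size):
        yield [{"id": i} for i in range(start, min(start + page_size, total))]


def single_customers(total=50):
    """Hypothetical resource that yields one record per iteration."""
    for i in range(total):
        yield {"id": i}


# islice stops after N yields, regardless of how many rows each yield carries.
limited_pages = list(islice(paged_customers(), 2))
limited_records = list(islice(single_customers(), 2))

rows_from_pages = sum(len(page) for page in limited_pages)
rows_from_records = len(limited_records)
```

With the same limit of 2, the page-yielding generator produces 20 rows and the record-yielding one produces 2. If a row-count assertion in a test is off by a constant factor, checking what the resource yields per iteration is a reasonable first step.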
Use DuckDB as the destination for all unit tests. It’s fast (sub-second for small datasets), requires no credentials, and leaves no cloud costs behind. The pipeline state, schema, and data all live in a local .duckdb file that’s easy to inspect and delete.
Integration Tests for Schema Validation
Beyond verifying record counts, test that the schema matches what your downstream dbt models expect. dlt’s extract() and normalize() steps produce a schema that you can inspect without actually loading data to a destination:
    def test_schema_structure():
        pipeline = dlt.pipeline(destination="duckdb", dataset_name="test")
        source = my_api_source()

        # Extract and normalize without loading
        pipeline.extract(source)
        pipeline.normalize()

        schema = pipeline.default_schema
        assert "customers" in schema.tables
        assert "id" in schema.tables["customers"]["columns"]
        assert "email" in schema.tables["customers"]["columns"]
        assert "updated_at" in schema.tables["customers"]["columns"]

This test doesn’t load any data. It verifies that the API response structure produces the schema you expect. Run this after API changes: if the vendor silently renames or removes a field, this test catches it before your dbt models fail with cryptic column-not-found errors.
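When several tables and columns need checking, a small helper can report every missing column at once instead of stopping at the first failed assert. The sketch below is plain Python and assumes only the nested mapping shape used in the test above (table name, then "columns", then column name). The helper name, the EXPECTED mapping, and the sample data are all hypothetical.

```python
def missing_columns(tables, expected):
    """Return {table: [missing columns]} for every expectation that is not met.

    `tables` mirrors the nested mapping used in the schema test:
    table name -> {"columns": {column name -> column definition}}.
    """
    problems = {}
    for table, columns in expected.items():
        present = tables.get(table, {}).get("columns", {})
        missing = [c for c in columns if c not in present]
        if missing:
            problems[table] = missing
    return problems


# Hypothetical expectations derived from downstream dbt models.
EXPECTED = {"customers": ["id", "email", "updated_at"]}

# Stand-in for schema.tables after a vendor dropped the "email" field.
observed = {"customers": {"columns": {"id": {}, "updated_at": {}}}}

report = missing_columns(observed, EXPECTED)
```

In a real test you would pass pipeline.default_schema.tables as the first argument and assert that the returned report is empty, which puts the full list of missing columns in the failure message.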
Testing Incremental Loading
Incremental loading state can cause subtle issues during development. Key things to verify:
State isolation between test runs. Use unique pipeline names for each test, or explicitly clear state between runs:
    import uuid

    import dlt

    def test_incremental_state():
        pipeline = dlt.pipeline(
            pipeline_name=f"test_{uuid.uuid4().hex[:8]}",  # unique name per test
            destination="duckdb",
            dataset_name="test",
        )
        ...

Without unique pipeline names, a previous test run’s cursor state persists and affects the next run. This manifests as “incremental pipeline returns no results” because, from dlt’s perspective, there’s nothing newer than the previous run’s cursor.
Verify the cursor advances. On a first run with known data, check that the recorded cursor value matches what you expect. Then simulate a second run and verify that fewer records come back:
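    def test_cursor_advances():
        pipeline = dlt.pipeline(destination="duckdb", dataset_name="test")

        def count_orders():
            with pipeline.sql_client() as client:
                return client.execute_sql("SELECT COUNT(*) FROM orders")[0][0]

        # First run: should load records
        source = my_api_source()
        source.orders.add_limit(5)
        pipeline.run(source)
        rows_after_first = count_orders()
        assert rows_after_first > 0

        # Second run with same pipeline: should load 0 records
        # (assuming test data has no records newer than the first run's cursor).
        # Compare row counts rather than load package internals, which may be
        # empty when nothing new is loaded.
        source2 = my_api_source()
        pipeline.run(source2)
        assert count_orders() == rows_after_first

Common Failure Modes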
Five issues come up repeatedly when building dlt pipelines:
1. Missing secrets configuration. dlt’s error messages are specific: they tell you exactly which key was expected and in which format. Check the expected environment variable name against what you’ve set. See dlt Secrets Management for the naming conventions.
2. Incorrect pagination setup. Test with a small limit (limit=2) before trusting that pagination works. If you get exactly one page when you expect many, the paginator isn’t recognizing the “more pages” signal. Inspect a raw API response to find where the next-page indicator actually lives.
3. Schema conflicts with incremental loading. Use unique pipeline names when testing variations. State is stored per pipeline name — if you’re iterating on the schema while reusing the same pipeline name, you can accumulate state from previous test runs that makes new runs behave unexpectedly.
4. Memory issues with large datasets. Yield pages as you receive them rather than accumulating in memory. This is the default generator pattern, but if you’re doing any intermediate processing, ensure you’re not buffering the full dataset before yielding.
5. Not testing locally first. Deploy only after a successful local run. This seems obvious, but the temptation to “just push it and see” is real, and debugging a failed production deployment takes far longer than catching the same issue locally.
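Items 2 and 4 can be illustrated together with a generator that follows a next-page indicator and yields each page as it arrives. This is a minimal sketch in plain Python, not dlt's paginator. The FAKE_API data, the "next" key, and both function names are hypothetical stand-ins for a real HTTP client and response format.

```python
from typing import Callable, Iterator, Optional

# Hypothetical API: each response carries its records and a "next" cursor.
FAKE_API = {
    None: {"data": [{"id": 1}, {"id": 2}], "next": "p2"},
    "p2": {"data": [{"id": 3}, {"id": 4}], "next": "p3"},
    "p3": {"data": [{"id": 5}], "next": None},
}


def fetch_page(cursor: Optional[str]) -> dict:
    """Stand-in for an HTTP call; returns one response body."""
    return FAKE_API[cursor]


def iter_pages(fetch: Callable[[Optional[str]], dict]) -> Iterator[list]:
    """Yield each page as soon as it arrives instead of collecting them all."""
    cursor = None
    while True:
        body = fetch(cursor)
        yield body["data"]
        # Stop when the response no longer points at a further page.
        cursor = body.get("next")
        if cursor is None:
            break


pages = list(iter_pages(fetch_page))
```

Only one page is held in memory at a time while iterating; the final list() call is there just to inspect the result. If a loop like this stops after exactly one page against a real API, the key it reads for the next-page indicator is the first thing to check against a raw response.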
Debugging Tools
Enable detailed logging in .dlt/config.toml:
    [runtime]
    log_level = "INFO"

Use progress="log" to write periodic progress lines instead of interactive progress bars in non-interactive environments (CI/CD, server logs). Set it when creating the pipeline:

    pipeline = dlt.pipeline(destination="duckdb", dataset_name="test", progress="log")
    pipeline.run(source)

Inspect load packages and state with the CLI:

    dlt pipeline my_pipeline_name info

This shows the pipeline’s current state, loaded packages, and cursor values. It is useful when incremental loading produces unexpected results and you want to see what state dlt actually recorded.