ServicesÀ proposNotesContact Me contacter →
EN FR
Note

Tests de pipelines dlt

Tester les pipelines dlt localement avec DuckDB avant de toucher la production — tests unitaires avec des limites sur les ressources, tests d'intégration pour la validation du schéma et patterns de débogage courants.

Planté
dltdata engineeringetltesting

Exécutez python my_pipeline.py avant de déployer. La plupart des erreurs de configuration — mauvais chemin d’endpoint, paginateur mal configuré, secret manquant — apparaissent immédiatement lors d’une exécution locale. La conception de dlt permet les tests locaux sans projet BigQuery de production, instance Airflow en cours d’exécution ou credentials cloud.

Tests unitaires avec DuckDB

Exécutez les pipelines contre DuckDB plutôt que votre entrepôt de production. DuckDB s’installe via pip, s’exécute entièrement en processus et stocke tout localement. Votre code de pipeline ne change pas — seulement la destination :

import dlt
import pytest
def test_customers_pipeline():
pipeline = dlt.pipeline(
destination="duckdb",
dataset_name="test_data"
)
# Limiter les enregistrements pour éviter de frapper l'API réelle à chaque exécution de test
source = my_api_source()
source.customers.add_limit(10)
load_info = pipeline.run(source)
# Interroger les résultats directement
with pipeline.sql_client() as client:
result = client.execute_sql("SELECT COUNT(*) FROM customers")
assert result[0][0] == 10

La méthode add_limit() plafonne le nombre d’enregistrements qu’une ressource produit — essentielle pour les tests qui appellent des APIs réelles. Sans elle, votre suite de tests va lentement parcourir l’ensemble de votre dataset de production, atteindre les limites de débit et s’exécuter pendant des minutes ou des heures.

Utilisez DuckDB comme destination pour tous les tests unitaires. C’est rapide (sous-seconde pour les petits datasets), ne nécessite aucun credential et ne laisse aucun coût cloud. L’état, le schéma et les données du pipeline vivent tous dans un fichier .duckdb local facile à inspecter et supprimer.

Tests d’intégration pour la validation du schéma

Au-delà de la vérification du nombre d’enregistrements, testez que le schéma correspond à ce que vos modèles dbt en aval attendent. Les étapes extract() et normalize() de dlt produisent un schéma que vous pouvez inspecter sans réellement charger des données vers une destination :

def test_schema_structure():
pipeline = dlt.pipeline(destination="duckdb", dataset_name="test")
source = my_api_source()
# Extraire et normaliser sans charger
pipeline.extract(source)
pipeline.normalize()
schema = pipeline.default_schema
assert "customers" in schema.tables
assert "id" in schema.tables["customers"]["columns"]
assert "email" in schema.tables["customers"]["columns"]
assert "updated_at" in schema.tables["customers"]["columns"]

Ce test ne charge aucune donnée. Il vérifie que la structure de la réponse API produit le schéma attendu. Exécutez-le après les changements d’API — si le fournisseur renomme ou supprime silencieusement un champ, ce test le détecte avant que vos modèles dbt n’échouent avec des erreurs cryptiques de colonne introuvable.

Tester le chargement incrémental

L’état du chargement incrémental peut causer des problèmes subtils pendant le développement. Points clés à vérifier :

Isolation d’état entre les exécutions de test. Utilisez des noms de pipeline uniques pour chaque test, ou effacez explicitement l’état entre les exécutions :

def test_incremental_state():
pipeline = dlt.pipeline(
pipeline_name=f"test_{uuid.uuid4().hex[:8]}", # nom unique par test
destination="duckdb",
dataset_name="test"
)
...

Sans noms de pipeline uniques, l’état du curseur d’une exécution de test précédente persiste et affecte l’exécution suivante. Cela se manifeste par “le pipeline incrémental ne retourne aucun résultat” — parce que du point de vue de dlt, il n’y a rien de plus récent que le curseur de l’exécution précédente.

Vérifiez que le curseur avance. Sur une première exécution avec des données connues, vérifiez que la valeur du curseur enregistrée correspond à ce que vous attendez. Simulez ensuite une deuxième exécution et vérifiez que moins d’enregistrements reviennent :

def test_cursor_advances():
pipeline = dlt.pipeline(destination="duckdb", dataset_name="test")
# Première exécution : doit charger des enregistrements
source = my_api_source()
source.orders.add_limit(5)
pipeline.run(source)
# Deuxième exécution avec le même pipeline : doit charger 0 enregistrement
# (en supposant que les données de test n'ont pas d'enregistrements plus récents que le curseur de la première exécution)
source2 = my_api_source()
load_info = pipeline.run(source2)
assert load_info.load_packages[0].jobs_count == 0

Modes d’échec courants

Cinq problèmes apparaissent régulièrement lors de la construction de pipelines dlt :

1. Configuration de secrets manquante. Les messages d’erreur de dlt sont précis : ils vous indiquent exactement quelle clé était attendue et dans quel format. Vérifiez le nom de variable d’environnement attendu par rapport à ce que vous avez défini. Voir Gestion des secrets dlt pour les conventions de nommage.

2. Configuration de pagination incorrecte. Testez avec une petite limite (limit=2) avant de faire confiance au fonctionnement de la pagination. Si vous obtenez exactement une page quand vous en attendez plusieurs, le paginateur ne reconnaît pas le signal “il y a d’autres pages”. Inspectez une réponse API brute pour trouver où se trouve réellement l’indicateur de page suivante.

3. Conflits de schéma avec le chargement incrémental. Utilisez des noms de pipeline uniques lors du test des variations. L’état est stocké par nom de pipeline — si vous itérez sur le schéma tout en réutilisant le même nom de pipeline, vous pouvez accumuler des états des exécutions de test précédentes qui font que les nouvelles exécutions se comportent de manière inattendue.

4. Problèmes de mémoire avec les grands datasets. Produisez des pages au fur et à mesure de leur réception plutôt que de les accumuler en mémoire. C’est le pattern générateur par défaut, mais si vous faites un traitement intermédiaire, assurez-vous de ne pas tamponner l’ensemble du dataset avant de produire.

5. Ne pas tester localement en premier. Déployez uniquement après une exécution locale réussie. Cela semble évident mais la tentation de “juste pousser et voir” est réelle — et le cycle de débogage pour un déploiement en production est 10x plus long que de détecter le même problème localement.

Outils de débogage

Activez la journalisation détaillée dans .dlt/config.toml :

log_level = "INFO"

Utilisez progress="log" pour les barres de progression dans les environnements non interactifs (CI/CD, logs serveur) :

pipeline.run(source, progress="log")

Inspectez les packages de chargement et l’état avec le CLI :

Terminal window
dlt pipeline info my_pipeline_name

Cela affiche l’état actuel du pipeline, les packages chargés et les valeurs de curseur — utile quand le chargement incrémental produit des résultats inattendus et que vous voulez voir quel état dlt a réellement enregistré.