Toute équipe data finit par se heurter au même mur : une API importante pour le business, mais aucun connecteur disponible. Les options se résument traditionnellement à tout construire de zéro ou attendre des mois que votre éditeur ajoute le support.
dlt (data load tool) comble ce vide. C’est une bibliothèque Python (installable via pip, sans conteneurs ni orchestration) qui transforme les données d’API en tables dans votre data warehouse. Pour les API personnalisées, dlt propose deux approches : RESTClient pour un contrôle granulaire et REST API Source pour la rapidité. Savoir quand utiliser l’une ou l’autre détermine si votre pipeline prend des heures ou des jours à construire.
Deux approches pour construire des sources API
dlt offre une option impérative et une option déclarative pour les sources API. Elles résolvent le même problème différemment.
RESTClient est l’approche bas niveau. Vous écrivez du code Python qui gère explicitement les requêtes, traite les réponses et produit les données. Vous gardez un contrôle fin sur chaque aspect de l’interaction avec l’API.
REST API Source est piloté par la configuration. Vous définissez un dictionnaire décrivant la structure de l’API (endpoints, authentification, pagination), et dlt s’occupe du reste. Moins de code, un développement plus rapide, mais moins de flexibilité pour les cas particuliers.
| Aspect | RESTClient | REST API Source |
|---|---|---|
| Style | Python impératif | Config déclarative |
| Volume de code | Plus | Moins |
| Flexibilité | Élevée | Moyenne |
| Courbe d’apprentissage | Plus raide | Plus douce |
| Idéal pour | Auth complexe, logique custom | Patterns REST standards |
Choisissez REST API Source quand l’API suit des patterns courants : réponses JSON, pagination standard, authentification classique. Choisissez RESTClient quand vous avez besoin d’une logique de pagination personnalisée, de flux d’authentification complexes ou d’un contrôle fin sur la gestion des requêtes et réponses.
Construire avec RESTClient
RESTClient encapsule la bibliothèque requests de Python avec la gestion de la pagination et de l’authentification intégrée. Vous l’instanciez avec une configuration, puis appelez paginate() pour itérer automatiquement sur les pages.
from dlt.sources.helpers.rest_client import RESTClientfrom dlt.sources.helpers.rest_client.paginators import OffsetPaginator
client = RESTClient( base_url="https://api.example.com/v1", headers={"X-API-Version": "2024-01"}, paginator=OffsetPaginator(limit=100))
@dlt.resource(write_disposition="merge", primary_key="id")def customers(): for page in client.paginate("/customers"): yield pageLes paramètres clés :
base_url: URL racine de l’API, partagée entre tous les endpointsheaders: En-têtes par défaut envoyés avec chaque requêteauth: Stratégie d’authentification (détaillée ci-dessous)paginator: Comment gérer les réponses multi-pagesdata_selector: JSONPath vers les données effectives dans les réponses
La méthode paginate() fait le gros du travail. Passez-lui un chemin d’endpoint, et elle produit des pages jusqu’à ce que l’API signale qu’il n’y a plus de données. Chaque page contient la réponse JSON parsée.
Maîtriser la pagination
Les API en production paginent leurs réponses. Récupérer la première page est simple ; gérer les 500 pages suivantes de manière fiable, c’est là que les pipelines cassent.
dlt inclut des paginateurs pour les patterns courants :
- JSONLinkPaginator : URL de la page suivante dans le corps JSON (courant dans les API modernes)
- HeaderLinkPaginator : URL de la page suivante dans les en-têtes de réponse (style GitHub)
- OffsetPaginator : Paramètres classiques offset/limit
- PageNumberPaginator : Incrémentation du numéro de page (page=1, page=2, …)
- JSONResponseCursorPaginator : Token curseur dans le corps de la réponse
from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator
# Pour les API qui retournent {"data": [...], "next": "https://api.example.com/v1/items?cursor=abc123"}client = RESTClient( base_url="https://api.example.com/v1", paginator=JSONLinkPaginator(next_url_path="next"))Quand les API ne suivent pas les patterns standards, étendez BasePaginator et implémentez deux méthodes : update_state() pour parser la réponse courante et update_request() pour modifier la requête suivante. J’ai construit des paginateurs personnalisés pour des API qui encodent l’état de pagination dans les en-têtes de réponse avec des formats propriétaires. L’architecture de dlt gère ce cas sans se battre contre le framework.
Patterns d’authentification
dlt supporte les méthodes d’authentification que vous rencontrerez en pratique :
from dlt.sources.helpers.rest_client.auth import ( BearerTokenAuth, ApiKeyAuth, HttpBasicAuth, OAuth2ClientCredentials)
# Bearer token (le plus courant pour les API modernes)client = RESTClient( base_url="https://api.example.com", auth=BearerTokenAuth(token=dlt.secrets.value))
# Clé API dans l'en-têteclient = RESTClient( base_url="https://api.example.com", auth=ApiKeyAuth(name="X-API-Key", api_key=dlt.secrets.value, location="header"))
# Flux OAuth2 client credentialsclient = RESTClient( base_url="https://api.example.com", auth=OAuth2ClientCredentials( access_token_url="https://api.example.com/oauth/token", client_id=dlt.secrets["client_id"], client_secret=dlt.secrets["client_secret"] ))Pour les API avec des flux OAuth non standards (rotation de refresh token, types de grant personnalisés), étendez ces classes de base. Implémentez les méthodes qui diffèrent du OAuth standard et héritez de tout le reste.
L’approche déclarative : REST API Source
Pour les API standards, REST API Source élimine le boilerplate. Vous décrivez la structure de l’API dans un dictionnaire de configuration, et dlt génère le pipeline.
import dltfrom dlt.sources.rest_api import rest_api_source
config = { "client": { "base_url": "https://api.example.com/v1", "auth": { "type": "bearer", "token": dlt.secrets["api_token"] } }, "resource_defaults": { "primary_key": "id", "write_disposition": "merge" }, "resources": [ { "name": "customers", "endpoint": { "path": "customers", "params": {"status": "active"} } }, { "name": "orders", "endpoint": { "path": "orders", "paginator": { "type": "offset", "limit": 100 } } } ]}
source = rest_api_source(config)pipeline = dlt.pipeline(destination="bigquery", dataset_name="api_data")pipeline.run(source)La structure de configuration comporte trois parties :
- client : URL de base, authentification, en-têtes par défaut
- resource_defaults : Paramètres partagés pour toutes les ressources (clé primaire, write disposition)
- resources : Liste des endpoints avec leur configuration spécifique
REST API Source gère automatiquement la pagination et l’authentification selon votre configuration. Il unneste aussi les données JSON imbriquées en tables relationnelles, ce que vous devriez gérer manuellement avec RESTClient. Pour un tutoriel pas à pas de REST API Source, consultez le guide pratique de dlt.
Chargement incrémental
Les rafraîchissements complets ne passent pas à l’échelle. Au-delà de quelques milliers d’enregistrements, il faut du chargement incrémental : ne récupérer que ce qui a changé depuis la dernière exécution.
La fonction incremental() de dlt suit les valeurs de curseur entre les exécutions :
@dlt.resource(write_disposition="merge", primary_key="id")def orders(updated_since=dlt.sources.incremental("updated_at", initial_value="2024-01-01")): params = {"updated_after": updated_since.last_value} for page in client.paginate("/orders", params=params): yield pageÀ la première exécution, dlt récupère les enregistrements mis à jour après la valeur initiale. Aux exécutions suivantes, il utilise la valeur maximale de updated_at de l’exécution précédente. L’état est stocké automatiquement, sans base de données externe.
Les paramètres clés à connaître :
cursor_path: JSONPath vers le champ curseur dans chaque enregistrementinitial_value: Point de départ pour la première exécutionlast_value_func: Fonction pour déterminer quelle valeur de curseur conserver (par défaut :max)on_cursor_value_missing: Comportement quand un enregistrement n’a pas le champ curseur (raise,includeouexclude)lag: Secondes à soustraire du curseur pour les fenêtres d’attribution (gère les données en retard)
Avec REST API Source, configurez le chargement incrémental dans la définition de l’endpoint :
{ "name": "orders", "endpoint": { "path": "orders", "incremental": { "cursor_path": "updated_at", "initial_value": "2024-01-01T00:00:00Z" } }}Gestion des erreurs et résilience
Les API tombent, les rate limits se déclenchent et le réseau a des ratés, mais dlt gère automatiquement les modes de défaillance courants.
Pour les réponses HTTP 429 (rate limit), dlt respecte les en-têtes Retry-After et implémente un backoff exponentiel. La configuration par défaut retente jusqu’à 5 fois avec des délais configurables :
client = RESTClient( base_url="https://api.example.com", request_backoff_factor=2, # Multiplicateur de backoff exponentiel request_max_retry_delay=300 # Secondes max entre les tentatives)REST API Source permet de configurer le comportement pour des codes HTTP spécifiques :
{ "endpoint": { "path": "items", "response_actions": [ {"status_code": 404, "action": "ignore"}, # Ignorer les ressources manquantes {"status_code": 429, "action": "retry"} # Retenter sur rate limit ] }}Gestion des secrets
Coder les identifiants en dur, c’est s’exposer à un incident de sécurité tôt ou tard. dlt fournit une hiérarchie de configuration qui garde les secrets en dehors du code.
Ordre de priorité (du plus élevé au plus bas) :
- Variables d’environnement
secrets.tomlconfig.toml- Intégrations vault
- Valeurs par défaut des arguments
Le nommage des variables d’environnement utilise des doubles underscores comme séparateurs :
export SOURCES__MY_API__API_KEY="your-secret-key"export DESTINATION__BIGQUERY__PROJECT_ID="your-project"Pour le développement local, utilisez secrets.toml :
[sources.my_api]api_key = "your-secret-key"
[destination.bigquery]project_id = "your-project"private_key = "-----BEGIN PRIVATE KEY-----\n..."Ne commitez jamais secrets.toml dans le contrôle de version. Dans les pipelines CI/CD, utilisez des variables d’environnement ou intégrez des gestionnaires de secrets cloud comme Google Cloud Secret Manager via la configuration vault de dlt.
Tester vos pipelines
Testez vos pipelines avant de les envoyer en production. Tout comme une stratégie de tests est essentielle pour les projets dbt, l’architecture de dlt rend le test des pipelines simple.
Pour les tests unitaires, exécutez le pipeline sur DuckDB au lieu de votre data warehouse de production :
import dltimport pytest
def test_customers_pipeline(): pipeline = dlt.pipeline( destination="duckdb", dataset_name="test_data" )
# Exécuter avec des données limitées source = my_api_source() source.customers.add_limit(10) # Ne récupérer que 10 enregistrements
load_info = pipeline.run(source)
# Interroger les résultats with pipeline.sql_client() as client: result = client.execute_sql("SELECT COUNT(*) FROM customers") assert result[0][0] == 10Pour les tests d’intégration, extrayez et normalisez sans charger :
def test_schema_structure(): pipeline = dlt.pipeline(destination="duckdb", dataset_name="test") source = my_api_source()
pipeline.extract(source) pipeline.normalize()
schema = pipeline.default_schema assert "customers" in schema.tables assert "id" in schema.tables["customers"]["columns"]Options de déploiement
dlt fonctionne partout où Python fonctionne. La commande dlt deploy génère des configurations de déploiement pour les plateformes courantes.
GitHub Actions :
dlt deploy my_pipeline.py github-action --schedule "0 6 * * *"Cela crée un fichier workflow qui installe les dépendances, configure les secrets depuis les secrets du dépôt GitHub et exécute votre pipeline selon le planning.
Airflow/Google Composer :
dlt deploy my_pipeline.py airflow-composer --secrets-format envLe DAG généré utilise le helper PipelineTasksGroup de dlt pour créer des tâches séparées par ressource quand vous avez besoin de parallélisme.
Modal fonctionne bien pour les déploiements serverless :
import modalimport dlt
app = modal.App("my-dlt-pipeline")
@app.function(schedule=modal.Period(days=1))def run_pipeline(): pipeline = dlt.pipeline(destination="bigquery", dataset_name="api_data") pipeline.run(my_api_source())Parmi les autres options : Google Cloud Functions, Cloud Run, Dagster (qui a une intégration native @dlt_assets) et Prefect.
Pièges courants et débogage
Cinq problèmes que je rencontre régulièrement :
-
Ne pas tester localement d’abord : exécutez toujours
python my_pipeline.pyavant de déployer. La plupart des erreurs de configuration apparaissent immédiatement. -
Configuration des secrets manquante : les messages d’erreur de dlt vous indiquent précisément quelle clé manque. Vérifiez le nom attendu de la variable d’environnement.
-
Mauvaise configuration de la pagination : testez d’abord avec une petite limite. Si vous n’obtenez qu’une seule page alors que vous en attendez plusieurs, votre configuration du paginateur est incorrecte.
-
Conflits de schéma avec le chargement incrémental : utilisez des noms de pipeline uniques quand vous testez des variantes. L’état est stocké par nom de pipeline.
-
Problèmes de mémoire avec les gros volumes : produisez les pages au fur et à mesure au lieu de les accumuler en mémoire. Les générateurs sont vos alliés.
Pour le débogage, activez les logs détaillés :
log_level = "INFO"Utilisez progress="log" pour les barres de progression dans les logs, et dlt pipeline info pour inspecter les packages de chargement et l’état.
Pour conclure
dlt offre deux chemins vers des pipelines API personnalisés. REST API Source vous amène rapidement en production quand les API suivent des patterns standards. RESTClient fournit le contrôle nécessaire quand ce n’est pas le cas.
La bibliothèque gère les préoccupations d’infrastructure (pagination, retries, évolution du schéma, état incrémental) pour que vous puissiez vous concentrer sur la logique spécifique à l’API. C’est le juste milieu entre les connecteurs managés coûteux et tout construire soi-même. Une fois vos données chargées dans BigQuery, vous pouvez ajouter des transformations dbt par-dessus pour construire vos modèles analytiques.
Pour la plupart des nouvelles intégrations, REST API Source est la voie la plus rapide. RESTClient est là quand vous en avez besoin. Dans les deux cas, vous obtenez un script Python versionnable, testable et déployable n’importe où.