Construire des pipelines API personnalisés avec dlt : de REST à BigQuery

Toute équipe data finit par se heurter au même mur : une API importante pour le business, mais aucun connecteur disponible. Les options se résument traditionnellement à tout construire de zéro ou attendre des mois que votre éditeur ajoute le support.

dlt (data load tool) comble ce vide. C’est une bibliothèque Python (installable via pip, sans conteneurs ni orchestration) qui transforme les données d’API en tables dans votre data warehouse. Pour les API personnalisées, dlt propose deux approches : RESTClient pour un contrôle granulaire et REST API Source pour la rapidité. Savoir quand utiliser l’une ou l’autre détermine si votre pipeline prend des heures ou des jours à construire.

Deux approches pour construire des sources API

dlt offre une option impérative et une option déclarative pour les sources API. Elles résolvent le même problème différemment.

RESTClient est l’approche bas niveau. Vous écrivez du code Python qui gère explicitement les requêtes, traite les réponses et produit les données. Vous gardez un contrôle fin sur chaque aspect de l’interaction avec l’API.

REST API Source est piloté par la configuration. Vous définissez un dictionnaire décrivant la structure de l’API (endpoints, authentification, pagination), et dlt s’occupe du reste. Moins de code, un développement plus rapide, mais moins de flexibilité pour les cas particuliers.

AspectRESTClientREST API Source
StylePython impératifConfig déclarative
Volume de codePlusMoins
FlexibilitéÉlevéeMoyenne
Courbe d’apprentissagePlus raidePlus douce
Idéal pourAuth complexe, logique customPatterns REST standards

Choisissez REST API Source quand l’API suit des patterns courants : réponses JSON, pagination standard, authentification classique. Choisissez RESTClient quand vous avez besoin d’une logique de pagination personnalisée, de flux d’authentification complexes ou d’un contrôle fin sur la gestion des requêtes et réponses.

Construire avec RESTClient

RESTClient encapsule la bibliothèque requests de Python avec la gestion de la pagination et de l’authentification intégrée. Vous l’instanciez avec une configuration, puis appelez paginate() pour itérer automatiquement sur les pages.

from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import OffsetPaginator
client = RESTClient(
base_url="https://api.example.com/v1",
headers={"X-API-Version": "2024-01"},
paginator=OffsetPaginator(limit=100)
)
@dlt.resource(write_disposition="merge", primary_key="id")
def customers():
for page in client.paginate("/customers"):
yield page

Les paramètres clés :

  • base_url : URL racine de l’API, partagée entre tous les endpoints
  • headers : En-têtes par défaut envoyés avec chaque requête
  • auth : Stratégie d’authentification (détaillée ci-dessous)
  • paginator : Comment gérer les réponses multi-pages
  • data_selector : JSONPath vers les données effectives dans les réponses

La méthode paginate() fait le gros du travail. Passez-lui un chemin d’endpoint, et elle produit des pages jusqu’à ce que l’API signale qu’il n’y a plus de données. Chaque page contient la réponse JSON parsée.

Maîtriser la pagination

Les API en production paginent leurs réponses. Récupérer la première page est simple ; gérer les 500 pages suivantes de manière fiable, c’est là que les pipelines cassent.

dlt inclut des paginateurs pour les patterns courants :

  • JSONLinkPaginator : URL de la page suivante dans le corps JSON (courant dans les API modernes)
  • HeaderLinkPaginator : URL de la page suivante dans les en-têtes de réponse (style GitHub)
  • OffsetPaginator : Paramètres classiques offset/limit
  • PageNumberPaginator : Incrémentation du numéro de page (page=1, page=2, …)
  • JSONResponseCursorPaginator : Token curseur dans le corps de la réponse
from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator
# Pour les API qui retournent {"data": [...], "next": "https://api.example.com/v1/items?cursor=abc123"}
client = RESTClient(
base_url="https://api.example.com/v1",
paginator=JSONLinkPaginator(next_url_path="next")
)

Quand les API ne suivent pas les patterns standards, étendez BasePaginator et implémentez deux méthodes : update_state() pour parser la réponse courante et update_request() pour modifier la requête suivante. J’ai construit des paginateurs personnalisés pour des API qui encodent l’état de pagination dans les en-têtes de réponse avec des formats propriétaires. L’architecture de dlt gère ce cas sans se battre contre le framework.

Patterns d’authentification

dlt supporte les méthodes d’authentification que vous rencontrerez en pratique :

from dlt.sources.helpers.rest_client.auth import (
BearerTokenAuth,
ApiKeyAuth,
HttpBasicAuth,
OAuth2ClientCredentials
)
# Bearer token (le plus courant pour les API modernes)
client = RESTClient(
base_url="https://api.example.com",
auth=BearerTokenAuth(token=dlt.secrets.value)
)
# Clé API dans l'en-tête
client = RESTClient(
base_url="https://api.example.com",
auth=ApiKeyAuth(name="X-API-Key", api_key=dlt.secrets.value, location="header")
)
# Flux OAuth2 client credentials
client = RESTClient(
base_url="https://api.example.com",
auth=OAuth2ClientCredentials(
access_token_url="https://api.example.com/oauth/token",
client_id=dlt.secrets["client_id"],
client_secret=dlt.secrets["client_secret"]
)
)

Pour les API avec des flux OAuth non standards (rotation de refresh token, types de grant personnalisés), étendez ces classes de base. Implémentez les méthodes qui diffèrent du OAuth standard et héritez de tout le reste.

L’approche déclarative : REST API Source

Pour les API standards, REST API Source élimine le boilerplate. Vous décrivez la structure de l’API dans un dictionnaire de configuration, et dlt génère le pipeline.

import dlt
from dlt.sources.rest_api import rest_api_source
config = {
"client": {
"base_url": "https://api.example.com/v1",
"auth": {
"type": "bearer",
"token": dlt.secrets["api_token"]
}
},
"resource_defaults": {
"primary_key": "id",
"write_disposition": "merge"
},
"resources": [
{
"name": "customers",
"endpoint": {
"path": "customers",
"params": {"status": "active"}
}
},
{
"name": "orders",
"endpoint": {
"path": "orders",
"paginator": {
"type": "offset",
"limit": 100
}
}
}
]
}
source = rest_api_source(config)
pipeline = dlt.pipeline(destination="bigquery", dataset_name="api_data")
pipeline.run(source)

La structure de configuration comporte trois parties :

  • client : URL de base, authentification, en-têtes par défaut
  • resource_defaults : Paramètres partagés pour toutes les ressources (clé primaire, write disposition)
  • resources : Liste des endpoints avec leur configuration spécifique

REST API Source gère automatiquement la pagination et l’authentification selon votre configuration. Il unneste aussi les données JSON imbriquées en tables relationnelles, ce que vous devriez gérer manuellement avec RESTClient. Pour un tutoriel pas à pas de REST API Source, consultez le guide pratique de dlt.

Chargement incrémental

Les rafraîchissements complets ne passent pas à l’échelle. Au-delà de quelques milliers d’enregistrements, il faut du chargement incrémental : ne récupérer que ce qui a changé depuis la dernière exécution.

La fonction incremental() de dlt suit les valeurs de curseur entre les exécutions :

@dlt.resource(write_disposition="merge", primary_key="id")
def orders(updated_since=dlt.sources.incremental("updated_at", initial_value="2024-01-01")):
params = {"updated_after": updated_since.last_value}
for page in client.paginate("/orders", params=params):
yield page

À la première exécution, dlt récupère les enregistrements mis à jour après la valeur initiale. Aux exécutions suivantes, il utilise la valeur maximale de updated_at de l’exécution précédente. L’état est stocké automatiquement, sans base de données externe.

Les paramètres clés à connaître :

  • cursor_path : JSONPath vers le champ curseur dans chaque enregistrement
  • initial_value : Point de départ pour la première exécution
  • last_value_func : Fonction pour déterminer quelle valeur de curseur conserver (par défaut : max)
  • on_cursor_value_missing : Comportement quand un enregistrement n’a pas le champ curseur (raise, include ou exclude)
  • lag : Secondes à soustraire du curseur pour les fenêtres d’attribution (gère les données en retard)

Avec REST API Source, configurez le chargement incrémental dans la définition de l’endpoint :

{
"name": "orders",
"endpoint": {
"path": "orders",
"incremental": {
"cursor_path": "updated_at",
"initial_value": "2024-01-01T00:00:00Z"
}
}
}

Gestion des erreurs et résilience

Les API tombent, les rate limits se déclenchent et le réseau a des ratés, mais dlt gère automatiquement les modes de défaillance courants.

Pour les réponses HTTP 429 (rate limit), dlt respecte les en-têtes Retry-After et implémente un backoff exponentiel. La configuration par défaut retente jusqu’à 5 fois avec des délais configurables :

client = RESTClient(
base_url="https://api.example.com",
request_backoff_factor=2, # Multiplicateur de backoff exponentiel
request_max_retry_delay=300 # Secondes max entre les tentatives
)

REST API Source permet de configurer le comportement pour des codes HTTP spécifiques :

{
"endpoint": {
"path": "items",
"response_actions": [
{"status_code": 404, "action": "ignore"}, # Ignorer les ressources manquantes
{"status_code": 429, "action": "retry"} # Retenter sur rate limit
]
}
}

Gestion des secrets

Coder les identifiants en dur, c’est s’exposer à un incident de sécurité tôt ou tard. dlt fournit une hiérarchie de configuration qui garde les secrets en dehors du code.

Ordre de priorité (du plus élevé au plus bas) :

  1. Variables d’environnement
  2. secrets.toml
  3. config.toml
  4. Intégrations vault
  5. Valeurs par défaut des arguments

Le nommage des variables d’environnement utilise des doubles underscores comme séparateurs :

Terminal window
export SOURCES__MY_API__API_KEY="your-secret-key"
export DESTINATION__BIGQUERY__PROJECT_ID="your-project"

Pour le développement local, utilisez secrets.toml :

[sources.my_api]
api_key = "your-secret-key"
[destination.bigquery]
project_id = "your-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."

Ne commitez jamais secrets.toml dans le contrôle de version. Dans les pipelines CI/CD, utilisez des variables d’environnement ou intégrez des gestionnaires de secrets cloud comme Google Cloud Secret Manager via la configuration vault de dlt.

Tester vos pipelines

Testez vos pipelines avant de les envoyer en production. Tout comme une stratégie de tests est essentielle pour les projets dbt, l’architecture de dlt rend le test des pipelines simple.

Pour les tests unitaires, exécutez le pipeline sur DuckDB au lieu de votre data warehouse de production :

import dlt
import pytest
def test_customers_pipeline():
pipeline = dlt.pipeline(
destination="duckdb",
dataset_name="test_data"
)
# Exécuter avec des données limitées
source = my_api_source()
source.customers.add_limit(10) # Ne récupérer que 10 enregistrements
load_info = pipeline.run(source)
# Interroger les résultats
with pipeline.sql_client() as client:
result = client.execute_sql("SELECT COUNT(*) FROM customers")
assert result[0][0] == 10

Pour les tests d’intégration, extrayez et normalisez sans charger :

def test_schema_structure():
pipeline = dlt.pipeline(destination="duckdb", dataset_name="test")
source = my_api_source()
pipeline.extract(source)
pipeline.normalize()
schema = pipeline.default_schema
assert "customers" in schema.tables
assert "id" in schema.tables["customers"]["columns"]

Options de déploiement

dlt fonctionne partout où Python fonctionne. La commande dlt deploy génère des configurations de déploiement pour les plateformes courantes.

GitHub Actions :

Terminal window
dlt deploy my_pipeline.py github-action --schedule "0 6 * * *"

Cela crée un fichier workflow qui installe les dépendances, configure les secrets depuis les secrets du dépôt GitHub et exécute votre pipeline selon le planning.

Airflow/Google Composer :

Terminal window
dlt deploy my_pipeline.py airflow-composer --secrets-format env

Le DAG généré utilise le helper PipelineTasksGroup de dlt pour créer des tâches séparées par ressource quand vous avez besoin de parallélisme.

Modal fonctionne bien pour les déploiements serverless :

import modal
import dlt
app = modal.App("my-dlt-pipeline")
@app.function(schedule=modal.Period(days=1))
def run_pipeline():
pipeline = dlt.pipeline(destination="bigquery", dataset_name="api_data")
pipeline.run(my_api_source())

Parmi les autres options : Google Cloud Functions, Cloud Run, Dagster (qui a une intégration native @dlt_assets) et Prefect.

Pièges courants et débogage

Cinq problèmes que je rencontre régulièrement :

  1. Ne pas tester localement d’abord : exécutez toujours python my_pipeline.py avant de déployer. La plupart des erreurs de configuration apparaissent immédiatement.

  2. Configuration des secrets manquante : les messages d’erreur de dlt vous indiquent précisément quelle clé manque. Vérifiez le nom attendu de la variable d’environnement.

  3. Mauvaise configuration de la pagination : testez d’abord avec une petite limite. Si vous n’obtenez qu’une seule page alors que vous en attendez plusieurs, votre configuration du paginateur est incorrecte.

  4. Conflits de schéma avec le chargement incrémental : utilisez des noms de pipeline uniques quand vous testez des variantes. L’état est stocké par nom de pipeline.

  5. Problèmes de mémoire avec les gros volumes : produisez les pages au fur et à mesure au lieu de les accumuler en mémoire. Les générateurs sont vos alliés.

Pour le débogage, activez les logs détaillés :

config.toml
log_level = "INFO"

Utilisez progress="log" pour les barres de progression dans les logs, et dlt pipeline info pour inspecter les packages de chargement et l’état.

Pour conclure

dlt offre deux chemins vers des pipelines API personnalisés. REST API Source vous amène rapidement en production quand les API suivent des patterns standards. RESTClient fournit le contrôle nécessaire quand ce n’est pas le cas.

La bibliothèque gère les préoccupations d’infrastructure (pagination, retries, évolution du schéma, état incrémental) pour que vous puissiez vous concentrer sur la logique spécifique à l’API. C’est le juste milieu entre les connecteurs managés coûteux et tout construire soi-même. Une fois vos données chargées dans BigQuery, vous pouvez ajouter des transformations dbt par-dessus pour construire vos modèles analytiques.

Pour la plupart des nouvelles intégrations, REST API Source est la voie la plus rapide. RESTClient est là quand vous en avez besoin. Dans les deux cas, vous obtenez un script Python versionnable, testable et déployable n’importe où.