ServicesÀ proposNotesContact Me contacter →
EN FR
Note

Chargement incrémental dlt

Comment dlt suit l'état entre les exécutions de pipeline via le chargement incrémental basé sur curseur — le helper dlt.sources.incremental(), la configuration déclarative REST API et pourquoi l'état réside dans la destination.

Planté
dltdata engineeringetlincremental processing

Le chargement incrémental de dlt gère le suivi d’état avec une configuration minimale et stocke cet état dans l’entrepôt de destination où il peut être inspecté directement. Un pipeline qui récupère l’ensemble du dataset à chaque exécution est plus lent, plus coûteux et plus susceptible d’atteindre les limites de débit — le chargement incrémental répond aux trois problèmes.

Le mécanisme de base

dlt utilise le chargement incrémental basé sur curseur. Vous déclarez un champ curseur — typiquement un timestamp ou un ID auto-incrémental — et dlt suit la valeur maximale vue à chaque exécution. Les exécutions suivantes ne récupèrent que les enregistrements plus récents que cette valeur.

L’API pour cela est dlt.sources.incremental() :

@dlt.resource(primary_key="id")
def orders(
updated_at=dlt.sources.incremental(
"updated_at",
initial_value="2024-01-01T00:00:00Z"
)
):
for page in api.get_orders(since=updated_at.last_value):
yield page

Lors de la première exécution, dlt utilise initial_value comme point de départ du backfill historique. Lors des exécutions suivantes, il utilise le maximum d’updated_at vu lors de l’exécution précédente. La propriété updated_at.last_value fournit le curseur actuel à passer à l’appel API. Pas de logique de checkpoint, pas de fichier d’état, pas de table de suivi séparée — déclarez le champ curseur et dlt gère le reste.

Où réside l’état

L’état persiste dans votre entrepôt de destination, pas dans un store d’état séparé. dlt crée une table _dlt_pipeline_state dans votre dataset qui stocke les valeurs de curseur et d’autres métadonnées de pipeline entre les exécutions.

Ce choix de conception a des implications pratiques :

Il est visible. Vous pouvez interroger la table d’état directement pour voir où votre pipeline s’est arrêté. Pas de fichiers d’état en boîte noire, pas de services externes à vérifier.

Il suit vos données. Si vous restaurez une sauvegarde de base de données ou déménagez dans un environnement différent, l’état se déplace avec vos données. Pas de service d’état séparé à synchroniser.

Il est par destination. Si vous exécutez le même pipeline contre des datasets BigQuery de dev et de prod, chacun maintient son propre état. Ils n’interfèrent pas l’un avec l’autre.

Configuration incrémentale déclarative pour les APIs REST

Pour les REST API sources, vous configurez le chargement incrémental de manière déclarative plutôt qu’en écrivant de la logique Python :

{
"name": "orders",
"endpoint": {
"path": "/orders",
"params": {
"since": {
"type": "incremental",
"cursor_path": "updated_at",
"initial_value": "2024-01-01T00:00:00Z"
}
}
}
}

Cette configuration fait la même chose que l’exemple de générateur Python ci-dessus. Le cursor_path indique à dlt où trouver la valeur du curseur dans la réponse, et initial_value définit le point de départ. dlt gère le suivi et injecte la valeur du curseur actuel dans le paramètre since à chaque exécution.

La forme déclarative est particulièrement utile pour le développement de pipelines assisté par IA : vous pouvez générer ces configurations depuis la documentation API sans écrire de logique Python. Voir dlt pour le développement de pipelines assisté par IA pour le workflow complet.

Efficacité mémoire

Le pattern générateur qui rend le chargement incrémental possible gère également la mémoire. Une ressource basée sur générateur produit des pages plutôt que d’accumuler l’ensemble du dataset :

@dlt.resource(primary_key="id")
def orders(
updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z")
):
for page in api.get_orders(since=updated_at.last_value):
yield page # produit une page à la fois, pas l'ensemble du dataset

Cela signifie qu’un pipeline traitant des millions d’enregistrements ne les garde pas tous en mémoire simultanément. Chaque page est produite, normalisée et chargée avant que la page suivante ne soit récupérée. Pour les grands datasets ou les APIs qui retournent des volumes de données significatifs, c’est la différence entre un pipeline qui fonctionne et un qui plante en mémoire.

Interaction avec la write disposition

Le chargement incrémental se combine avec la write disposition merge pour les données mutables. Sans primary_key, dlt peut suivre les valeurs de curseur mais ne peut pas dédupliquer — vous obtiendriez des lignes dupliquées si un enregistrement apparaît dans plusieurs fenêtres incrémentielles.

Pour les données append-only (logs d’événements, flux de clics, tout ce qui ne se met pas à jour), le chargement incrémental avec la disposition append fonctionne proprement : suivez l’ID ou le timestamp maximum, récupérez uniquement les enregistrements plus récents, ajoutez-les. Pas de déduplication nécessaire.

Pour les entités mutables — utilisateurs, commandes, tout ce qui se met à jour — vous avez besoin à la fois du chargement incrémental (pour ne récupérer que les enregistrements modifiés) et de la disposition merge avec une primary_key (pour faire un upsert plutôt que dupliquer) :

@dlt.resource(
write_disposition="merge",
primary_key="id"
)
def users(
updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01")
):
for page in api.get_users(modified_since=updated_at.last_value):
yield page

Relation avec les modèles incrémentiels dbt

Si vous utilisez dbt en aval de dlt, vous gérez l’incrémentalité à deux couches. dlt gère la couche d’extraction — ne récupérant que les enregistrements nouveaux ou modifiés depuis la source. dbt gère la couche de transformation — ne traitant que les enregistrements nouveaux ou modifiés à travers vos modèles.

Ces couches sont indépendantes mais complémentaires. Le chargement incrémental de dlt réduit ce qui atterrit dans votre couche brute. Les modèles incrémentiels de dbt réduisent ce qui est retraité dans votre couche de transformation. Un pipeline bien structuré utilise les deux.

Le modèle mental diffère entre eux : le chargement incrémental de dlt concerne le suivi d’état de l’API (où me suis-je arrêté ?), tandis que les modèles incrémentiels de dbt concernent l’optimisation des requêtes (comment éviter de scanner la table complète ?). Les deux améliorent l’efficacité, mais ils résolvent des problèmes différents.

Quand le chargement incrémental importe

Tous les pipelines n’ont pas besoin du chargement incrémental. Pour les petits datasets qui se chargent en quelques secondes et coûtent un calcul négligeable, le remplacement complet (disposition replace) est plus simple et évite les cas limites de gestion d’état.

Le chargement incrémental est approprié quand :

  • Le dataset source est suffisamment grand pour que l’extraction complète prenne un temps significatif
  • L’API source limite le débit total de transfert de données
  • La table de destination est suffisamment grande pour que le rechargement complet soit coûteux
  • Des exécutions fréquentes du pipeline nécessitent des données quasi temps réel

Pour comprendre comment le chargement incrémental s’intègre dans la structure plus large des pipelines dlt, voir Concepts fondamentaux de dlt. Pour les considérations spécifiques à BigQuery concernant les charges incrémentielles et le staging, voir dlt et intégration BigQuery.