Adrienne Vermorel

Guide pratique de dlt

Sur mes projets data, je fais face à un défi courant : extraire des données de manière fiable depuis diverses sources vers des destinations où elles peuvent être analysées. Bien que cela semble simple, la mise en place de pipelines de données peut impliquer une infrastructure complexe, des connaissances spécialisées et une maintenance importante.

Dernièrement, j’ai utilisé dlt (Data Load Tool) – un framework Python qui simplifie le chargement de données tout en suivant les meilleures pratiques de data engineering. Ce que j’apprécie avec dlt, c’est qu’il rend la construction de pipelines de données robustes accessible, même pour des personnes comme moi qui n’ont que des compétences basiques en Python.

Pourquoi dlt se démarque

  • Approche Python d’abord : Si on connaît les bases de Python, on peut construire des pipelines de données prêts pour la production. Pas besoin d’expertise spécialisée en data engineering.
  • Aucune configuration d’infrastructure : On peut exécuter des pipelines localement pendant le développement ou les déployer dans des environnements sans serveur comme GCP Cloud Functions ou AWS Lambda sans configuration complexe.
  • Meilleures pratiques intégrées : On bénéficie du chargement incrémental, du typage des données, de la normalisation et de la gestion des erreurs sans écrire de code répétitif.

Dans ce tutoriel, on va construire plusieurs pipelines de complexité croissante en utilisant l’API GitHub. On apprendra à :

  1. Obtenir une liste des repositories appartenant à l’organisation PokeAPI
  2. Obtenir les commits des repositories listés précédemment
  3. Implémenter un chargement incrémental pour traiter efficacement uniquement les nouvelles activités des repositories

Cela devrait donner suffisamment de compétences pour utiliser dlt afin de construire ses propres pipelines de données. 🎉

Commençons par configurer notre environnement et comprendre les bases de dlt.

Prérequis pour dlt

Avant de se lancer dans la construction de pipelines de données avec dlt, configurons notre environnement. Ce tutoriel suppose une familiarité basique avec Python et l’utilisation d’une interface en ligne de commande.

Installation de Python

dlt nécessite Python 3.9 ou plus récent. Si on n’a pas Python installé :

Télécharger et installer Python : Visiter python.org et télécharger la dernière version pour son système d’exploitation.

  • Sur Windows : Exécuter l’installateur et s’assurer de cocher “Add Python to PATH”
  • Sur macOS : Utiliser l’installateur officiel ou installer via Homebrew avec brew install python
  • Sur Linux : La plupart des distributions viennent avec Python préinstallé, ou utiliser le gestionnaire de paquets (ex : apt install python3)

Vérifier l’installation : Ouvrir un terminal ou une invite de commande et exécuter :

Terminal window
python --version

ou

Terminal window
python3 --version

Mise en place d’un environnement virtuel

C’est une bonne pratique d’utiliser un environnement virtuel pour les projets Python afin de gérer les dépendances :

Créer un nouveau répertoire de projet :

Terminal window
mkdir dlt-tutorial
cd dlt-tutorial

Créer un environnement virtuel :

Terminal window
python -m venv venv

ou

Terminal window
python3 -m venv venv

Activer l’environnement virtuel :

  • Sur Windows :
    Terminal window
    venv\Scripts\activate
  • Sur macOS/Linux :
    Terminal window
    source venv/bin/activate

On devrait voir le nom de l’environnement dans l’invite, comme (venv).

Installation de dlt

Avec l’environnement virtuel activé, installer dlt et ses dépendances :

Terminal window
pip install dlt

Si on prévoit d’utiliser des destinations spécifiques (bases de données), on doit installer les packages appropriés. Dans notre cas, on utilisera DuckDB comme destination, donc on devra exécuter ceci :

Terminal window
pip install "dlt[duckdb]"

Concepts fondamentaux de dlt

Pipelines

Un pipeline est l’unité principale de travail dans dlt. Il est responsable de l’extraction des données depuis les sources, de l’application des transformations et du chargement des données dans une destination. Les pipelines gèrent :

  • La connexion aux sources de données
  • La gestion de l’évolution du schéma
  • Le suivi de l’état de chargement pour le chargement incrémental
  • La gestion des erreurs et les tentatives

Sources

Une source est l’origine des données. Dans dlt, les sources peuvent être :

  • Des APIs (comme l’API GitHub qu’on va utiliser)
  • Des bases de données
  • Des fichiers (CSV, JSON, etc.)
  • Des plateformes de streaming
  • Des objets Python personnalisés

dlt fournit des sources vérifiées pour les sources courantes, y compris une source API REST générique qu’on utilisera dans ce tutoriel.

Ressources

Une ressource est un ensemble de données spécifique au sein d’une source. Par exemple, si GitHub est notre source, alors “organizations”, “repositories” et “commits” pourraient être des ressources distinctes.

Les ressources :

  • Définissent la structure des données
  • Peuvent être normalisées en tables séparées
  • Peuvent avoir des relations parent-enfant
  • Peuvent être chargées de manière incrémentale

Destinations

Une destination est l’endroit où les données sont chargées. dlt prend en charge diverses data warehouses et bases de données :

  • Nombreuses bases de données SQL (propulsées par SQLAlchemy)
  • BigQuery
  • Snowflake
  • Redshift
  • DuckDB (idéal pour le développement)
  • Et plus…

Un pipeline GitHub simple utilisant la source API REST

Construisons notre premier pipeline dlt pour récupérer les repositories de l’API GitHub en utilisant la source API REST intégrée à dlt.

Étape 1 : Initialiser dlt

D’abord, on doit initialiser dlt dans le repository. On utilisera la source API REST et la destination DuckDB.

Terminal window
dlt init rest_api duckdb

Cela créera un répertoire .dlt dans le projet ainsi qu’un fichier rest_api_pipeline.py. Cela créera également un fichier .gitignore pour éviter de valider le fichier secret.toml dans le repository, ainsi que d’autres fichiers spécifiques à dlt.

Étape 2 : Configurer l’authentification

Pour configurer l’authentification et augmenter la limite de quota, on a besoin d’avoir un compte GitHub et de configurer un jeton d’accès personnel. Donner un nom, sélectionner Public repositories pour l’accès au repository. Pour l’accès au repository, sélectionner Public repositories et cliquer sur Generate token. Une fois qu’on a la clé, on peut l’ajouter au fichier .dlt/secrets.toml :

[sources.github]
api_key ="api-key"

Étape 3 : Créer le Pipeline

Dans le fichier rest_api_pipeline.py, supprimer le code existant et le remplacer par ce qui suit :

import dlt
from dlt.sources.rest_api import rest_api_source
from dlt.sources.helpers.rest_client.auth import APIKeyAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator
github_source = rest_api_source(
{
"client": {
"base_url": "https://api.github.com/",
"paginator": HeaderLinkPaginator(links_next_key="next"),
"auth": {
"type": "bearer",
"token": dlt.secrets["sources.github.api_key"],
},
},
"resources": [
{
"name": "orgs-pokeapi-repos",
"endpoint": {
"path": "orgs/PokeAPI/repos",
},
},
],
}
)
pipeline = dlt.pipeline(
pipeline_name="github_pipeline",
destination="duckdb",
dataset_name="github_data",
)
try:
# Run the pipeline with our source
load_info = pipeline.run(github_source.with_resources(
"orgs-pokeapi-repos"))
# Print information about the load
print(f"Load info: {load_info}")
except Exception as e:
print(f"Error occurred: {str(e)}")

Étape 4 : Exécuter le Pipeline

Exécuter le pipeline :

Terminal window
python3 rest_api_pipeline.py

Étape 5 : Examiner les Résultats

Après avoir exécuté le pipeline, dlt créera une base de données DuckDB dans le répertoire principal. On peut l’explorer en utilisant l’interface en ligne de commande DuckDB :

Terminal window
duckdb github_pipeline.duckdb

On peut voir les tables en utilisant la commande suivante :

Terminal window
SHOW ALL TABLES;

On devrait voir les tables suivantes :

  • _dlt_loads
  • _dlt_pipeline_state
  • _dlt_version
  • orgs_pokeapi_repos
  • orgs_pokeapi_repos__topics

Pour voir le schéma de la table orgs_pokeapi_repos, on peut exécuter ceci :

Terminal window
DESCRIBE github_data.orgs_pokeapi_repos;

Et pour interroger les données, on peut exécuter ceci :

Terminal window
SELECT id, name, owner__url, stargazers_count FROM github_data.orgs_pokeapi_repos LIMIT 5;

Comprendre la configuration de la source API REST

Décomposons les éléments clés de notre configuration de source API REST :

1. Configuration du Client

"client": {
"base_url": "https://api.github.com/",
"paginator": HeaderLinkPaginator(links_next_key="next"),
"auth": {
"type": "bearer",
"token": dlt.secrets["sources.github.api_key"],
},
},
  • base_url: L’URL de base pour toutes les requêtes API
  • paginator: Configuration pour gérer la pagination.
  • auth: Configuration d’authentification. Ici on utilise un jeton bearer.

Le HeaderLinkPaginator est utilisé par défaut lorsque l’API renvoie un en-tête Link. Mais il est préférable de le spécifier dans la configuration dlt. Sinon, lorsque le Header Link est manquant, dlt peut donner un avertissement indiquant qu’aucun paginateur n’est détecté. Cela peut se produire lorsqu’une seule page de résultats est renvoyée par l’API.

2. Configuration des Ressources

"resources": [
{
"name": "orgs-pokeapi-repos",
"endpoint": {
"path": "orgs/PokeAPI/repos",
},
},
],
  • name: Définit le nom de la ressource et le nom de la table résultante
  • endpoint: Configure le point de terminaison de l’API
    • path: Le chemin à ajouter à l’URL de base

Ce qui s’est passé en coulisses

Lorsqu’on a exécuté le pipeline, dlt a effectué plusieurs tâches :

  1. Requêtes API: dlt a envoyé des requêtes HTTP à l’API GitHub
  2. Gestion de la pagination: dlt a automatiquement récupéré toutes les pages de données
  3. Inférence de schéma: dlt a analysé la structure des données et créé des tables appropriées
  4. Normalisation des données: Les données imbriquées complexes ont été aplaties dans une structure relationnelle (on a obtenu deux tables: la principale -> orgs_pokeapi_repos, et une table associée avec les sujets -> orgs_pokeapi_repos__topics)
  5. Chargement des données: dlt a chargé les données dans la destination spécifiée, dans notre cas dans un fichier DuckDB

Utiliser le résultat d’une ressource pour la configuration d’une autre

Ensuite, on va lister tous les commits appartenant aux repositories de l’organisation PokéAPI. Pour cela, on devra utiliser le résultat de l’endpoint /orgs/PokeAPI/repos afin de faire l’appel à /orgs/PokeAPI/{repo}/commits

On va mettre à jour la liste des ressources :

"resources": [
{
"name": "pokeapi_repos",
"endpoint": {
"path": "orgs/PokeAPI/repos",
},
},
{
"name": "pokeapi_repos_commits",
"endpoint": {
"path": "repos/PokeAPI/{resources.pokeapi_repos.name}/commits",
},
},
],

Et quand on regarde le fichier github_pipeline.duckdb, on a maintenant la table suivante : github_data.pokeapi_repos_commits avec tous les commits de ce repository.

Implémenter le chargement incrémental avec l’API GitHub

Jusqu’à présent, on a construit un pipeline qui extrait tous les commits des repositories PokeAPI. Mais que faire si on veut régulièrement mettre à jour notre base de données avec seulement de nouveaux commits, sans tout retraiter ? Pour cela, on peut utiliser le chargement incrémental.

Pour ce faire, on doit déterminer dans l’API deux choses :

  • quels paramètres de date/timestamp on peut envoyer à l’API pour exclure les commits avant une certaine date (dans GitHub, c’est since)
  • quel champ renvoyé par l’endpoint spécifie la date/timestamp du commit (on peut utiliser commit.author.date)

Comprendre le chargement incrémental dans dlt

Le chargement incrémental est une technique où seules les données nouvelles ou modifiées sont traitées à chaque exécution du pipeline. Cette approche :

  • Réduit le temps de traitement et l’utilisation des ressources
  • Minimise les requêtes API (et c’est important pour les API qui ont des limites de quota)
  • Crée un pipeline de données globalement plus efficace

dlt gère la complexité du chargement incrémental grâce à ses capacités de gestion d’état :

  1. Il suit ce qui a déjà été chargé
  2. Il utilise l’état des ressources pour déterminer quoi charger ensuite
  3. Il gère cela automatiquement avec juste quelques modifications de configuration

Ajouter des paramètres à notre ressource

Modifions notre pipeline pour inclure un modèle de chargement incrémental. D’abord, on va ajouter la configuration pour la ressource des commits :

{
"name": "pokeapi_repos_commits",
"endpoint": {
"path": "/repos/PokeAPI/{resources.pokeapi_repos.name}/commits",
"params": {
"since": {
"type": "incremental",
"cursor_path": "commit.author.date",
"initial_value": "2024-01-01T00:00:00Z"
}
}
},
},
  • Dans l’objet params, on passe since le paramètre que GitHub attend pour filtrer les données basées sur un horodatage.
  • On spécifie que ce paramètre est de type incremental.
  • On fournit le cursor_path commit.author.date pour suivre le timestamp de la réponse du point de terminaison.
  • Et pour initial_value, on fournit la valeur qui initialisera l’état de chargement incrémental. Il est important que le type de valeur corresponde au type du champ dans les données.

Exécuter le pipeline

Maintenant quand on exécute le pipeline pour la première fois, dlt traite toutes les données depuis le 2024-01-01 pour la ressource de commits. Mais lors des exécutions suivantes, il ne traitera que les commits depuis la dernière exécution.

Comment dlt gère l’état incrémental

En coulisses, dlt maintient des informations d’état dans la destination. Cela inclut :

  1. Métadonnées de chargement : Informations sur chaque exécution de pipeline stockées dans _dlt_loads
  2. État du pipeline : Les paramètres d’état pour chaque ressource dans _dlt_pipeline_state
  3. Versionnement des données : Quelle version de chaque enregistrement a été chargée dans quelle exécution

Avec cela, on peut implémenter le chargement incrémental de données avec un minimum de code.

Pour résumer

Dans cet article, on a appris à connaître dlt et ses concepts les plus importants. On a déployé un pipeline basique depuis GitHub vers DuckDB. On a ensuite utilisé le résultat d’un endpoint pour configurer un autre endpoint en récupérant les noms des repositories afin d’obtenir les commits de ces repositories. Enfin, on a implémenté le chargement incrémental pour obtenir uniquement les nouvelles données.