Adrienne Vermorel
Données en retard dans dbt : des patterns qui fonctionnent vraiment
Les données en retard sont la raison pour laquelle les modèles incrémentaux dérivent par rapport à la source de vérité. Votre modèle traite les événements de la veille, les considère comme complets, et passe à la suite. Sauf que certains de ces événements n’arrivent que trois jours plus tard. Sans stratégie pour les rattraper, votre modèle incrémental diverge progressivement de la réalité.
Cet écart entre le moment où un événement se produit et le moment où il arrive dans votre data warehouse existe dans tous les pipelines de données. Comprendre quand vos données arrivent en retard, et de combien, détermine quels patterns fonctionneront pour votre cas d’usage.
Le problème fondamental
Deux timestamps comptent pour le traitement incrémental :
- Event time : quand quelque chose s’est produit (un achat, une page vue, une connexion)
- Load time : quand cet enregistrement a atterri dans votre data warehouse
Quand l’event time d’un enregistrement est antérieur au maximum d’event time déjà présent dans votre table cible, cette donnée est « en retard ». Votre filtre incrémental standard l’ignorerait complètement.
-- Ceci rate les données en retard{% if is_incremental() %}WHERE event_date > (SELECT MAX(event_date) FROM {{ this }}){% endif %}Si votre modèle a tourné hier et traité les événements jusqu’au 15 janvier, tous les événements du 14 janvier qui arrivent aujourd’hui sont ignorés. Ils sont en retard, et votre filtre ne sait pas qu’il faut aller les chercher.
Mesurer le profil de latence
Avant de choisir une stratégie, analysez vos données. Exécutez cette requête sur vos tables sources :
SELECT DATE_DIFF(DATE(_loaded_at), DATE(event_timestamp), DAY) AS days_late, COUNT(*) AS record_count, ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS pct_of_totalFROM source_tableWHERE _loaded_at > event_timestampGROUP BY 1ORDER BY 1Vous obtiendrez la distribution. Peut-être que 95 % de vos données arrivent le jour même, 4 % dans les 3 jours, et 1 % s’étale sur une semaine. Cette distribution conditionne tout ce qui suit.
Le pattern du lookback window
La correction est simple : au lieu de ne traiter que les nouvelles données, retraitez une fenêtre glissante de données récentes à chaque exécution.
{% if is_incremental() %}WHERE event_date >= ( SELECT DATE_SUB(MAX(event_date), INTERVAL 3 DAY) FROM {{ this }}){% endif %}Combiné avec une configuration unique_key, ce pattern rattrape les arrivées tardives en re-mergeant les données récentes. Les enregistrements déjà présents sont mis à jour ; les nouvelles arrivées tardives sont insérées.
Choisir la taille de la fenêtre
Il n’y a pas de réponse universelle. Chaque option fait un compromis entre coût et couverture :
1 jour : coût de retraitement minimal. Rattrape les données qui arrivent avec un léger retard. Rate tout ce qui est décalé davantage.
3 jours : un point de départ courant. Couvre la majorité des arrivées tardives dans la plupart des systèmes. Le surcoût en calcul est généralement négligeable par rapport au gain en qualité des données.
7 jours : rattrape presque tout pour la plupart des sources. Commence à représenter un coût significatif sur les grandes tables.
14+ jours : couverture maximale. Impact important sur les performances. Justifié uniquement pour des sources avec une latence extrême connue, comme les flux de données tiers.
Votre analyse de latence vous indique où se situent vos données. Si 99,5 % arrivent sous 3 jours, une fenêtre de 3 jours suffit. Si vous avez une longue traîne qui s’étend sur 10 jours, vous devez décider si rattraper ces derniers 0,5 % vaut le coût.
Rendre la fenêtre configurable
Prévoyez de la flexibilité pour les backfills et les cas particuliers :
{% set lookback_days = var('lookback_days', 3) %}
{% if is_incremental() %}WHERE created_at >= ( SELECT DATE_SUB(MAX(created_at), INTERVAL {{ lookback_days }} DAY) FROM {{ this }}){% endif %}Vous pouvez alors l’ajuster au besoin :
dbt run --select my_model --vars '{"lookback_days": 14}'Vos exécutions quotidiennes restent performantes, tout en permettant des backfills ciblés.
Les incremental predicates pour les grandes tables
Sur des tables de plusieurs centaines de millions de lignes, l’opération de merge peut devenir coûteuse même avec un lookback window. Votre data warehouse scanne l’intégralité de la table de destination pour trouver les enregistrements correspondants.
Les incremental_predicates limitent ce scan :
{{ config( materialized='incremental', unique_key='id', incremental_strategy='merge', incremental_predicates=[ "DBT_INTERNAL_DEST.created_at >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)" ]) }}Ce paramètre ajoute un filtre côté destination dans l’instruction MERGE. Au lieu de scanner 500 millions de lignes pour trouver les correspondances, le data warehouse ne parcourt que les partitions de la dernière semaine.
Le piège BigQuery
Le partition pruning sur BigQuery ne fonctionne qu’avec des valeurs littérales ou constantes. Les sous-requêtes comme SELECT MAX(date) ne déclenchent pas le pruning. Si vous avez besoin de dates dynamiques, calculez-les dans une macro qui se résout en littéral avant l’exécution de la requête.
Stratégies basées sur les partitions
Pour les tables partitionnées, remplacer des partitions entières est souvent plus performant que le merge ligne par ligne. Cette approche fonctionne particulièrement bien pour les données temporelles où les arrivées tardives affectent un ensemble prévisible de partitions.
BigQuery insert_overwrite
{% set partitions_to_replace = [ 'CURRENT_DATE()', 'DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)', 'DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)'] %}
{{ config( materialized='incremental', incremental_strategy='insert_overwrite', partition_by={'field': 'event_date', 'data_type': 'date'}, partitions=partitions_to_replace) }}
SELECT event_id, event_date, user_id, event_dataFROM {{ ref('base_events') }}{% if is_incremental() %}WHERE event_date IN ({{ partitions_to_replace | join(',') }}){% endif %}Avec des partitions statiques, BigQuery sait exactement quelles partitions remplacer. On évite ainsi la requête de découverte de partitions que le mode dynamique impose.
Snowflake : delete+insert à la place
Le insert_overwrite de Snowflake remplace la table entière, pas les partitions individuelles. Pour des remplacements par partition, utilisez delete+insert avec des prédicats explicites :
{{ config( materialized='incremental', incremental_strategy='delete+insert', unique_key='event_date') }}
SELECT event_id, event_date, user_id, event_dataFROM {{ ref('base_events') }}{% if is_incremental() %}WHERE event_date >= DATEADD(DAY, -3, CURRENT_DATE){% endif %}Databricks replace_where
Databricks propose replace_where pour le remplacement de partitions basé sur un prédicat :
{{ config( materialized='incremental', incremental_strategy='replace_where', incremental_predicates=["event_date >= CURRENT_DATE - INTERVAL 3 DAYS"]) }}Ce mode remplace atomiquement les lignes qui correspondent au prédicat. Vous obtenez l’efficacité du remplacement par partition avec la flexibilité de conditions de filtre arbitraires.
La limite des fenêtres de partition
Les stratégies basées sur les partitions ont un angle mort : les données qui arrivent après la fermeture de la fenêtre de leur partition sont perdues.
Prenons un exemple : vous remplacez les 3 derniers jours de partitions à chaque exécution. Le 20 janvier, vous retraitez les partitions du 17 au 20 janvier. Des événements du 16 janvier qui arrivent le 20 janvier ne seront pas captés, car le 16 janvier n’est plus dans votre fenêtre de remplacement.
Pour la plupart des systèmes, ce cas limite est suffisamment rare pour être ignoré. Pour les données critiques, il vous faut :
- Des fenêtres de partition plus larges (avec le coût associé)
- Des full refreshes périodiques pour rattraper ce qui a glissé entre les mailles
- Une approche par merge capable de mettre à jour n’importe quelle ligne quelle que soit sa partition
Idempotence par la déduplication
Un lookback window implique de traiter les mêmes enregistrements plusieurs fois. Votre modèle doit produire des résultats identiques quel que soit le nombre d’exécutions.
La première ligne de défense est le unique_key. Les enregistrements correspondant à une clé existante sont mis à jour plutôt que dupliqués. Mais cela ne fonctionne que sur les exécutions suivantes. Votre premier full refresh traite toutes les données sans la logique de merge.
Ajoutez une déduplication explicite pour gérer les doublons à tous les stades :
SELECT event_id, event_timestamp, user_id, event_data, _loaded_atFROM {{ ref('base_events') }}{% if is_incremental() %}WHERE event_timestamp >= ( SELECT DATE_SUB(MAX(event_timestamp), INTERVAL 3 DAY) FROM {{ this }}){% endif %}QUALIFY ROW_NUMBER() OVER ( PARTITION BY event_id ORDER BY event_timestamp DESC) = 1La clause QUALIFY ne conserve que la version la plus récente de chaque enregistrement, ce qui empêche les doublons d’entrer dans votre modèle, y compris lors du premier run.
Surrogate keys pour l’unicité multi-colonnes
Quand l’unicité repose sur plusieurs colonnes, générez une surrogate key :
{{ config( unique_key='surrogate_key') }}
SELECT {{ dbt_utils.generate_surrogate_key(['user_id', 'event_date', 'event_type']) }} AS surrogate_key, user_id, event_date, event_type, event_dataFROM {{ ref('base_events') }}Cela évite les cas limites qui peuvent survenir avec les configurations unique_key composites selon les data warehouses.
Tester la gestion des données en retard
Les unit tests (dbt 1.8+) permettent de simuler des arrivées tardives :
unit_tests: - name: test_late_data_captured model: my_incremental_model overrides: macros: is_incremental: true given: - input: this rows: - {id: 1, value: 'original', updated_at: '2024-01-15'} - input: ref('source') rows: - {id: 1, value: 'updated', updated_at: '2024-01-17'} - {id: 2, value: 'late_arrival', updated_at: '2024-01-13'} expect: rows: - {id: 1, value: 'updated'} - {id: 2, value: 'late_arrival'}Le test vérifie que l’enregistrement en retard (id: 2, daté du 13 janvier alors que le modèle contient déjà des données du 15 janvier) est bien capté par le lookback window.
Pour la validation en production, comparez les résultats du modèle incrémental avec ceux d’un full refresh :
{{ audit_helper.compare_queries( a_query="SELECT * FROM model_full_refresh WHERE event_date >= CURRENT_DATE - 7", b_query="SELECT * FROM model_incremental WHERE event_date >= CURRENT_DATE - 7", primary_key='id') }}Un écart entre les deux indique que votre lookback window ne capture pas tout.
Le filet de sécurité du full refresh
Aucun lookback window ne rattrape tout indéfiniment. Les données peuvent arriver avec un retard arbitraire. Des systèmes externes peuvent subir des pannes qui retardent les données de plusieurs semaines.
Planifiez des full refreshes périodiques, hebdomadaires ou mensuels selon votre tolérance à la dérive. Pour les très grandes tables où le full refresh n’est pas réaliste, exécutez-en un manuellement après tout incident connu sur vos pipelines.
Pensez à ajouter full_refresh: false sur les modèles critiques pour éviter les reconstructions accidentelles :
{{ config( materialized='incremental', full_refresh=false) }}Ce paramètre impose le flag --full-refresh explicite, ce qui empêche les reconstructions accidentelles de plusieurs heures.
Vue d’ensemble
Voici à quoi ressemble un modèle incrémental prêt pour la production, avec gestion des arrivées tardives :
{{ config( materialized='incremental', unique_key='event_id', incremental_strategy='merge', incremental_predicates=[ "DBT_INTERNAL_DEST.event_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)" ], full_refresh=false) }}
{% set lookback_days = var('lookback_days', 3) %}
SELECT event_id, event_date, user_id, event_data, _loaded_atFROM {{ ref('base_events') }}{% if is_incremental() %}WHERE event_date >= ( SELECT DATE_SUB(MAX(event_date), INTERVAL {{ lookback_days }} DAY) FROM {{ this }}){% endif %}QUALIFY ROW_NUMBER() OVER ( PARTITION BY event_id ORDER BY _loaded_at DESC) = 1Ce modèle :
- Retraite les 3 derniers jours par défaut
- Permet un override via
--varspour les backfills - Limite le scan de la table de destination avec les predicates
- Empêche les full refreshes accidentels
- Déduplique les données sources
Le bon pattern dépend du profil de latence de vos données, de la taille de vos tables et de votre tolérance à la dérive. Commencez par mesurer le retard réel de vos données, puis choisissez l’approche la plus simple qui couvre vos besoins.