Dagster suit la fraîcheur des assets, pas seulement les timestamps d’exécution. Un timestamp d’exécution enregistre quand un pipeline a tourné ; la fraîcheur suit si les données sont suffisamment récentes pour leurs consommateurs. Un pipeline exécuté il y a 30 minutes mais qui a traité des données upstream périmées est récent du point de vue du timestamp d’exécution, mais potentiellement en retard de plusieurs heures sur la fraîcheur des données.
Cloud Run Jobs et Cloud Workflows déclenchent des jobs selon le temps ou des événements mais ne suivent pas si les données résultantes respectent les exigences de fraîcheur. Dagster suit cela par asset.
Politiques de fraîcheur via dbt Meta
Les attentes de fraîcheur se définissent directement dans la section meta du projet dbt. L’intégration Dagster les lit depuis le manifest, de sorte que les exigences de fraîcheur résident aux côtés des définitions de modèles :
models: - name: mrt__marketing__daily_spend meta: dagster: freshness_policy: maximum_lag_minutes: 360 # Doit avoir moins de 6 heures d'anciennetéCela indique à Dagster que mrt__marketing__daily_spend ne doit jamais avoir plus de 6 heures de retard. Si c’est le cas, l’UI affiche une alerte de fraîcheur, et des notifications peuvent être configurées via les alertes Dagster+.
La politique de fraîcheur est au niveau de l’asset, pas du pipeline. Des exigences de fraîcheur différentes peuvent être définies pour différents modèles selon leur importance métier :
- Dashboards temps réel (
maximum_lag_minutes: 60) : modèles mart alimentant des dashboards exécutifs consultés fréquemment par les parties prenantes. - Reporting quotidien (
maximum_lag_minutes: 1440) : tables d’analytique standard où un rafraîchissement nocturne est suffisant. - Agrégations hebdomadaires (
maximum_lag_minutes: 10080) : rollups historiques où une fraîcheur hebdomadaire est acceptable.
Les politiques de fraîcheur se propagent également dans le graphe d’assets. Si un modèle mart dépend d’un modèle intermédiaire périmé, le statut de fraîcheur du mart reflète la péremption upstream. Il n’est pas nécessaire de définir des politiques sur chaque modèle — en les définissant sur les modèles ayant des exigences de fraîcheur explicites, Dagster en infère les implications en amont.
Orchestration basée sur des schedules
Pour la planification basée sur cron, build_schedule_from_dbt_selection crée un schedule Dagster directement depuis la syntaxe de sélection dbt :
from dagster_dbt import build_schedule_from_dbt_selection
daily_dbt_schedule = build_schedule_from_dbt_selection( [my_dbt_assets], job_name="daily_dbt_job", cron_schedule="0 6 * * *", dbt_select="tag:daily",)Cela exécute tous les modèles dbt tagués daily à 6h du matin. La syntaxe de sélection dbt est la même que celle utilisée avec dbt run --select tag:daily, donc la stratégie de tagging existante se traduit directement.
Plusieurs schedules peuvent être définis pour différentes cadences :
hourly_schedule = build_schedule_from_dbt_selection( [my_dbt_assets], job_name="hourly_dbt_job", cron_schedule="0 * * * *", dbt_select="tag:hourly",)
daily_schedule = build_schedule_from_dbt_selection( [my_dbt_assets], job_name="daily_dbt_job", cron_schedule="0 6 * * *", dbt_select="tag:daily",)Cela offre la même flexibilité de planification que le planificateur de jobs de dbt Cloud, mais intégré à la couche d’orchestration plus large de Dagster.
Exécution event-driven avec les Sensors
Les schedules s’exécutent selon des cadences fixes. Les sensors s’exécutent en réponse à des événements. Pour les pipelines de données, l’exécution event-driven est souvent plus appropriée : exécuter dbt quand les données sont prêtes, pas quand l’horloge l’indique.
Un sensor peut surveiller un événement upstream — un sync Fivetran complété, un fichier arrivé dans GCS, un pipeline dlt terminé — et déclencher dbt uniquement quand des données fraîches sont disponibles :
from dagster import sensor, RunRequest
@sensor(job=my_dbt_job)def fivetran_complete_sensor(context): # Vérifie si le sync Fivetran s'est complété depuis la dernière vérification if fivetran_sync_completed(): yield RunRequest(run_key=f"fivetran-{context.cursor}")L’avantage pratique est d’éliminer le mode de défaillance courant de l’orchestration basée sur les schedules : exécuter dbt à 6h du matin en espérant que la charge upstream ait terminé à temps. Avec un sensor, dbt ne s’exécute que lorsque la présence des données upstream est confirmée. Plus de runs incrémentaux vides ni de données partielles dans les tables downstream.
Patterns de sensors courants
Déclencheur Cloud Storage. Surveille les fichiers arrivant dans un bucket GCS ou S3. Quand un fichier CSV ou Parquet arrive, déclenche les modèles dbt qui transforment cette source.
Déclencheur de complétion d’API. Interroge l’API d’un outil d’ingestion (Fivetran, Airbyte, dlt) pour la complétion du sync. Déclenche dbt quand le sync réussit ; alerte et passe quand il échoue.
Déclencheur cross-asset. Matérialise les modèles dbt downstream seulement après que les assets Python upstream sont complétés. C’est le pattern de sensor qui rend le graphe d’assets unifié de Dagster pratique — les assets Python d’extraction et les assets dbt de transformation se coordonnent sans état partagé ni glue externe.
Automation Conditions
Au-delà des schedules fixes et des sensors explicites, Dagster prend en charge les automation conditions (également appelées auto-materialize policies) qui expriment de façon déclarative quand un asset doit être rafraîchi :
- Sur changement upstream. Re-matérialise quand un asset upstream a de nouvelles données. C’est la version event-driven consciente du graphe d’assets.
- Sur violation de fraîcheur. Re-matérialise quand la politique de fraîcheur serait violée si l’asset n’est pas rafraîchi bientôt.
- Sur schedule cron. Rafraîchissement périodique, similaire à un schedule mais défini au niveau de l’asset plutôt qu’au niveau du job.
Les automation conditions sont plus composables que les schedules ou les sensors car elles s’attachent à des assets individuels plutôt qu’à des groupes d’assets. Un modèle mart peut déclarer « re-matérialise quand l’upstream change » tandis qu’un modèle de base déclare « re-matérialise selon un schedule cron ». Dagster résout les dépendances automatiquement.
Comparaison avec les autres approches d’orchestration
Les capacités de fraîcheur et de planification de Dagster comblent un manque laissé par les outils d’orchestration plus simples :
| Capacité | Cloud Run Jobs | Cloud Workflows | Cloud Composer | Dagster |
|---|---|---|---|---|
| Planification cron | Cloud Scheduler | Cloud Scheduler | Airflow scheduler | Intégré |
| Déclencheurs event-driven | Eventarc | Eventarc | Sensors (Airflow) | Sensors |
| Suivi de fraîcheur | Aucun | Aucun | Limité (SLAs) | Natif par asset |
| Dépendances cross-système | Manuel | Étapes séquentielles | Au niveau du DAG | Au niveau des assets |
| Coût (mensuel) | < 5 $ | < 10 $ | 300-400 $+ | 10 $+ (Dagster+) |
Le cadre de décision pour l’orchestration GCP couvre en détail les compromis entre Cloud Run, Workflows et Composer. Dagster s’inscrit différemment dans le paysage : ce n’est pas un service natif GCP, mais il fournit le suivi de fraîcheur au niveau des assets et la gestion des dépendances cross-système que les outils natifs GCP n’offrent pas.
Pour les équipes dont les besoins d’orchestration se limitent à « exécuter dbt selon un schedule cron », Cloud Run Jobs est plus simple et moins cher. Les capacités de planification et de fraîcheur de Dagster s’appliquent quand une coordination event-driven entre plusieurs systèmes est nécessaire, ou quand la fraîcheur des données doit être suivie et alertée par asset.