ServicesÀ proposNotesContact Me contacter →
EN FR
Note

Pipeline dlt Google Ads

Construire un pipeline Google Ads vers BigQuery avec dlt — la source vérifiée, les patterns de requêtes GAQL, le chargement incrémental et les options de déploiement.

Planté
google adsbigquerydltdata engineeringetlincremental processing

La source Google Ads de dlt est un chemin Python-native pour charger les données Google Ads dans BigQuery. Vous écrivez des requêtes GAQL ; dlt gère la pagination, le rate limiting, l’inférence de schéma et le chargement incrémental. Le pipeline est versionné, testable et s’exécute partout où Python s’exécute.

Prérequis : un token développeur Google Ads. dlt utilise directement l’API Google Ads, qui en nécessite un. Les alternatives sans token développeur incluent le Data Transfer Service et Google Ads Scripts.

Quand utiliser dlt pour Google Ads

Par rapport aux connecteurs managés (Fivetran, Airbyte), les facteurs pertinents spécifiquement pour Google Ads :

  • Le pricing MAR de Fivetran est affecté par les données marketing. Les métriques publicitaires se mettent à jour de manière rétroactive en raison des fenêtres d’attribution, générant des nombres de lignes élevés à chaque cycle de synchronisation.
  • Les données Google Ads ont des volumes de lignes importants au niveau des mots-clés ou des annonces. Les grands comptes avec des milliers de campagnes actives génèrent un MAR significatif.
  • La source Google Ads de dlt est vérifiée (pas une contribution communautaire) et maintenue selon des standards de production.

Voir Économie build vs. buy des pipelines de données pour le cadre de décision complet.

Configuration

Installez dlt avec la destination BigQuery :

Terminal window
pip install "dlt[bigquery]"

Configurez les credentials dans secrets.toml :

[sources.google_ads]
developer_token = "your-22-char-developer-token"
client_id = "your-oauth-client-id"
client_secret = "your-oauth-client-secret"
refresh_token = "your-refresh-token"
customer_id = "1234567890"
[destination.bigquery]
project_id = "your-gcp-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
client_email = "sa@your-project.iam.gserviceaccount.com"

L’authentification utilise OAuth. Vous devrez créer des credentials dans la Google Cloud Console et exécuter le flux OAuth une fois pour générer le refresh token. Après cela, le pipeline s’exécute sans surveillance.

Un pipeline de base

import dlt
from dlt.sources.google_ads import google_ads
pipeline = dlt.pipeline(
pipeline_name="google_ads",
destination="bigquery",
dataset_name="google_ads_raw"
)
source = google_ads(
customer_id="1234567890",
queries=[
{
"query": """
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.clicks,
metrics.impressions,
metrics.cost_micros,
metrics.conversions,
segments.date
FROM campaign
WHERE segments.date DURING LAST_30_DAYS
""",
"table_name": "campaign_performance"
}
]
)
load_info = pipeline.run(source)
print(load_info)

Cela gère la pagination, le rate limiting et l’inférence de schéma automatiquement. Le résultat est une table campaign_performance dans BigQuery avec les champs que vous avez spécifiés.

Notez le champ metrics.cost_micros — Google rapporte les coûts en millionièmes d’unité monétaire. Votre modèle dbt de base devrait diviser par 1 000 000. Intégrer cette conversion dans le pipeline lui-même est une alternative raisonnable, mais le faire dans dbt garde la couche brute non modifiée et la transformation auditable.

Patterns de requêtes GAQL

GAQL (Google Ads Query Language) ressemble à SQL. Il interroge depuis une ressource primaire et joint implicitement les entités liées. Quelques patterns à connaître :

Performance de campagne avec segments :

SELECT
campaign.id,
campaign.name,
ad_group.id,
ad_group.name,
metrics.clicks,
metrics.impressions,
metrics.cost_micros,
metrics.conversions,
segments.date,
segments.device
FROM ad_group
WHERE segments.date DURING LAST_7_DAYS
AND campaign.status = 'ENABLED'
AND ad_group.status = 'ENABLED'
ORDER BY metrics.cost_micros DESC

Performance des mots-clés :

SELECT
campaign.id,
ad_group.id,
ad_group_criterion.keyword.text,
ad_group_criterion.keyword.match_type,
metrics.clicks,
metrics.impressions,
metrics.cost_micros,
metrics.average_cpc,
metrics.conversions,
segments.date
FROM keyword_view
WHERE segments.date DURING LAST_30_DAYS
AND campaign.status = 'ENABLED'

Rapport de termes de recherche :

SELECT
campaign.id,
ad_group.id,
search_term_view.search_term,
search_term_view.status,
metrics.clicks,
metrics.impressions,
metrics.cost_micros,
metrics.conversions,
segments.date
FROM search_term_view
WHERE segments.date DURING LAST_7_DAYS

GAQL a des contraintes. Vous ne pouvez pas faire de JOIN entre des ressources non liées dans une seule requête. Chaque requête cible une ressource primaire ; les champs liés sont disponibles implicitement quand ils ont une relation définie dans le schéma de l’API. Pour certaines requêtes complexes entre entités, vous aurez besoin de plusieurs requêtes alimentant des tables séparées, puis de jointures dans dbt.

Chargement incrémental

Exécuter le pipeline avec DURING LAST_30_DAYS à chaque exécution est inefficace. Pour les synchronisations continues, utilisez le chargement incrémental de dlt pour ne récupérer que les données nouvelles ou mises à jour.

Les données Google Ads ont un défi spécifique : les métriques de conversion se mettent à jour de manière rétroactive. Un clic de la semaine dernière peut générer une conversion aujourd’hui, ce qui modifie les métriques de la semaine dernière. Vous avez besoin d’une fenêtre de lookback pour capturer ces mises à jour.

Un pattern incrémental pratique :

import dlt
from dlt.sources.google_ads import google_ads
from datetime import datetime, timedelta
def get_date_range():
end_date = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
# Regarder 30 jours en arrière pour capturer les mises à jour d'attribution
start_date = (datetime.today() - timedelta(days=30)).strftime('%Y-%m-%d')
return start_date, end_date
start_date, end_date = get_date_range()
source = google_ads(
customer_id="1234567890",
queries=[
{
"query": f"""
SELECT
campaign.id,
campaign.name,
metrics.clicks,
metrics.impressions,
metrics.cost_micros,
metrics.conversions,
segments.date
FROM campaign
WHERE segments.date BETWEEN '{start_date}' AND '{end_date}'
""",
"table_name": "campaign_performance"
}
]
)
pipeline = dlt.pipeline(
pipeline_name="google_ads_incremental",
destination="bigquery",
dataset_name="google_ads_raw"
)
# Utiliser merge pour l'upsert — gère les mises à jour rétroactives d'attribution
load_info = pipeline.run(
source,
write_disposition="merge",
primary_key=["campaign_id", "date"]
)

Le write_disposition="merge" avec une clé primaire composée de l’ID de campagne et de la date signifie que chaque exécution fait un upsert des 30 derniers jours. Les lignes existantes pour ces dates sont mises à jour ; les lignes en dehors de la fenêtre ne sont pas touchées. Cela gère le problème de réajustement de la fenêtre d’attribution au niveau du pipeline.

La fenêtre de lookback de 30 jours correspond à la façon dont le Data Transfer Service gère sa fenêtre de rafraîchissement. La différence : avec dlt, vous contrôlez la fenêtre explicitement.

Backfill historique

Contrairement à DTS (limité à 30 jours d’historique lors de la configuration), dlt peut récupérer des données depuis n’importe quelle date supportée par l’API. La disponibilité des données de l’API Google Ads varie selon la ressource, mais les données au niveau campagne remontent généralement à plus de 4 ans.

Pour un backfill initial, bouclez sur des plages de dates et exécutez le pipeline par morceaux :

from datetime import datetime, timedelta
def backfill_google_ads(start_date_str: str):
pipeline = dlt.pipeline(
pipeline_name="google_ads_backfill",
destination="bigquery",
dataset_name="google_ads_raw"
)
start = datetime.strptime(start_date_str, "%Y-%m-%d")
end = datetime.today() - timedelta(days=1)
chunk_days = 90 # Extraire 90 jours à la fois
current = start
while current < end:
chunk_end = min(current + timedelta(days=chunk_days), end)
source = google_ads(
customer_id="1234567890",
queries=[{
"query": f"""
SELECT campaign.id, campaign.name,
metrics.clicks, metrics.impressions,
metrics.cost_micros, metrics.conversions,
segments.date
FROM campaign
WHERE segments.date BETWEEN '{current.strftime('%Y-%m-%d')}'
AND '{chunk_end.strftime('%Y-%m-%d')}'
""",
"table_name": "campaign_performance"
}]
)
load_info = pipeline.run(
source,
write_disposition="merge",
primary_key=["campaign_id", "date"]
)
print(f"Loaded {current.date()} to {chunk_end.date()}: {load_info}")
current = chunk_end + timedelta(days=1)
backfill_google_ads("2024-01-01")

Après le backfill, passez au pattern incrémental pour les synchronisations quotidiennes. La write disposition merge signifie qu’il n’y a pas de conflit entre les données historiques et les synchronisations continues.

Comptes multiples (MCC)

Pour les configurations d’agence ou de consultant gérant plusieurs comptes clients sous un compte Manager :

customer_ids = ["1234567890", "0987654321", "1122334455"]
pipeline = dlt.pipeline(
pipeline_name="google_ads_mcc",
destination="bigquery",
dataset_name="google_ads_raw"
)
for customer_id in customer_ids:
source = google_ads(
customer_id=customer_id,
queries=[...]
)
load_info = pipeline.run(
source,
table_name_prefix=f"account_{customer_id}_"
)

Alternativement, incluez customer_id comme champ dans votre requête et écrivez tous les comptes dans les mêmes tables avec l’ID client comme clé de partition. La couche dbt filtre alors par ID client plutôt que de faire des jointures entre tables.

Déploiement

Le pipeline est portable. Développement sur votre laptop, production dans Cloud Functions, Cloud Run ou Airflow — le même code s’exécute partout.

Pour un déploiement Cloud Functions, le pattern est simple : encapsulez le pipeline dans un gestionnaire Cloud Function, déclenchez-le via Cloud Scheduler. Voir Options de déploiement dlt pour les spécifications d’infrastructure.

L’avantage par rapport aux Google Ads Scripts est l’absence de plafond de 30 minutes. Un pipeline récupérant des données au niveau des mots-clés pour un grand compte avec 50 000 mots-clés actifs peut s’exécuter aussi longtemps que nécessaire.

Pour les fondamentaux de dlt qui sous-tendent ce pipeline, voir Concepts fondamentaux de dlt et dlt et intégration BigQuery.