
dlt Google Ads Pipeline

Building a Google Ads to BigQuery pipeline with dlt — the verified source, GAQL query patterns, incremental loading, and deployment options.

Planted
Tags: google ads, bigquery, dlt, data engineering, etl, incremental processing

dlt’s Google Ads source is a Python-native path to loading Google Ads data into BigQuery. You write GAQL queries; dlt handles pagination, rate limiting, schema inference, and incremental loading. The pipeline is version-controlled, testable, and runs anywhere Python runs.

Prerequisite: a Google Ads developer token. dlt uses the Google Ads API directly, which requires one. Alternatives without a developer token include the Data Transfer Service and Google Ads Scripts.

When to use dlt for Google Ads

Compared to managed connectors (Fivetran, Airbyte), relevant factors for Google Ads specifically:

  • Fivetran’s MAR (monthly active rows) pricing is punishing for marketing data. Ad metrics update retroactively due to attribution windows, so every sync cycle touches a large number of rows.
  • Google Ads data is high-cardinality at the keyword and ad level. Large accounts with thousands of active campaigns generate significant MAR.
  • The dlt Google Ads source is verified (not community-contributed) and maintained to production standards.

See Build vs. Buy Data Pipeline Economics for the full decision framework.

Setup

Install dlt with the BigQuery destination:

pip install "dlt[bigquery]"

Configure credentials in secrets.toml:

[sources.google_ads]
developer_token = "your-22-char-developer-token"
client_id = "your-oauth-client-id"
client_secret = "your-oauth-client-secret"
refresh_token = "your-refresh-token"
customer_id = "1234567890"

[destination.bigquery]
project_id = "your-gcp-project"
private_key = "-----BEGIN PRIVATE KEY-----\n..."
client_email = "sa@your-project.iam.gserviceaccount.com"

Authentication uses OAuth. You’ll need to create credentials in the Google Cloud Console and run the OAuth flow once to generate the refresh token. After that, the pipeline runs unattended.

A Basic Pipeline

import dlt
from dlt.sources.google_ads import google_ads

pipeline = dlt.pipeline(
    pipeline_name="google_ads",
    destination="bigquery",
    dataset_name="google_ads_raw",
)

source = google_ads(
    customer_id="1234567890",
    queries=[
        {
            "query": """
                SELECT
                  campaign.id,
                  campaign.name,
                  campaign.status,
                  metrics.clicks,
                  metrics.impressions,
                  metrics.cost_micros,
                  metrics.conversions,
                  segments.date
                FROM campaign
                WHERE segments.date DURING LAST_30_DAYS
            """,
            "table_name": "campaign_performance",
        }
    ],
)

load_info = pipeline.run(source)
print(load_info)

This handles pagination, rate limiting, and schema inference automatically. The result is a campaign_performance table in BigQuery with the fields you specified.

Note the metrics.cost_micros field — Google reports cost in millionths of a currency unit. Your dbt base model should divide by 1,000,000. Building that conversion into the pipeline itself is a reasonable alternative, but doing it in dbt keeps the raw layer unmodified and the transformation auditable.
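The conversion itself is trivial. A minimal Python helper, whether you put it in the pipeline or express the same logic in a dbt model (the function name is illustrative, not part of dlt or dbt):

```python
def micros_to_currency(cost_micros: int) -> float:
    """Convert Google's cost_micros (millionths of a currency unit) to currency units."""
    return cost_micros / 1_000_000

# e.g. a reported cost_micros of 12_345_678 is 12.345678 in the account currency
```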

GAQL Query Patterns

GAQL (Google Ads Query Language) is SQL-like. It queries from a primary resource and joins related entities implicitly. Some patterns worth knowing:

Campaign performance with segments:

SELECT
  campaign.id,
  campaign.name,
  ad_group.id,
  ad_group.name,
  metrics.clicks,
  metrics.impressions,
  metrics.cost_micros,
  metrics.conversions,
  segments.date,
  segments.device
FROM ad_group
WHERE segments.date DURING LAST_7_DAYS
  AND campaign.status = 'ENABLED'
  AND ad_group.status = 'ENABLED'
ORDER BY metrics.cost_micros DESC

Keyword performance:

SELECT
  campaign.id,
  ad_group.id,
  ad_group_criterion.keyword.text,
  ad_group_criterion.keyword.match_type,
  metrics.clicks,
  metrics.impressions,
  metrics.cost_micros,
  metrics.average_cpc,
  metrics.conversions,
  segments.date
FROM keyword_view
WHERE segments.date DURING LAST_30_DAYS
  AND campaign.status = 'ENABLED'

Search query report:

SELECT
  campaign.id,
  ad_group.id,
  search_term_view.search_term,
  search_term_view.status,
  metrics.clicks,
  metrics.impressions,
  metrics.cost_micros,
  metrics.conversions,
  segments.date
FROM search_term_view
WHERE segments.date DURING LAST_7_DAYS

GAQL has constraints. You can’t JOIN across unrelated resources in a single query. Each query targets one primary resource; related fields are available implicitly when they have a defined relationship in the API schema. For complex cross-entity analysis, you’ll need multiple queries feeding separate tables, joined downstream in dbt.
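Since every report is a separate query over one primary resource, it can help to assemble the query strings from shared field lists rather than repeating them. A small sketch (the `build_gaql` helper is hypothetical, not part of dlt or the Google Ads client):

```python
def build_gaql(resource: str, fields: list[str], where: str = "") -> str:
    """Assemble a GAQL query: one primary resource, a field list, optional filter."""
    query = f"SELECT {', '.join(fields)} FROM {resource}"
    if where:
        query += f" WHERE {where}"
    return query

# Metric fields shared across per-resource queries
metric_fields = ["metrics.clicks", "metrics.impressions", "metrics.cost_micros"]
campaign_q = build_gaql("campaign", ["campaign.id", "segments.date"] + metric_fields,
                        "segments.date DURING LAST_7_DAYS")
```

Each generated string then slots into a queries entry with its own table_name.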

Incremental Loading

Running the pipeline with DURING LAST_30_DAYS on every run is inefficient. For ongoing syncs, use dlt’s incremental loading to fetch only new or updated data.

Google Ads data has a specific challenge: conversion metrics update retroactively. A click from last week may generate a conversion today, which modifies last week’s metrics. You need a lookback window to catch these updates.

A practical incremental pattern:

import dlt
from dlt.sources.google_ads import google_ads
from datetime import datetime, timedelta

def get_date_range():
    end_date = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
    # Look back 30 days to catch attribution updates
    start_date = (datetime.today() - timedelta(days=30)).strftime('%Y-%m-%d')
    return start_date, end_date

start_date, end_date = get_date_range()

source = google_ads(
    customer_id="1234567890",
    queries=[
        {
            "query": f"""
                SELECT
                  campaign.id,
                  campaign.name,
                  metrics.clicks,
                  metrics.impressions,
                  metrics.cost_micros,
                  metrics.conversions,
                  segments.date
                FROM campaign
                WHERE segments.date BETWEEN '{start_date}' AND '{end_date}'
            """,
            "table_name": "campaign_performance",
        }
    ],
)

pipeline = dlt.pipeline(
    pipeline_name="google_ads_incremental",
    destination="bigquery",
    dataset_name="google_ads_raw",
)

# Use merge to upsert: handles retroactive attribution updates
load_info = pipeline.run(
    source,
    write_disposition="merge",
    primary_key=["campaign_id", "date"],
)

The write_disposition="merge" with a compound primary key of campaign ID and date means each run upserts the last 30 days. Existing rows for those dates get updated; rows outside the window are untouched. This handles the attribution window restatement problem at the pipeline level.
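What merge does at the destination can be illustrated in plain Python: rows arriving with an existing (campaign ID, date) key overwrite the stored row, and new keys are appended. This is a toy model of the behavior, not dlt’s implementation:

```python
def merge_rows(existing: dict, incoming: list[dict]) -> dict:
    """Upsert incoming rows into a table keyed by (campaign_id, date)."""
    for row in incoming:
        key = (row["campaign_id"], row["date"])
        existing[key] = row  # overwrite restated metrics, insert new dates
    return existing

table = {
    ("c1", "2025-01-01"): {"campaign_id": "c1", "date": "2025-01-01", "conversions": 2},
}
# A later sync restates 2025-01-01 (late attribution) and adds 2025-01-02.
table = merge_rows(table, [
    {"campaign_id": "c1", "date": "2025-01-01", "conversions": 3},
    {"campaign_id": "c1", "date": "2025-01-02", "conversions": 1},
])
```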

The 30-day lookback mirrors how Data Transfer Service handles its refresh window. The difference: with dlt, you control the window explicitly.

Historical Backfill

Unlike DTS (which is limited to 30 days of history at setup), dlt can pull data from any date the API supports. Google Ads API data availability varies by resource, but campaign-level data typically goes back 4+ years.

For an initial backfill, loop over date ranges and run the pipeline in chunks:

import dlt
from dlt.sources.google_ads import google_ads
from datetime import datetime, timedelta

def backfill_google_ads(start_date_str: str):
    pipeline = dlt.pipeline(
        pipeline_name="google_ads_backfill",
        destination="bigquery",
        dataset_name="google_ads_raw",
    )
    start = datetime.strptime(start_date_str, "%Y-%m-%d")
    end = datetime.today() - timedelta(days=1)
    chunk_days = 90  # Pull 90 days at a time
    current = start
    while current < end:
        chunk_end = min(current + timedelta(days=chunk_days), end)
        source = google_ads(
            customer_id="1234567890",
            queries=[{
                "query": f"""
                    SELECT campaign.id, campaign.name,
                           metrics.clicks, metrics.impressions,
                           metrics.cost_micros, metrics.conversions,
                           segments.date
                    FROM campaign
                    WHERE segments.date BETWEEN '{current.strftime('%Y-%m-%d')}'
                        AND '{chunk_end.strftime('%Y-%m-%d')}'
                """,
                "table_name": "campaign_performance",
            }],
        )
        load_info = pipeline.run(
            source,
            write_disposition="merge",
            primary_key=["campaign_id", "date"],
        )
        print(f"Loaded {current.date()} to {chunk_end.date()}: {load_info}")
        current = chunk_end + timedelta(days=1)

backfill_google_ads("2024-01-01")

After the backfill, switch to the incremental pattern for daily syncs. The merge write disposition means there’s no conflict between the historical data and ongoing syncs.
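The chunking logic is worth isolating so it can be unit-tested independently of the API. A sketch (the generator name is illustrative):

```python
from datetime import date, timedelta

def date_chunks(start: date, end: date, chunk_days: int = 90):
    """Yield inclusive (chunk_start, chunk_end) pairs covering [start, end]."""
    current = start
    while current <= end:
        chunk_end = min(current + timedelta(days=chunk_days), end)
        yield current, chunk_end
        current = chunk_end + timedelta(days=1)

chunks = list(date_chunks(date(2024, 1, 1), date(2024, 6, 30)))
```

Testing this in isolation catches off-by-one gaps or overlapping ranges before they turn into duplicated or missing days in BigQuery.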

Multiple Accounts (MCC)

For agency or consultant setups managing multiple client accounts under a Manager Account:

import dlt
from dlt.sources.google_ads import google_ads

customer_ids = ["1234567890", "0987654321", "1122334455"]

pipeline = dlt.pipeline(
    pipeline_name="google_ads_mcc",
    destination="bigquery",
    dataset_name="google_ads_raw",
)

for customer_id in customer_ids:
    source = google_ads(
        customer_id=customer_id,
        queries=[...],
    )
    load_info = pipeline.run(
        source,
        dataset_name=f"google_ads_{customer_id}",  # one dataset per account
    )

Alternatively, include customer_id as a field in your query and write all accounts to the same tables with customer ID as a partition key. The dbt layer then filters by customer ID rather than joining across tables.
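The single-table variant amounts to stamping each row with its source account before loading. A toy sketch of the tagging step (plain Python, not dlt’s API):

```python
def tag_with_account(rows: list[dict], customer_id: str) -> list[dict]:
    """Attach the source account ID to each row before loading."""
    return [{**row, "customer_id": customer_id} for row in rows]

rows = tag_with_account(
    [{"campaign_id": "c1", "clicks": 10}, {"campaign_id": "c2", "clicks": 4}],
    customer_id="1234567890",
)
```

With customer_id on every row, the merge key becomes (customer_id, campaign ID, date), and downstream models filter rather than union across per-account tables.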

Deployment

The pipeline is portable. Development on your laptop, production in Cloud Functions, Cloud Run, or Airflow — the same code runs everywhere.

For a Cloud Functions deployment, the pattern is straightforward: wrap the pipeline in a Cloud Function handler, trigger it via Cloud Scheduler. See dlt Deployment Options for the infrastructure specifics.

The advantage over Google Ads Scripts is no 30-minute ceiling. A pipeline pulling keyword-level data for a large account with 50,000 active keywords can run as long as it needs to.

For the dlt fundamentals that underpin this pipeline, see dlt Core Concepts and dlt and BigQuery Integration.