Skip to content

Commit

Permalink
🐛[source-google-ads] Fixing an error in code that checks custom GAQL …
Browse files Browse the repository at this point in the history
…queries (#37840)

Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
Co-authored-by: Serhii Lazebnyi <serhii.lazebnyi@globallogic.com>
Co-authored-by: Anatolii Yatsuk <tolikyatsuk@gmail.com>
  • Loading branch information
4 people authored May 27, 2024
1 parent 4665198 commit 154c3f1
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 253487c0-2246-43ba-a21f-5116b20a2c50
dockerImageTag: 3.4.3
dockerImageTag: 3.5.0
dockerRepository: airbyte/source-google-ads
documentationUrl: https://docs.airbyte.com/integrations/sources/google-ads
githubIssueLabel: source-google-ads
Expand Down
311 changes: 151 additions & 160 deletions airbyte-integrations/connectors/source-google-ads/poetry.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "3.4.3"
version = "3.5.0"
name = "source-google-ads"
description = "Source implementation for Google Ads."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.utils import AirbyteTracedException
from pendulum import parse, today
from pendulum import duration, parse, today

from .custom_query_stream import CustomQuery, IncrementalCustomQuery
from .google_ads import GoogleAds
Expand Down Expand Up @@ -151,6 +151,13 @@ def is_custom_query_incremental(query: GAQL) -> bool:
time_segment_in_select, time_segment_in_where = ["segments.date" in clause for clause in [query.fields, query.where]]
return time_segment_in_select and not time_segment_in_where

@staticmethod
def set_retention_period_and_slice_duration(stream: IncrementalCustomQuery, query: GAQL) -> IncrementalCustomQuery:
if query.resource_name == "click_view":
stream.days_of_data_storage = 90
stream.slice_duration = duration(days=0)
return stream

def create_custom_query_stream(
self,
google_api: GoogleAds,
Expand All @@ -173,7 +180,8 @@ def create_custom_query_stream(
incremental_config = non_manager_incremental_config

if is_incremental:
return IncrementalCustomQuery(config=single_query_config, **incremental_config)
incremental_query_stream = IncrementalCustomQuery(config=single_query_config, **incremental_config)
return self.set_retention_period_and_slice_duration(incremental_query_stream, query)
else:
return CustomQuery(config=single_query_config, api=google_api, customers=customers)

Expand All @@ -199,12 +207,19 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
continue

# Add segments.date to where clause of incremental custom queries if they are not present.
# The same will be done during read, but with start and end date from config
# The same will be done during read, but with start and end date from config.
if self.is_custom_query_incremental(query):
query = IncrementalCustomQuery.insert_segments_date_expr(query, "1980-01-01", "1980-01-01")
# Set default date value 1 month ago, as some tables have a limited lookback time frame.
month_back = today().subtract(months=1).to_date_string()
start_date = config.get("start_date", month_back)

query_date = month_back if start_date > month_back else start_date

query = IncrementalCustomQuery.insert_segments_date_expr(query, query_date, query_date)

query = query.set_limit(1)
try:
logger.info(f"Running the query for account {customer.id}: {query}")
response = google_api.send_request(
str(query),
customer_id=customer.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import pendulum
import pytest
from airbyte_cdk.models import AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, DestinationSyncMode, SyncMode
from pendulum import today
from pendulum import duration, today
from source_google_ads.custom_query_stream import IncrementalCustomQuery
from source_google_ads.google_ads import GoogleAds
from source_google_ads.models import CustomerModel
Expand Down Expand Up @@ -194,95 +194,95 @@ def stream_instance(query, api_mock, **kwargs):
[
(
"""
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions
FROM campaign
WHERE campaign.status = 'PAUSED'
AND metrics.impressions > 100
ORDER BY campaign.status
""",
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions
FROM campaign
WHERE campaign.status = 'PAUSED'
AND metrics.impressions > 100
ORDER BY campaign.status
""",
"""
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions,
segments.date
FROM campaign
WHERE campaign.status = 'PAUSED'
AND metrics.impressions > 100
AND segments.date BETWEEN '1980-01-01' AND '2000-01-01'
ORDER BY campaign.status
""",
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions,
segments.date
FROM campaign
WHERE campaign.status = 'PAUSED'
AND metrics.impressions > 100
AND segments.date BETWEEN '1980-01-01' AND '2000-01-01'
ORDER BY campaign.status
""",
),
(
"""
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions
FROM campaign
ORDER BY campaign.status
""",
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions
FROM campaign
ORDER BY campaign.status
""",
"""
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions,
segments.date
FROM campaign
WHERE segments.date BETWEEN '1980-01-01' AND '2000-01-01'
ORDER BY campaign.status
""",
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions,
segments.date
FROM campaign
WHERE segments.date BETWEEN '1980-01-01' AND '2000-01-01'
ORDER BY campaign.status
""",
),
(
"""
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions
FROM campaign
WHERE campaign.status = 'PAUSED'
AND metrics.impressions > 100
""",
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions
FROM campaign
WHERE campaign.status = 'PAUSED'
AND metrics.impressions > 100
""",
"""
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions,
segments.date
FROM campaign
WHERE campaign.status = 'PAUSED'
AND metrics.impressions > 100
AND segments.date BETWEEN '1980-01-01' AND '2000-01-01'
""",
SELECT
campaign.id,
campaign.name,
campaign.status,
metrics.impressions,
segments.date
FROM campaign
WHERE campaign.status = 'PAUSED'
AND metrics.impressions > 100
AND segments.date BETWEEN '1980-01-01' AND '2000-01-01'
""",
),
(
"""
SELECT
campaign.accessible_bidding_strategy,
segments.ad_destination_type,
campaign.start_date,
campaign.end_date
FROM campaign
""",
SELECT
campaign.accessible_bidding_strategy,
segments.ad_destination_type,
campaign.start_date,
campaign.end_date
FROM campaign
""",
"""
SELECT
campaign.accessible_bidding_strategy,
segments.ad_destination_type,
campaign.start_date,
campaign.end_date,
segments.date
FROM campaign
WHERE segments.date BETWEEN '1980-01-01' AND '2000-01-01'
""",
SELECT
campaign.accessible_bidding_strategy,
segments.ad_destination_type,
campaign.start_date,
campaign.end_date,
segments.date
FROM campaign
WHERE segments.date BETWEEN '1980-01-01' AND '2000-01-01'
""",
),
],
)
Expand Down Expand Up @@ -519,3 +519,18 @@ def test_get_customers(mocker, customer_status_filter, expected_ids, send_reques

assert len(customers) == len(expected_ids)
assert {customer.id for customer in customers} == set(expected_ids)


def test_set_retention_period_and_slice_duration(mock_fields_meta_data):
query = GAQL.parse("SELECT click_view.gclid, click_view.area_of_interest_city FROM click_view")
stream = IncrementalCustomQuery(
api=mock_fields_meta_data,
conversion_window_days=14,
start_date="1980-01-01",
config={"query": query, "table_name": "whatever_table"},
customers=[],
)
updated_stream = SourceGoogleAds.set_retention_period_and_slice_duration(stream, query)

assert updated_stream.days_of_data_storage == 90
assert updated_stream.slice_duration == duration(days=0)
5 changes: 3 additions & 2 deletions docs/integrations/sources/google-ads.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,9 @@ Due to a limitation in the Google Ads API which does not allow getting performan
## Changelog

| Version | Date | Pull Request | Subject |
| :------- | :--------- | :------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------- |
| 3.4.3 | 2024-05-20 | [38270](https://github.com/airbytehq/airbyte/pull/38270) | Replace AirbyteLogger with logging.Logger |
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| `3.5.0` | 2024-05-22 | [37840](https://github.com/airbytehq/airbyte/pull/37840) | Fix custom GAQL queries default value |
| `3.4.3` | 2024-05-20 | [38270](https://github.com/airbytehq/airbyte/pull/38270) | Replace AirbyteLogger with logging.Logger |
| `3.4.2` | 2024-04-24 | [36638](https://github.com/airbytehq/airbyte/pull/36638) | Schema descriptions and CDK 0.80.0 |
| `3.4.1` | 2024-04-08 | [36891](https://github.com/airbytehq/airbyte/pull/36891) | Optimize `check` method |
| `3.4.0` | 2024-03-19 | [36267](https://github.com/airbytehq/airbyte/pull/36267) | Pin airbyte-cdk version to `^0` |
Expand Down

0 comments on commit 154c3f1

Please sign in to comment.