
Commit eccdbac

Revert "feat(data-warehouse): Enable incrementality for all stripe tables (#2…)"

This reverts commit 3f233ec.
EDsCODE authored Jun 20, 2024
1 parent 7b9bf83 commit eccdbac
Showing 7 changed files with 32 additions and 60 deletions.
11 changes: 10 additions & 1 deletion posthog/temporal/data_imports/pipelines/pipeline.py
@@ -42,8 +42,17 @@ def __init__(

         self._incremental = incremental

+    @property
+    def _get_pipeline_name_base(self):
+        return f"{self.inputs.job_type}_pipeline_{self.inputs.team_id}_run"
+
     def _get_pipeline_name(self):
-        return f"{self.inputs.job_type}_pipeline_{self.inputs.team_id}_run_{self.inputs.schema_id}"
+        base = self._get_pipeline_name_base
+
+        if self._incremental:
+            return f"{base}_{self.inputs.source_id}"
+
+        return f"{base}_{self.inputs.run_id}"

     def _get_destination(self):
         if TEST:
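The restored naming scheme matters because dlt keys pipeline state, including incremental cursors, on the pipeline name. A minimal sketch of the behavior, using a simplified stand-in for the real inputs object (the dataclass fields are assumptions):

from dataclasses import dataclass


@dataclass
class PipelineInputs:
    # Simplified stand-in for the real inputs object; fields are assumptions.
    job_type: str
    team_id: int
    source_id: str
    run_id: str


def pipeline_name(inputs: PipelineInputs, incremental: bool) -> str:
    base = f"{inputs.job_type}_pipeline_{inputs.team_id}_run"
    if incremental:
        # Stable per-source name: state persists across runs, so the
        # incremental cursor can pick up where the last run stopped.
        return f"{base}_{inputs.source_id}"
    # Unique per-run name: every full refresh starts from scratch.
    return f"{base}_{inputs.run_id}"


assert pipeline_name(PipelineInputs("Stripe", 1, "src-1", "run-42"), True) == "Stripe_pipeline_1_run_src-1"
assert pipeline_name(PipelineInputs("Stripe", 1, "src-1", "run-42"), False) == "Stripe_pipeline_1_run_run-42"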
50 changes: 7 additions & 43 deletions posthog/temporal/data_imports/pipelines/stripe/__init__.py
@@ -10,7 +10,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
     resources: dict[str, EndpointResource] = {
         "BalanceTransaction": {
             "name": "BalanceTransaction",
-            "table_name": "balance_transaction",
+            "table_name": "balancetransaction",
             "primary_key": "id",
             "write_disposition": "merge",
             "columns": get_dlt_mapping_for_external_table("stripe_balancetransaction"), # type: ignore

@@ -19,13 +19,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "path": "/v1/balance_transactions",
                 "params": {
                     # the parameters below can optionally be configured
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0, # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "currency": "OPTIONAL_CONFIG",
                     # "ending_before": "OPTIONAL_CONFIG",
                     # "expand": "OPTIONAL_CONFIG",

@@ -48,13 +42,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "path": "/v1/charges",
                 "params": {
                     # the parameters below can optionally be configured
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0, # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "customer": "OPTIONAL_CONFIG",
                     # "ending_before": "OPTIONAL_CONFIG",
                     # "expand": "OPTIONAL_CONFIG",

@@ -76,13 +64,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "path": "/v1/customers",
                 "params": {
                     # the parameters below can optionally be configured
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0, # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "email": "OPTIONAL_CONFIG",
                     # "ending_before": "OPTIONAL_CONFIG",
                     # "expand": "OPTIONAL_CONFIG",

@@ -134,13 +116,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "params": {
                     # the parameters below can optionally be configured
                     # "active": "OPTIONAL_CONFIG",
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0, # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "currency": "OPTIONAL_CONFIG",
                     # "ending_before": "OPTIONAL_CONFIG",
                     # "expand": "OPTIONAL_CONFIG",

@@ -165,13 +141,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "params": {
                     # the parameters below can optionally be configured
                     # "active": "OPTIONAL_CONFIG",
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0, # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "ending_before": "OPTIONAL_CONFIG",
                     # "expand": "OPTIONAL_CONFIG",
                     # "ids": "OPTIONAL_CONFIG",

@@ -194,13 +164,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "params": {
                     # the parameters below can optionally be configured
                     # "collection_method": "OPTIONAL_CONFIG",
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0, # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "current_period_end": "OPTIONAL_CONFIG",
                     # "current_period_start": "OPTIONAL_CONFIG",
                     # "customer": "OPTIONAL_CONFIG",
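Every deleted block followed the same dlt-style incremental parameter shape: a created[gte] query param declared as an incremental cursor over each object's created timestamp. A minimal sketch of that shape, factored into a helper (the helper name is illustrative, not part of the codebase):

from typing import Any, Optional


def created_gte_param(is_incremental: bool) -> Optional[dict[str, Any]]:
    # On the first run Stripe is queried with created[gte]=0 (all history);
    # on later runs dlt substitutes the highest "created" value already seen,
    # so only new objects are fetched and merged on the primary key.
    if not is_incremental:
        return None
    return {
        "type": "incremental",
        "cursor_path": "created",
        "initial_value": 0,
    }


params = {"created[gte]": created_gte_param(is_incremental=True)}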
2 changes: 1 addition & 1 deletion posthog/temporal/data_imports/pipelines/stripe/settings.py
@@ -5,4 +5,4 @@
 # These endpoints are converted into ExternalDataSchema objects when a source is linked.
 ENDPOINTS = ("BalanceTransaction", "Subscription", "Customer", "Product", "Price", "Invoice", "Charge")

-INCREMENTAL_ENDPOINTS = ("BalanceTransaction", "Subscription", "Customer", "Product", "Price", "Invoice", "Charge")
+INCREMENTAL_ENDPOINTS = ("Invoice",)
20 changes: 8 additions & 12 deletions posthog/temporal/tests/external_data/test_external_data_job.py
@@ -273,18 +273,17 @@ async def setup_job_1():
         job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
     )

-    customer_schema = await _create_schema("Customer", new_source, team)
-
     new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
         team_id=team.id,
         pipeline_id=new_source.pk,
         status=ExternalDataJob.Status.RUNNING,
         rows_synced=0,
-        schema=customer_schema,
     )

     new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()

+    customer_schema = await _create_schema("Customer", new_source, team)
+
     inputs = ImportDataActivityInputs(
         team_id=team.id,
         run_id=str(new_job.pk),

@@ -426,20 +425,19 @@ async def setup_job_1():
         job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
     )

-    customer_schema = await _create_schema("Customer", new_source, team)
-
     # Already canceled so it should only run once
     # This imitates if the job was canceled mid run
     new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
         team_id=team.id,
         pipeline_id=new_source.pk,
         status=ExternalDataJob.Status.CANCELLED,
         rows_synced=0,
-        schema=customer_schema,
     )

     new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()

+    customer_schema = await _create_schema("Customer", new_source, team)
+
     inputs = ImportDataActivityInputs(
         team_id=team.id,
         run_id=str(new_job.pk),

@@ -512,18 +510,17 @@ async def setup_job_1():
         job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
     )

-    customer_schema = await _create_schema("Customer", new_source, team)
-
     new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
         team_id=team.id,
         pipeline_id=new_source.pk,
         status=ExternalDataJob.Status.RUNNING,
         rows_synced=0,
-        schema=customer_schema,
     )

     new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()

+    customer_schema = await _create_schema("Customer", new_source, team)
+
     inputs = ImportDataActivityInputs(
         team_id=team.id,
         run_id=str(new_job.pk),

@@ -678,18 +675,17 @@ async def setup_job_1():
         },
     )

-    posthog_test_schema = await _create_schema("posthog_test", new_source, team)
-
     new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
         team_id=team.id,
         pipeline_id=new_source.pk,
         status=ExternalDataJob.Status.RUNNING,
         rows_synced=0,
-        schema=posthog_test_schema,
     )

     new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()

+    posthog_test_schema = await _create_schema("posthog_test", new_source, team)
+
     inputs = ImportDataActivityInputs(
         team_id=team.id, run_id=str(new_job.pk), source_id=new_source.pk, schema_id=posthog_test_schema.id
     )
3 changes: 2 additions & 1 deletion posthog/warehouse/data_load/validate_schema.py
@@ -24,6 +24,7 @@
     aget_schema_by_id,
 )

+from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING
 from posthog.warehouse.models.external_data_job import ExternalDataJob
 from posthog.temporal.common.logger import bind_temporal_worker_logger
 from clickhouse_driver.errors import ServerException

@@ -124,7 +125,7 @@ async def validate_schema_and_update_table(

     _schema_id = external_data_schema.id
     _schema_name: str = external_data_schema.name
-    incremental = external_data_schema.is_incremental
+    incremental = _schema_name in PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING[job.pipeline.source_type]

     table_name = f"{job.pipeline.prefix or ''}{job.pipeline.source_type}_{_schema_name}".lower()
     normalized_schema_name = NamingConvention().normalize_identifier(_schema_name)
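With the revert, incrementality is decided by membership in a static per-source mapping rather than a flag on the schema row. A sketch of that lookup, assuming the mapping relates a source type to its incremental endpoint names (the contents mirror settings.py above; the exact structure of the real module is an assumption):

# Assumed shape of the real mapping; contents mirror settings.py above.
PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING = {
    "Stripe": ("Invoice",),
}


def is_incremental(schema_name: str, source_type: str) -> bool:
    # The real code indexes the mapping directly; this sketch falls back
    # to a non-incremental sync for unknown source types.
    return schema_name in PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING.get(source_type, ())


assert is_incremental("Invoice", "Stripe")
assert not is_incremental("Customer", "Stripe")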
5 changes: 4 additions & 1 deletion posthog/warehouse/models/external_data_job.py
@@ -33,7 +33,10 @@ class Status(models.TextChoices):

     @property
     def folder_path(self) -> str:
-        return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema_id)}".lower().replace("-", "_")
+        if self.schema and self.schema.is_incremental:
+            return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema.pk)}".lower().replace("-", "_")
+
+        return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.pk)}".lower().replace("-", "_")

     def url_pattern_by_schema(self, schema: str) -> str:
         if TEST:
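The effect of the restored folder_path, sketched below: an incremental schema writes into one stable folder keyed on the schema's primary key, so successive runs land in the same place, while every other job gets a folder keyed on its own primary key (the UUID is illustrative):

def folder_path(team_id: int, source_type: str, key: str) -> str:
    # Shared normalization: lowercase, and replace the dashes that
    # UUID primary keys contain, since "-" is awkward in storage prefixes.
    return f"team_{team_id}_{source_type}_{key}".lower().replace("-", "_")


# Incremental schema: stable folder across runs (keyed on schema.pk).
print(folder_path(1, "Stripe", "0f48a624-5c74-4b3e-8f1a-9b2c3d4e5f60"))
# -> team_1_stripe_0f48a624_5c74_4b3e_8f1a_9b2c3d4e5f60

# Anything else: a fresh folder per job (keyed on the job's own pk).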
1 change: 0 additions & 1 deletion posthog/warehouse/models/external_table_definitions.py
@@ -632,7 +632,6 @@
 def get_dlt_mapping_for_external_table(table):
     return {
         field.name: {
-            "name": field.name,
             "data_type": HOGQL_FIELD_DLT_TYPE_MAP[type(field)],
             "nullable": True,
         }
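After dropping the per-column "name" key, the helper yields a dlt column hint of just a type and nullability per field. A sketch of the resulting shape; note the real helper takes a table name and looks its fields up from the table definitions, while this stand-in takes the fields directly, and the field classes and type map below are simplified assumptions:

# Simplified stand-ins for the real HogQL field classes and type map.
class StringDatabaseField:
    def __init__(self, name: str) -> None:
        self.name = name


class IntegerDatabaseField(StringDatabaseField):
    pass


HOGQL_FIELD_DLT_TYPE_MAP = {StringDatabaseField: "text", IntegerDatabaseField: "bigint"}


def dlt_column_hints(fields):
    # Same dict comprehension as the patched helper, minus the "name" key.
    return {
        field.name: {
            "data_type": HOGQL_FIELD_DLT_TYPE_MAP[type(field)],
            "nullable": True,
        }
        for field in fields
    }


print(dlt_column_hints([StringDatabaseField("id"), IntegerDatabaseField("created")]))
# -> {'id': {'data_type': 'text', 'nullable': True}, 'created': {'data_type': 'bigint', 'nullable': True}}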
