diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py
index 18da8f563cd04..ab956c43164c7 100644
--- a/posthog/temporal/data_imports/pipelines/pipeline.py
+++ b/posthog/temporal/data_imports/pipelines/pipeline.py
@@ -42,8 +42,17 @@ def __init__(
 
         self._incremental = incremental
 
+    @property
+    def _get_pipeline_name_base(self):
+        return f"{self.inputs.job_type}_pipeline_{self.inputs.team_id}_run"
+
     def _get_pipeline_name(self):
-        return f"{self.inputs.job_type}_pipeline_{self.inputs.team_id}_run_{self.inputs.schema_id}"
+        base = self._get_pipeline_name_base
+
+        if self._incremental:
+            return f"{base}_{self.inputs.source_id}"
+
+        return f"{base}_{self.inputs.run_id}"
 
     def _get_destination(self):
         if TEST:
diff --git a/posthog/temporal/data_imports/pipelines/stripe/__init__.py b/posthog/temporal/data_imports/pipelines/stripe/__init__.py
index 0015480781b5e..d74c04ca5e7dc 100644
--- a/posthog/temporal/data_imports/pipelines/stripe/__init__.py
+++ b/posthog/temporal/data_imports/pipelines/stripe/__init__.py
@@ -10,7 +10,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
     resources: dict[str, EndpointResource] = {
         "BalanceTransaction": {
             "name": "BalanceTransaction",
-            "table_name": "balance_transaction",
+            "table_name": "balancetransaction",
             "primary_key": "id",
             "write_disposition": "merge",
             "columns": get_dlt_mapping_for_external_table("stripe_balancetransaction"),  # type: ignore
@@ -19,13 +19,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "path": "/v1/balance_transactions",
                 "params": {
                     # the parameters below can optionally be configured
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0,  # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "currency": "OPTIONAL_CONFIG",
                     # "ending_before": "OPTIONAL_CONFIG",
                     # "expand": "OPTIONAL_CONFIG",
@@ -48,13 +42,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "path": "/v1/charges",
                 "params": {
                     # the parameters below can optionally be configured
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0,  # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "customer": "OPTIONAL_CONFIG",
                     # "ending_before": "OPTIONAL_CONFIG",
                     # "expand": "OPTIONAL_CONFIG",
@@ -76,13 +64,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "path": "/v1/customers",
                 "params": {
                     # the parameters below can optionally be configured
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0,  # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "email": "OPTIONAL_CONFIG",
                     # "ending_before": "OPTIONAL_CONFIG",
                     # "expand": "OPTIONAL_CONFIG",
@@ -134,13 +116,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "params": {
                     # the parameters below can optionally be configured
                     # "active": "OPTIONAL_CONFIG",
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0,  # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "currency": "OPTIONAL_CONFIG",
                     # "ending_before": "OPTIONAL_CONFIG",
                     # "expand": "OPTIONAL_CONFIG",
@@ -165,13 +141,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "params": {
                     # the parameters below can optionally be configured
                     # "active": "OPTIONAL_CONFIG",
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0,  # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "ending_before": "OPTIONAL_CONFIG",
                     # "expand": "OPTIONAL_CONFIG",
                     # "ids": "OPTIONAL_CONFIG",
@@ -194,13 +164,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
                 "params": {
                     # the parameters below can optionally be configured
                     # "collection_method": "OPTIONAL_CONFIG",
-                    "created[gte]": {
-                        "type": "incremental",
-                        "cursor_path": "created",
-                        "initial_value": 0,  # type: ignore
-                    }
-                    if is_incremental
-                    else None,
+                    # "created": "OPTIONAL_CONFIG",
                     # "current_period_end": "OPTIONAL_CONFIG",
                     # "current_period_start": "OPTIONAL_CONFIG",
                     # "customer": "OPTIONAL_CONFIG",
diff --git a/posthog/temporal/data_imports/pipelines/stripe/settings.py b/posthog/temporal/data_imports/pipelines/stripe/settings.py
index c280391c24c96..bd1ade0d69c25 100644
--- a/posthog/temporal/data_imports/pipelines/stripe/settings.py
+++ b/posthog/temporal/data_imports/pipelines/stripe/settings.py
@@ -5,4 +5,4 @@
 # These endpoints are converted into ExternalDataSchema objects when a source is linked.
 ENDPOINTS = ("BalanceTransaction", "Subscription", "Customer", "Product", "Price", "Invoice", "Charge")
 
-INCREMENTAL_ENDPOINTS = ("BalanceTransaction", "Subscription", "Customer", "Product", "Price", "Invoice", "Charge")
+INCREMENTAL_ENDPOINTS = ("Invoice",)
diff --git a/posthog/temporal/tests/external_data/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py
index 01224cb58c410..c87ba54c3d214 100644
--- a/posthog/temporal/tests/external_data/test_external_data_job.py
+++ b/posthog/temporal/tests/external_data/test_external_data_job.py
@@ -273,18 +273,17 @@ async def setup_job_1():
             job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
         )
 
-        customer_schema = await _create_schema("Customer", new_source, team)
-
         new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
             team_id=team.id,
             pipeline_id=new_source.pk,
             status=ExternalDataJob.Status.RUNNING,
             rows_synced=0,
-            schema=customer_schema,
         )
 
         new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()
 
+        customer_schema = await _create_schema("Customer", new_source, team)
+
         inputs = ImportDataActivityInputs(
             team_id=team.id,
             run_id=str(new_job.pk),
@@ -426,8 +425,6 @@ async def setup_job_1():
             job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
         )
 
-        customer_schema = await _create_schema("Customer", new_source, team)
-
         # Already canceled so it should only run once
         # This imitates if the job was canceled mid run
         new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
@@ -435,11 +432,12 @@ async def setup_job_1():
             pipeline_id=new_source.pk,
             status=ExternalDataJob.Status.CANCELLED,
             rows_synced=0,
-            schema=customer_schema,
         )
 
         new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()
 
+        customer_schema = await _create_schema("Customer", new_source, team)
+
         inputs = ImportDataActivityInputs(
             team_id=team.id,
             run_id=str(new_job.pk),
@@ -512,18 +510,17 @@ async def setup_job_1():
             job_inputs={"stripe_secret_key": "test-key", "stripe_account_id": "acct_id"},
        )
 
-        customer_schema = await _create_schema("Customer", new_source, team)
-
         new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
             team_id=team.id,
             pipeline_id=new_source.pk,
             status=ExternalDataJob.Status.RUNNING,
             rows_synced=0,
-            schema=customer_schema,
         )
 
         new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()
 
+        customer_schema = await _create_schema("Customer", new_source, team)
+
         inputs = ImportDataActivityInputs(
             team_id=team.id,
             run_id=str(new_job.pk),
@@ -678,18 +675,17 @@ async def setup_job_1():
             },
         )
 
-        posthog_test_schema = await _create_schema("posthog_test", new_source, team)
-
         new_job: ExternalDataJob = await sync_to_async(ExternalDataJob.objects.create)(
             team_id=team.id,
             pipeline_id=new_source.pk,
             status=ExternalDataJob.Status.RUNNING,
             rows_synced=0,
-            schema=posthog_test_schema,
         )
 
         new_job = await sync_to_async(ExternalDataJob.objects.filter(id=new_job.id).prefetch_related("pipeline").get)()
 
+        posthog_test_schema = await _create_schema("posthog_test", new_source, team)
+
         inputs = ImportDataActivityInputs(
             team_id=team.id, run_id=str(new_job.pk), source_id=new_source.pk, schema_id=posthog_test_schema.id
         )
diff --git a/posthog/warehouse/data_load/validate_schema.py b/posthog/warehouse/data_load/validate_schema.py
index 738ed72904eb6..3fe601db74b4a 100644
--- a/posthog/warehouse/data_load/validate_schema.py
+++ b/posthog/warehouse/data_load/validate_schema.py
@@ -24,6 +24,7 @@
     aget_schema_by_id,
 )
 
+from posthog.temporal.data_imports.pipelines.schemas import PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING
 from posthog.warehouse.models.external_data_job import ExternalDataJob
 from posthog.temporal.common.logger import bind_temporal_worker_logger
 from clickhouse_driver.errors import ServerException
@@ -124,7 +125,7 @@ async def validate_schema_and_update_table(
     _schema_id = external_data_schema.id
     _schema_name: str = external_data_schema.name
-    incremental = external_data_schema.is_incremental
+    incremental = _schema_name in PIPELINE_TYPE_INCREMENTAL_ENDPOINTS_MAPPING[job.pipeline.source_type]
 
     table_name = f"{job.pipeline.prefix or ''}{job.pipeline.source_type}_{_schema_name}".lower()
     normalized_schema_name = NamingConvention().normalize_identifier(_schema_name)
 
diff --git a/posthog/warehouse/models/external_data_job.py b/posthog/warehouse/models/external_data_job.py
index e9504686d560d..0b40c41f11069 100644
--- a/posthog/warehouse/models/external_data_job.py
+++ b/posthog/warehouse/models/external_data_job.py
@@ -33,7 +33,10 @@ class Status(models.TextChoices):
 
     @property
     def folder_path(self) -> str:
-        return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema_id)}".lower().replace("-", "_")
+        if self.schema and self.schema.is_incremental:
+            return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.schema.pk)}".lower().replace("-", "_")
+
+        return f"team_{self.team_id}_{self.pipeline.source_type}_{str(self.pk)}".lower().replace("-", "_")
 
     def url_pattern_by_schema(self, schema: str) -> str:
         if TEST:
diff --git a/posthog/warehouse/models/external_table_definitions.py b/posthog/warehouse/models/external_table_definitions.py
index 5f69620b9674c..d0e4c57e35c89 100644
--- a/posthog/warehouse/models/external_table_definitions.py
+++ b/posthog/warehouse/models/external_table_definitions.py
@@ -632,7 +632,6 @@ def get_dlt_mapping_for_external_table(table):
     return {
         field.name: {
-            "name": field.name,
            "data_type": HOGQL_FIELD_DLT_TYPE_MAP[type(field)],
            "nullable": True,
        }