From b05e985a00faf653e8a619e0281c0f64b0aad0aa Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Tue, 19 Dec 2023 08:18:50 -0800 Subject: [PATCH] [Source-postgres] : Advance Postgres LSN for PG 14 & below (#33605) --- .../connectors/source-postgres/metadata.yaml | 2 +- .../source/postgres/PostgresUtils.java | 20 +++++++++++++++++++ .../cdc/PostgresCdcCtidInitializer.java | 4 ++++ docs/integrations/sources/postgres.md | 1 + 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index 0b3d1bd43d4be..49afab9e881e8 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.2.26 + dockerImageTag: 3.2.27 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java index 707c72a2d127e..bfd4903cef9f5 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresUtils.java @@ -23,6 +23,7 @@ import static io.airbyte.integrations.source.postgres.PostgresType.VARCHAR; import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; import java.time.Duration; import 
java.util.List; @@ -50,6 +51,14 @@ public class PostgresUtils { private static final int MIN_QUEUE_SIZE = 1000; private static final int MAX_QUEUE_SIZE = 10000; + private static final String DROP_AGGREGATE_IF_EXISTS_STATEMENT = "DROP aggregate IF EXISTS EPHEMERAL_HEARTBEAT(float4)"; + private static final String CREATE_AGGREGATE_STATEMENT = "CREATE AGGREGATE EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4)"; + private static final String DROP_AGGREGATE_STATEMENT = "DROP aggregate EPHEMERAL_HEARTBEAT(float4)"; + private static final List<String> EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS = + List.of(DROP_AGGREGATE_IF_EXISTS_STATEMENT, CREATE_AGGREGATE_STATEMENT, DROP_AGGREGATE_STATEMENT); + + private static final int POSTGRESQL_VERSION_15 = 15; + public static String getPluginValue(final JsonNode field) { return field.has("plugin") ? field.get("plugin").asText() : PGOUTPUT_PLUGIN; } @@ -186,4 +195,23 @@ public static String prettyPrintConfiguredAirbyteStreamList(final List "%s.%s".formatted(s.getStream().getNamespace(), s.getStream().getName())).collect(Collectors.joining(", ")); } + /** + * Best-effort attempt to force LSN advancement on Postgres servers older than version 15 by + * creating and then dropping a throwaway aggregate via {@code executeWithinTransaction}. On + * version 15 and above no statement is executed. Any failure is logged together with its cause + * and is not rethrown, so callers can treat this step as optional. + * + * @param database JDBC handle used to read the server major version and to run the statements + */ + public static void advanceLsn(final JdbcDatabase database) { + try { + if (database.getMetaData().getDatabaseMajorVersion() < POSTGRESQL_VERSION_15) { + database.executeWithinTransaction(EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS); + LOGGER.info("Succesfully forced LSN advancement by creating & dropping an ephemeral heartbeat aggregate"); + } + } catch (final Exception e) { + LOGGER.info("Failed to force LSN advancement by creating & dropping an ephemeral heartbeat aggregate.", e); + } + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java index da7ed8c7bd96b..4077af63a8d91 100644 --- 
a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java @@ -172,6 +172,10 @@ public static List> cdcCtidIteratorsCombin // Gets the target position. final var targetPosition = PostgresCdcTargetPosition.targetPosition(database); + // Attempt to advance LSN past the target position. For versions of Postgres before PG15, this + // ensures that there is an event that debezium will + // receive that is after the target LSN. + PostgresUtils.advanceLsn(database); final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler<>(sourceConfig, targetPosition, false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize); final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager); diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index e94d760b94588..7987857c6dda3 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -291,6 +291,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.2.27 | 2023-12-18 | [33605](https://github.com/airbytehq/airbyte/pull/33605) | Advance Postgres LSN for PG 14 & below. | | 3.2.26 | 2023-12-11 | [33027](https://github.com/airbytehq/airbyte/pull/32961) | Support for better debugging tools. | | 3.2.25 | 2023-11-29 | [32961](https://github.com/airbytehq/airbyte/pull/32961) | Bump debezium wait time default to 20 min. 
| | 3.2.24 | 2023-11-28 | [32686](https://github.com/airbytehq/airbyte/pull/32686) | Better logging to understand dbz closing reason attribution. |