Skip to content

Commit

Permalink
[Source-postgres] : Advance Postgres LSN for PG 14 & below (airbytehq…
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored and jatinyadav-cc committed Feb 26, 2024
1 parent 3b1f50d commit d8cdbb2
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.2.26
dockerImageTag: 3.2.27
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static io.airbyte.integrations.source.postgres.PostgresType.VARCHAR;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -50,6 +51,14 @@ public class PostgresUtils {
private static final int MIN_QUEUE_SIZE = 1000;
private static final int MAX_QUEUE_SIZE = 10000;

private static final String DROP_AGGREGATE_IF_EXISTS_STATEMENT = "DROP aggregate IF EXISTS EPHEMERAL_HEARTBEAT(float4)";
private static final String CREATE_AGGREGATE_STATEMENT = "CREATE AGGREGATE EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4)";
private static final String DROP_AGGREGATE_STATEMENT = "DROP aggregate EPHEMERAL_HEARTBEAT(float4)";
private static final List<String> EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS =
List.of(DROP_AGGREGATE_IF_EXISTS_STATEMENT, CREATE_AGGREGATE_STATEMENT, DROP_AGGREGATE_STATEMENT);

private static final int POSTGRESQL_VERSION_15 = 15;

public static String getPluginValue(final JsonNode field) {
return field.has("plugin") ? field.get("plugin").asText() : PGOUTPUT_PLUGIN;
}
Expand Down Expand Up @@ -186,4 +195,15 @@ public static String prettyPrintConfiguredAirbyteStreamList(final List<Configure
return streamList.stream().map(s -> "%s.%s".formatted(s.getStream().getNamespace(), s.getStream().getName())).collect(Collectors.joining(", "));
}

public static void advanceLsn(final JdbcDatabase database) {
try {
if (database.getMetaData().getDatabaseMajorVersion() < POSTGRESQL_VERSION_15) {
database.executeWithinTransaction(EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS);
LOGGER.info("Succesfully forced LSN advancement by creating & dropping an ephemeral heartbeat aggregate");
}
} catch (final Exception e) {
LOGGER.info("Failed to force LSN advancement by creating & dropping an ephemeral heartbeat aggregate.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin

// Gets the target position.
final var targetPosition = PostgresCdcTargetPosition.targetPosition(database);
// Attempt to advance LSN past the target position. For versions of Postgres before PG15, this
// ensures that there is an event that debezium will
// receive that is after the target LSN.
PostgresUtils.advanceLsn(database);
final AirbyteDebeziumHandler<Long> handler = new AirbyteDebeziumHandler<>(sourceConfig,
targetPosition, false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize);
final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager);
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.2.27 | 2023-12-18 | [33605](https://github.com/airbytehq/airbyte/pull/33605) | Advance Postgres LSN for PG 14 & below. |
| 3.2.26 | 2023-12-11 | [33027](https://github.com/airbytehq/airbyte/pull/32961) | Support for better debugging tools. |
| 3.2.25 | 2023-11-29 | [32961](https://github.com/airbytehq/airbyte/pull/32961) | Bump debezium wait time default to 20 min. |
| 3.2.24 | 2023-11-28 | [32686](https://github.com/airbytehq/airbyte/pull/32686) | Better logging to understand dbz closing reason attribution. |
Expand Down

0 comments on commit d8cdbb2

Please sign in to comment.