[Source-postgres] : Advance Postgres LSN for PG 14 & below #33605

Merged: 6 commits, Dec 19, 2023
@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
-dockerImageTag: 3.2.26
+dockerImageTag: 3.2.27
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
@@ -23,6 +23,7 @@
import static io.airbyte.integrations.source.postgres.PostgresType.VARCHAR;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.time.Duration;
import java.util.List;
@@ -50,6 +51,14 @@ public class PostgresUtils {
private static final int MIN_QUEUE_SIZE = 1000;
private static final int MAX_QUEUE_SIZE = 10000;

private static final String DROP_AGGREGATE_IF_EXISTS_STATEMENT = "DROP aggregate IF EXISTS EPHEMERAL_HEARTBEAT(float4)";
private static final String CREATE_AGGREGATE_STATEMENT = "CREATE AGGREGATE EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4)";
private static final String DROP_AGGREGATE_STATEMENT = "DROP aggregate EPHEMERAL_HEARTBEAT(float4)";
private static final List<String> EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS =
List.of(DROP_AGGREGATE_IF_EXISTS_STATEMENT, CREATE_AGGREGATE_STATEMENT, DROP_AGGREGATE_STATEMENT);

private static final int POSTGRESQL_VERSION_15 = 15;

public static String getPluginValue(final JsonNode field) {
return field.has("plugin") ? field.get("plugin").asText() : PGOUTPUT_PLUGIN;
}
@@ -186,4 +195,15 @@ public static String prettyPrintConfiguredAirbyteStreamList(final List<Configure
return streamList.stream().map(s -> "%s.%s".formatted(s.getStream().getNamespace(), s.getStream().getName())).collect(Collectors.joining(", "));
}

public static void advanceLsn(final JdbcDatabase database) {
try {
if (database.getMetaData().getDatabaseMajorVersion() < POSTGRESQL_VERSION_15) {
database.executeWithinTransaction(EPHEMERAL_HEARTBEAT_CREATE_STATEMENTS);
LOGGER.info("Successfully forced LSN advancement by creating & dropping an ephemeral heartbeat aggregate");
}
} catch (final Exception e) {
LOGGER.info("Failed to force LSN advancement by creating & dropping an ephemeral heartbeat aggregate.");
}
}

}
@@ -172,6 +172,10 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin

// Gets the target position.
final var targetPosition = PostgresCdcTargetPosition.targetPosition(database);
// Attempt to advance LSN past the target position. For versions of Postgres before PG15, this
// ensures that there is an event that debezium will receive that is after the target LSN.
PostgresUtils.advanceLsn(database);
Contributor:
Should we make this execute only on Postgres < 15? That would let us request fewer access rights on 15 and later, no?

Contributor Author:
If we don't have the correct access, we'll just fail in the advanceLsn method and log that scenario. Success/failure of this advancement should not block the sync from proceeding.

For PG14, the query that I've added in this PR requires no additional permissions. For PG15+, this same query will just fail because of permissions issues and move on.

Phase 2 hopes to address this with additional steps for the user to set up a heartbeat table.
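
The "try it, log the outcome, never block the sync" behavior described in this reply can be sketched in isolation. This is only an illustration under stated assumptions: `BestEffort`, `ThrowingAction`, and `attempt` are hypothetical names that do not exist in the connector, and the sketch uses `java.util.logging` rather than the connector's logger.

```java
import java.util.logging.Logger;

/** Hypothetical helper (not part of the connector): runs a step and never lets its failure escape. */
final class BestEffort {
  private static final Logger LOGGER = Logger.getLogger(BestEffort.class.getName());

  /** Lets the step throw checked exceptions such as SQLException. */
  @FunctionalInterface
  interface ThrowingAction {
    void run() throws Exception;
  }

  /** Returns true if the step completed and false if it threw; the caller proceeds either way. */
  static boolean attempt(final String description, final ThrowingAction action) {
    try {
      action.run();
      LOGGER.info("Succeeded: " + description);
      return true;
    } catch (final Exception e) {
      // Include the cause so a permissions failure can be told apart from other errors.
      LOGGER.warning("Failed, continuing anyway: " + description + " (" + e.getMessage() + ")");
      return false;
    }
  }
}
```

A caller could wrap the LSN advancement as `BestEffort.attempt("advance LSN", () -> ...)` and ignore the result, which matches the intent stated above that success or failure should not block the sync.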

Contributor:
This change will improve things for 14 and below.
For 15 and above, though, even if we can run this query, it will not be effective in bumping the LSN.
So it probably makes sense to skip it there and avoid a constant error, or questions from customers about it.
We already have code handling "legacy" ctid for Postgres < 14.
You can use that function to restrict this to versions < 15.

Contributor:
See CtidUtils.isTidRangeScanCapableDBServer()

Contributor Author:
Added a check
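
For reference, the version gate discussed in this thread can be pulled out as a small predicate. This is a sketch only: `LsnAdvanceGate` and `shouldAdvanceLsn` are hypothetical names, and the constant mirrors `POSTGRESQL_VERSION_15` from the diff. The only real API used is the standard JDBC `DatabaseMetaData.getDatabaseMajorVersion()`.

```java
import java.sql.DatabaseMetaData;
import java.sql.SQLException;

/** Hypothetical sketch of the "only below PG 15" gate; not the connector's actual class. */
final class LsnAdvanceGate {
  // Mirrors POSTGRESQL_VERSION_15 in the diff.
  static final int POSTGRESQL_VERSION_15 = 15;

  /** True when the server major version is one where the aggregate trick is attempted. */
  static boolean shouldAdvanceLsn(final int majorVersion) {
    return majorVersion < POSTGRESQL_VERSION_15;
  }

  /** Convenience overload that reads the major version from standard JDBC metadata. */
  static boolean shouldAdvanceLsn(final DatabaseMetaData metaData) throws SQLException {
    return shouldAdvanceLsn(metaData.getDatabaseMajorVersion());
  }
}
```

Keeping the comparison in a pure function makes the boundary (14 runs the statements, 15 does not) easy to unit test without a database.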

Contributor:
With this, there is never a need to wait for a timeout, since we know the WAL has advanced past the target LSN.

Contributor Author:
That's true. However, I did want to protect against cases where this query fails for whatever reason.
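
To make "the WAL got past the target LSN" concrete, here is a minimal sketch of comparing two LSNs in Postgres's textual `pg_lsn` form (two hexadecimal numbers separated by a slash, high 32 bits first). `LsnMath`, `parseLsn`, and `isPast` are hypothetical names for illustration; this is not how the connector or Debezium performs the comparison.

```java
/** Hypothetical helper for comparing pg_lsn strings such as "16/B374D848". */
final class LsnMath {

  /** Converts "HI/LO" (hex) into a single 64-bit position: HI in the upper 32 bits, LO in the lower. */
  static long parseLsn(final String lsn) {
    final String[] parts = lsn.split("/");
    if (parts.length != 2) {
      throw new IllegalArgumentException("Not a pg_lsn value: " + lsn);
    }
    final long hi = Long.parseLong(parts[0], 16);
    final long lo = Long.parseLong(parts[1], 16);
    return (hi << 32) | lo;
  }

  /** True only when current is strictly greater than target; unsigned compare covers the full range. */
  static boolean isPast(final String current, final String target) {
    return Long.compareUnsigned(parseLsn(current), parseLsn(target)) > 0;
  }
}
```

Under this sketch, a caller would still keep a timeout as a fallback, as noted above, because the advancement statements can fail and leave the current LSN at or before the target.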

final AirbyteDebeziumHandler<Long> handler = new AirbyteDebeziumHandler<>(sourceConfig,
targetPosition, false, firstRecordWaitTime, subsequentRecordWaitTime, queueSize);
final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager);
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
@@ -291,6 +291,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.2.27 | 2023-12-18 | [33605](https://github.com/airbytehq/airbyte/pull/33605) | Advance Postgres LSN for PG 14 & below. |
| 3.2.26 | 2023-12-11 | [33027](https://github.com/airbytehq/airbyte/pull/32961) | Support for better debugging tools. |
| 3.2.25 | 2023-11-29 | [32961](https://github.com/airbytehq/airbyte/pull/32961) | Bump debezium wait time default to 20 min. |
| 3.2.24 | 2023-11-28 | [32686](https://github.com/airbytehq/airbyte/pull/32686) | Better logging to understand dbz closing reason attribution. |