Skip to content

Commit

Permalink
Revert back to erroring out on a null cursor value (#21825)
Browse files Browse the repository at this point in the history
* revert back to erroring out on a null cursor value

* Update dockerfiles and release notes

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
rodireich and octavia-squidington-iii authored Jan 25, 2023
1 parent 053ea74 commit 6d0177a
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
- name: AlloyDB for PostgreSQL
sourceDefinitionId: 1fa90628-2b9e-11ed-a261-0242ac120002
dockerRepository: airbyte/source-alloydb
dockerImageTag: 1.0.35
dockerImageTag: 1.0.36
documentationUrl: https://docs.airbyte.com/integrations/sources/alloydb
icon: alloydb.svg
sourceType: database
Expand Down
16 changes: 15 additions & 1 deletion airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-alloydb:1.0.35"
- dockerImage: "airbyte/source-alloydb:1.0.36"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -508,6 +508,7 @@
- "method"
- "replication_slot"
- "publication"
additionalProperties: true
properties:
method:
type: "string"
Expand Down Expand Up @@ -552,6 +553,19 @@
order: 5
min: 120
max: 1200
lsn_commit_behaviour:
type: "string"
title: "LSN commit behaviour"
description: "Determines when Airbtye should flush the LSN of processed\
\ WAL logs in the source database. `After loading Data in the destination`\
\ is default. If `While reading Data` is selected, in case of a\
\ downstream failure (while loading data into the destination),\
\ next sync would result in a full sync."
enum:
- "While reading Data"
- "After loading Data in the destination"
default: "After loading Data in the destination"
order: 6
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.35
LABEL io.airbyte.version=1.0.36
LABEL io.airbyte.name=airbyte/source-alloydb-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-alloydb/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-alloydb

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.35
LABEL io.airbyte.version=1.0.36
LABEL io.airbyte.name=airbyte/source-alloydb
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.39
LABEL io.airbyte.version=1.0.40
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.39
LABEL io.airbyte.version=1.0.40
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.MountableFile;
Expand Down Expand Up @@ -554,7 +553,7 @@ private ConfiguredAirbyteStream createTableWithInvalidCursorType(final Database
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(CatalogHelpers.createAirbyteStream(
"test_table",
"public",
SCHEMA_NAME,
Field.of("id", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));
Expand All @@ -579,7 +578,6 @@ private JsonNode buildConfigEscapingNeeded() {
}

@Test
@Disabled("See https://github.com/airbytehq/airbyte/issues/17150#issuecomment-1342898439, enable once communication is out")
public void tableWithNullValueCursorShouldThrowException() throws SQLException {
try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
db.start();
Expand Down Expand Up @@ -613,15 +611,14 @@ private ConfiguredAirbyteStream createTableWithNullValueCursor(final Database da
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(CatalogHelpers.createAirbyteStream(
"test_table_null_cursor",
"12345public",
SCHEMA_NAME,
Field.of("id", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));

}

@Test
@Disabled("See https://github.com/airbytehq/airbyte/issues/17150#issuecomment-1342898439, enable once communication is out")
public void viewWithNullValueCursorShouldThrowException() throws SQLException {
try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
db.start();
Expand Down Expand Up @@ -660,7 +657,7 @@ CREATE VIEW test_view_null_cursor(id) as
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(CatalogHelpers.createAirbyteStream(
"test_view_null_cursor",
"public",
SCHEMA_NAME,
Field.of("id", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ private void validateCursorFieldForIncrementalTables(
final Database database)
throws SQLException {
final List<InvalidCursorInfo> tablesWithInvalidCursor = new ArrayList<>();
final List<InvalidCursorInfo> tablesWithInvalidCursorToWarnAbout = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
Expand Down Expand Up @@ -219,16 +218,12 @@ private void validateCursorFieldForIncrementalTables(
}

if (!verifyCursorColumnValues(database, stream.getNamespace(), stream.getName(), cursorField.get())) {
tablesWithInvalidCursorToWarnAbout.add(
tablesWithInvalidCursor.add(
new InvalidCursorInfo(fullyQualifiedTableName, cursorField.get(),
cursorType.toString(), "Cursor column contains NULL value"));
}
}

if (!tablesWithInvalidCursorToWarnAbout.isEmpty()) {
LOGGER.warn("source-postgres detected null cursor value " + InvalidCursorInfoUtil.getInvalidCursorConfigMessage(tablesWithInvalidCursor));
}

if (!tablesWithInvalidCursor.isEmpty()) {
throw new ConfigErrorException(
InvalidCursorInfoUtil.getInvalidCursorConfigMessage(tablesWithInvalidCursor));
Expand Down
17 changes: 9 additions & 8 deletions docs/integrations/sources/alloydb.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,12 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------|
| 1.0.35 | 2022-12-14 | [20436](https://github.com/airbytehq/airbyte/pull/20346) | Consolidate date/time values mapping for JDBC sources |
| 1.0.34 | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378) | Improve descriptions |
| 1.0.17 | 2022-10-31 | [18538](https://github.com/airbytehq/airbyte/pull/18538) | Encode database name |
| 1.0.16 | 2022-10-25 | [18256](https://github.com/airbytehq/airbyte/pull/18256) | Disable allow and prefer ssl modes in CDC mode |
| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/16238) | Update incremental query to avoid data missing when new data is inserted at the same time as a sync starts under non-CDC incremental mode |
| 1.0.15 | 2022-10-11 | [17782](https://github.com/airbytehq/airbyte/pull/17782) | Align with Postgres source v.1.0.15 |
| 1.0.0 | 2022-09-15 | [16776](https://github.com/airbytehq/airbyte/pull/16776) | Align with strict-encrypt version |
| 0.1.0 | 2022-09-05 | [16323](https://github.com/airbytehq/airbyte/pull/16323) | Initial commit. Based on source-postgres v.1.0.7 |
| 1.0.36 | 2023-01-24 | [21825](https://github.com/airbytehq/airbyte/pull/21825) | Put back the original change that will cause an incremental sync to error if table contains a NULL value in cursor column.|
| 1.0.35 | 2022-12-14 | [20436](https://github.com/airbytehq/airbyte/pull/20346) | Consolidate date/time values mapping for JDBC sources |
| 1.0.34 | 2022-12-13 | [20378](https://github.com/airbytehq/airbyte/pull/20378) | Improve descriptions |
| 1.0.17 | 2022-10-31 | [18538](https://github.com/airbytehq/airbyte/pull/18538) | Encode database name |
| 1.0.16 | 2022-10-25 | [18256](https://github.com/airbytehq/airbyte/pull/18256) | Disable allow and prefer ssl modes in CDC mode |
| | 2022-10-13 | [15535](https://github.com/airbytehq/airbyte/pull/16238) | Update incremental query to avoid data missing when new data is inserted at the same time as a sync starts under non-CDC incremental mode |
| 1.0.15 | 2022-10-11 | [17782](https://github.com/airbytehq/airbyte/pull/17782) | Align with Postgres source v.1.0.15 |
| 1.0.0 | 2022-09-15 | [16776](https://github.com/airbytehq/airbyte/pull/16776) | Align with strict-encrypt version |
| 0.1.0 | 2022-09-05 | [16323](https://github.com/airbytehq/airbyte/pull/16323) | Initial commit. Based on source-postgres v.1.0.7 |
Loading

0 comments on commit 6d0177a

Please sign in to comment.