diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index 7eba90021494..968d32d5e9ce 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.4.19 + dockerImageTag: 3.4.20 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java index 39fcd4e4085a..9ee5fa7cc411 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/xmin/XminCtidUtils.java @@ -55,29 +55,26 @@ public static StreamsCategorised categoriseStreams(final StateManag rawStateMessages.forEach(stateMessage -> { final JsonNode streamState = stateMessage.getStream().getStreamState(); final StreamDescriptor streamDescriptor = stateMessage.getStream().getStreamDescriptor(); - if (streamState == null || streamDescriptor == null) { + if (streamState == null || streamDescriptor == null || !streamState.has(STATE_TYPE_KEY)) { return; } - if (streamState.has(STATE_TYPE_KEY)) { - if (streamState.get(STATE_TYPE_KEY).asText().equalsIgnoreCase("ctid")) { - statesFromCtidSync.add(stateMessage); - streamsStillInCtidSync.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace())); - } else if (streamState.get(STATE_TYPE_KEY).asText().equalsIgnoreCase("xmin")) { - if (shouldPerformFullSync(currentXminStatus, streamState)) { - final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), - streamDescriptor.getNamespace()); - LOGGER.info("Detected multiple wraparounds. Will perform a full sync for {}", pair); - streamsStillInCtidSync.add(pair); - } else { - statesFromXminSync.add(stateMessage); - } + if (streamState.get(STATE_TYPE_KEY).asText().equalsIgnoreCase("ctid")) { + statesFromCtidSync.add(stateMessage); + streamsStillInCtidSync.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace())); + } else if (streamState.get(STATE_TYPE_KEY).asText().equalsIgnoreCase("xmin")) { + if (shouldPerformFullSync(currentXminStatus, streamState)) { + final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), + streamDescriptor.getNamespace()); + LOGGER.info("Detected multiple wraparounds. Will perform a full sync for {}", pair); + streamsStillInCtidSync.add(pair); } else { - throw new ConfigErrorException("You've changed replication modes - please reset the streams in this connector"); + statesFromXminSync.add(stateMessage); } } else { - throw new RuntimeException("State type not present"); + throw new ConfigErrorException("You've changed replication modes - please reset the streams in this connector"); } + alreadySeenStreams.add(new AirbyteStreamNameNamespacePair(streamDescriptor.getName(), streamDescriptor.getNamespace())); }); } diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 5776405b3bd5..d44579bf57fa 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -310,9 +310,10 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp Expand to review | Version | Date | Pull Request | Subject | -|---------| ---------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| 3.4.19 | 2024-06-23 | [40223](https://github.com/airbytehq/airbyte/pull/40223) | Revert the changes introduced in version 3.4.15. | -| 3.4.18 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. | +| ------- | ---------- | -------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 3.4.20 | 2024-06-23 | [40559](https://github.com/airbytehq/airbyte/pull/40559) | Remove strict check for stream states of unknown types | +| 3.4.19 | 2024-06-23 | [40223](https://github.com/airbytehq/airbyte/pull/40223) | Revert the changes introduced in version 3.4.15. | +| 3.4.18 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. | | 3.4.17 | 2024-06-13 | [39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version | | 3.4.16 | 2024-05-29 | [39474](https://github.com/airbytehq/airbyte/pull/39474) | Adopt latest CDK. | | 3.4.15 | 2024-05-29 | [38773](https://github.com/airbytehq/airbyte/pull/38773) | Connect with adaptiveFetch=true. |