Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[source-mssql/mysql/postgres] Fix and cleanup oc map #42024

Merged
merged 24 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3ab8fd7
fix and cleanup oc map
xiaohansong Jul 16, 2024
d466e11
fix npr on pkinfo or ocinfo
xiaohansong Jul 16, 2024
c060f7f
add postgres fix
xiaohansong Jul 17, 2024
94cbb9e
Merge remote-tracking branch 'origin/master' into xiaohan/ocinfo
xiaohansong Jul 18, 2024
4a79a3e
format
xiaohansong Jul 18, 2024
f3e9a65
pr comment address
xiaohansong Jul 19, 2024
89b29ac
format
xiaohansong Jul 19, 2024
adf9cab
cleanup
xiaohansong Jul 19, 2024
0ff5d62
✨Source Dremio: Make Connector Compatible with Builder (#38692)
btkcodedev Jul 19, 2024
d28eceb
feat(airbyte-cdk): add new Decoders: `JsonlDecoder` and `IterableDec…
artem1205 Jul 19, 2024
b3cbd43
🤖 minor bump Python CDK to version 3.9.0
artem1205 Jul 19, 2024
fb1b782
🤖 Cut version 3.9.0 of source-declarative-manifest
artem1205 Jul 19, 2024
3bb9187
live-test: update connnection-retriever (#42112)
alafanechere Jul 19, 2024
0854f26
🐛 Source Braintree: Adds pagination to braintree streams (#42096)
AGPapa Jul 19, 2024
dc5a9fa
Rename scope for `contact_lists` to an available one (#42113)
marcua Jul 19, 2024
f7454e0
Cleanup an unused route in the SME ingress config. (#42116)
bgroff Jul 19, 2024
7603af6
Toggle useLocalCdk = false (#42120)
akashkulk Jul 19, 2024
99c42e3
[source-google-search-console] - Migrate to CDK v1.8.0 (#42073)
pnilan Jul 19, 2024
793481a
[source-recharge] - Migrate to CDK v3.7.0 (#42076)
pnilan Jul 19, 2024
b1a4fdb
Merge remote-tracking branch 'origin/master' into xiaohan/ocinfo
xiaohansong Jul 19, 2024
80da00b
fix extra deletion
xiaohansong Jul 19, 2024
842171c
Merge remote-tracking branch 'origin/master' into xiaohan/ocinfo
xiaohansong Jul 22, 2024
915ae93
bump up cdk
xiaohansong Jul 22, 2024
ca7cafc
format and bump connector version
xiaohansong Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.42.3
version=0.42.4
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
Assertions.assertEquals((MODEL_RECORDS.size), recordsFromFirstBatch.size)
assertExpectedRecords(HashSet(MODEL_RECORDS), recordsFromFirstBatch, HashSet())
}

/**
 * Overridable hook that receives a stream state as a [JsonNode].
 *
 * The default implementation does nothing; the function is `open` so subclasses can
 * replace it with their own checks.
 *
 * NOTE(review): presumably meant for connector-specific assertions on the state emitted
 * during a resumable full refresh — confirm against the call sites.
 *
 * @param streamStateToBeTested the stream state handed to the hook
 */
protected open fun validateStreamStateInResumableFullRefresh(streamStateToBeTested: JsonNode) = Unit

@Test
Expand Down Expand Up @@ -1402,6 +1403,18 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
Assertions.assertEquals(12, recordsFromFirstBatch.size)

stateAfterFirstBatch.map { state -> assertStateDoNotHaveDuplicateStreams(state) }

// Test for recovery - it should be able to resume using any previous state. Using the 3rd
// state to test.
val recoveryState = Jsons.jsonNode(listOf(stateAfterFirstBatch[2]))

val recoverySyncIterator =
source().read(config()!!, fullRefreshConfiguredCatalog, recoveryState)
val dataFromRecoverySync = AutoCloseableIterators.toListAndClose(recoverySyncIterator)
val recordsFromRecoverySync = extractRecordMessages(dataFromRecoverySync)
val stateAfterRecoverySync = extractStateMessages(dataFromRecoverySync)
Assertions.assertEquals(9, stateAfterRecoverySync.size)
Assertions.assertEquals(9, recordsFromRecoverySync.size)
}

protected open fun assertStateMessagesForNewTableSnapshotTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.42.3'
cdkVersionRequired = '0.42.4'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.1
dockerImageTag: 4.1.2
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ protected void initializeForStateManager(final JdbcDatabase database,
final MssqlCursorBasedStateManager cursorBasedStateManager = new MssqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
final InitialLoadStreams initialLoadStreams = streamsForInitialOrderedColumnLoad(cursorBasedStateManager, catalog);
initialLoadStateManager = new MssqlInitialLoadStreamStateManager(catalog, initialLoadStreams,
initPairToOrderedColumnInfoMap(database, initialLoadStreams, tableNameToTable, getQuoteString()));
initPairToOrderedColumnInfoMap(database, catalog, tableNameToTable, getQuoteString()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateManager {

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlInitialLoadGlobalStateManager.class);
private final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo;
private StateManager stateManager;
private final CdcState initialCdcState;
// Only one global state is emitted, which is fanned out into many entries in the DB by platform. As
Expand Down Expand Up @@ -145,11 +144,6 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
.withGlobal(generateGlobalState(streamStates));
}

@Override
public OrderedColumnInfo getOrderedColumnInfo(final AirbyteStreamNameNamespacePair pair) {
return pairToOrderedColInfo.get(pair);
}

private DbStreamState getFinalState(final AirbyteStreamNameNamespacePair pair) {
Preconditions.checkNotNull(pair);
Preconditions.checkNotNull(pair.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public abstract class MssqlInitialLoadStateManager implements SourceStateMessage
public static final String STATE_TYPE_KEY = "state_type";
public static final String ORDERED_COL_STATE_TYPE = "ordered_column";
protected Map<AirbyteStreamNameNamespacePair, OrderedColumnLoadStatus> pairToOrderedColLoadStatus;
protected Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo;

private OrderedColumnLoadStatus ocStatus;

Expand Down Expand Up @@ -61,7 +62,9 @@ public OrderedColumnLoadStatus getOrderedColumnLoadStatus(final AirbyteStreamNam
* @param pair pair
* @return ordered column info
*/
public abstract OrderedColumnInfo getOrderedColumnInfo(final AirbyteStreamNameNamespacePair pair);
public OrderedColumnInfo getOrderedColumnInfo(final AirbyteStreamNameNamespacePair pair) {
  // Plain lookup in the pair -> ordered-column-info map; the result is null when the map
  // holds no value for this pair.
  final OrderedColumnInfo infoForPair = pairToOrderedColInfo.get(pair);
  return infoForPair;
}

static Map<AirbyteStreamNameNamespacePair, OrderedColumnLoadStatus> initPairToOrderedColumnLoadStatusMap(
final Map<io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair, OrderedColumnLoadStatus> pairToOcStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
public class MssqlInitialLoadStreamStateManager extends MssqlInitialLoadStateManager {

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlInitialLoadStateManager.class);
private final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo;

public MssqlInitialLoadStreamStateManager(final ConfiguredAirbyteCatalog catalog,
final InitialLoadStreams initialLoadStreams,
Expand Down Expand Up @@ -51,11 +50,6 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
.withStream(getAirbyteStreamState(pair, finalState));
}

@Override
public OrderedColumnInfo getOrderedColumnInfo(final AirbyteStreamNameNamespacePair pair) {
return pairToOrderedColInfo.get(pair);
}

@Override
public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) {
AirbyteStreamNameNamespacePair pair =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public static MssqlInitialLoadGlobalStateManager getMssqlInitialLoadGlobalStateM
final CdcState initialStateToBeUsed = getCdcState(database, catalog, stateManager, savedOffsetStillPresentOnServer);

return new MssqlInitialLoadGlobalStateManager(initialLoadStreams,
initPairToOrderedColumnInfoMap(database, initialLoadStreams, tableNameToTable, quoteString),
initPairToOrderedColumnInfoMap(database, catalog, tableNameToTable, quoteString),
stateManager, catalog, initialStateToBeUsed);
}

Expand Down Expand Up @@ -389,13 +389,13 @@ public static InitialLoadStreams cdcStreamsForInitialOrderedColumnLoad(final Cdc

public static Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> initPairToOrderedColumnInfoMap(
final JdbcDatabase database,
final InitialLoadStreams initialLoadStreams,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
final String quoteString) {
final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOcInfoMap = new HashMap<>();
// For every stream in the configured catalog, we want to maintain information about
// the current ordered column info associated with the stream
initialLoadStreams.streamsForInitialLoad.forEach(stream -> {
catalog.getStreams().forEach(stream -> {
final AirbyteStreamNameNamespacePair pair =
new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
final Optional<OrderedColumnInfo> ocInfo = getOrderedColumnInfo(database, stream, tableNameToTable, quoteString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.42.3'
cdkVersionRequired = '0.42.4'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.6.2
dockerImageTag: 3.6.3
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ protected void initializeForStateManager(final JdbcDatabase database,
final InitialLoadStreams initialLoadStreams = streamsForInitialPrimaryKeyLoad(cursorBasedStateManager, catalog);
initialLoadStateManager =
new MySqlInitialLoadStreamStateManager(catalog, initialLoadStreams,
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, getQuoteString()));
initPairToPrimaryKeyInfoMap(database, catalog, tableNameToTable, getQuoteString()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.InitialLoadStreams;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.PrimaryKeyInfo;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteGlobalState;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
Expand Down Expand Up @@ -156,11 +155,6 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
.withGlobal(generateGlobalState(streamStates));
}

@Override
public PrimaryKeyLoadStatus getPrimaryKeyLoadStatus(final AirbyteStreamNameNamespacePair pair) {
return pairToPrimaryKeyLoadStatus.get(pair);
}

@Override
public PrimaryKeyInfo getPrimaryKeyInfo(final AirbyteStreamNameNamespacePair pair) {
return pairToPrimaryKeyInfo.get(pair);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public void updatePrimaryKeyLoadState(final AirbyteStreamNameNamespacePair pair,

// Returns the previous state emitted, represented as a {@link PrimaryKeyLoadStatus} associated with
// the stream.
public abstract PrimaryKeyLoadStatus getPrimaryKeyLoadStatus(final AirbyteStreamNameNamespacePair pair);
public PrimaryKeyLoadStatus getPrimaryKeyLoadStatus(final AirbyteStreamNameNamespacePair pair) {
  // Look up whatever status is currently stored under this pair in pairToPrimaryKeyLoadStatus.
  final PrimaryKeyLoadStatus statusForPair = pairToPrimaryKeyLoadStatus.get(pair);
  return statusForPair;
}

// Returns the current {@link PrimaryKeyInfo}, associated with the stream. This includes the data type &
// the column name associated with the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.InitialLoadStreams;
import io.airbyte.integrations.source.mysql.initialsync.MySqlInitialReadUtil.PrimaryKeyInfo;
import io.airbyte.integrations.source.mysql.internal.models.PrimaryKeyLoadStatus;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
Expand Down Expand Up @@ -57,11 +56,6 @@ public PrimaryKeyInfo getPrimaryKeyInfo(final io.airbyte.protocol.models.Airbyte
return pairToPrimaryKeyInfo.get(pair);
}

@Override
public PrimaryKeyLoadStatus getPrimaryKeyLoadStatus(final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair) {
return pairToPrimaryKeyLoadStatus.get(pair);
}

@Override
public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirbyteStream stream) {
AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public static MySqlInitialLoadGlobalStateManager getMySqlInitialLoadGlobalStateM
cdcStreamsForInitialPrimaryKeyLoad(stateManager.getCdcStateManager(), catalog, savedOffsetStillPresentOnServer);

return new MySqlInitialLoadGlobalStateManager(initialLoadStreams,
initPairToPrimaryKeyInfoMap(database, initialLoadStreams, tableNameToTable, quoteString),
initPairToPrimaryKeyInfoMap(database, catalog, tableNameToTable, quoteString),
stateManager, catalog, savedOffsetStillPresentOnServer, getDefaultCdcState(database, catalog));
}

Expand Down Expand Up @@ -508,14 +508,14 @@ public static List<ConfiguredAirbyteStream> identifyStreamsForCursorBased(final
// currently undergoing initial primary key syncs.
public static Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, PrimaryKeyInfo> initPairToPrimaryKeyInfoMap(
final JdbcDatabase database,
final InitialLoadStreams initialLoadStreams,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<MysqlType>>> tableNameToTable,
final String quoteString) {
final Map<io.airbyte.protocol.models.AirbyteStreamNameNamespacePair, PrimaryKeyInfo> pairToPkInfoMap = new HashMap<>();
// For every stream in the configured catalog, we want to maintain information about the
// current primary key info associated with the stream
initialLoadStreams.streamsForInitialLoad().forEach(stream -> {
catalog.getStreams().forEach(stream -> {
final io.airbyte.protocol.models.AirbyteStreamNameNamespacePair pair =
new io.airbyte.protocol.models.AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace());
final Optional<PrimaryKeyInfo> pkInfo = getPrimaryKeyInfo(database, stream, tableNameToTable, quoteString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.42.3'
cdkVersionRequired = '0.42.4'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.6.4
dockerImageTag: 3.6.5
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
Expand All @@ -30,12 +31,13 @@ public abstract class CtidStateManager implements SourceStateMessageProducer<Air
protected final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus;
protected Function<AirbyteStreamNameNamespacePair, JsonNode> streamStateForIncrementalRunSupplier;

protected String lastCtid;
protected Map<AirbyteStreamNameNamespacePair, String> pairToLastCtid;
protected FileNodeHandler fileNodeHandler;

protected CtidStateManager(final Map<AirbyteStreamNameNamespacePair, CtidStatus> pairToCtidStatus) {
  // No ctid has been recorded for any stream yet; entries are added as records are processed.
  pairToLastCtid = new HashMap<>();
  // Default supplier ignores the pair it is given and returns Jsons.emptyObject().
  streamStateForIncrementalRunSupplier = ignoredPair -> Jsons.emptyObject();
  // Keep the caller-provided per-stream ctid status map.
  this.pairToCtidStatus = pairToCtidStatus;
}

public CtidStatus getCtidStatus(final AirbyteStreamNameNamespacePair pair) {
Expand Down Expand Up @@ -81,6 +83,7 @@ public AirbyteStateMessage generateStateMessageAtCheckpoint(final ConfiguredAirb
protected CtidStatus generateCtidStatusForState(final AirbyteStreamNameNamespacePair pair) {
final Long fileNode = fileNodeHandler.getFileNode(pair);
assert fileNode != null;
final String lastCtid = pairToLastCtid.get(pair);
// If the table is empty, lastCtid will be set to zero for the final state message.
final String lastCtidInState = (Objects.nonNull(lastCtid)
&& StringUtils.isNotBlank(lastCtid)) ? lastCtid : Ctid.ZERO.toString();
Expand All @@ -98,7 +101,9 @@ protected CtidStatus generateCtidStatusForState(final AirbyteStreamNameNamespace
@Override
public AirbyteMessage processRecordMessage(final ConfiguredAirbyteStream stream, AirbyteMessageWithCtid message) {
// Remember the ctid carried by this record (when it has one); the state-generation code
// reads the last recorded ctid back when building state.
if (Objects.nonNull(message.ctid())) {
// NOTE(review): this view shows both a single-field assignment and a per-stream map update.
// They look like the removed and added sides of the same change — confirm which one
// should remain.
this.lastCtid = message.ctid();
// Key the recorded ctid by the stream's (name, namespace) pair.
final AirbyteStreamNameNamespacePair pair = new AirbyteStreamNameNamespacePair(stream.getStream().getName(),
stream.getStream().getNamespace());
pairToLastCtid.put(pair, message.ctid());
}
// Only the wrapped record message is handed back to the caller.
return message.recordMessage();
}
Expand All @@ -121,6 +126,9 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
*/
@Override
public boolean shouldEmitStateMessage(final ConfiguredAirbyteStream stream) {
  // The last ctid is tracked per stream, keyed by the stream's (name, namespace) pair.
  final String streamName = stream.getStream().getName();
  final String streamNamespace = stream.getStream().getNamespace();
  final String latestCtid = pairToLastCtid.get(new AirbyteStreamNameNamespacePair(streamName, streamNamespace));

  // Nothing recorded for this stream yet: do not emit.
  if (Objects.isNull(latestCtid)) {
    return false;
  }
  // Otherwise emit only when the recorded ctid is not blank.
  return StringUtils.isNotBlank(latestCtid);
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.2 | 2024-07-22 | [42024](https://github.com/airbytehq/airbyte/pull/42024) | Fix an NPE bug when resuming from a failed attempt. |
| 4.1.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. |
| 4.1.0 | 2024-07-17 | [42078](https://github.com/airbytehq/airbyte/pull/42078) | WASS analytics + bug fixes. |
| 4.0.36 | 2024-07-17 | [41648](https://github.com/airbytehq/airbyte/pull/41648) | Implement WASS. |
Expand Down
Loading
Loading