Skip to content

Commit

Permalink
mysql-source: remove LEGACY state flag (#33436)
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 authored Dec 19, 2023
1 parent c220cfe commit 5b915c6
Show file tree
Hide file tree
Showing 17 changed files with 41 additions and 120 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.7.1'
cdkVersionRequired = '0.7.7'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.2.4
dockerImageTag: 3.3.0
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public Collection<AutoCloseableIterator<AirbyteMessage>> readStreams(final JsonN
final AirbyteStateType supportedStateType = getSupportedStateType(config);
final StateManager stateManager =
StateManagerFactory.createStateManager(supportedStateType,
StateGeneratorUtils.deserializeInitialState(state, featureFlags.useStreamCapableState(), supportedStateType), catalog);
StateGeneratorUtils.deserializeInitialState(state, supportedStateType), catalog);
final Instant emittedAt = Instant.now();

final JdbcDatabase database = createDatabase(config);
Expand Down Expand Up @@ -380,10 +380,6 @@ private static boolean isCdc(final JsonNode config) {

@Override
protected AirbyteStateType getSupportedStateType(final JsonNode config) {
if (!featureFlags.useStreamCapableState()) {
return AirbyteStateType.LEGACY;
}

return isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM;
}

Expand Down Expand Up @@ -529,13 +525,15 @@ protected static String toSslJdbcParamInternal(final SslMode sslMode) {
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
// return super.createDatabase(sourceConfig, this::getConnectionProperties);
final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig);
final Map<String, String> connectionProperties = this.getConnectionProperties(sourceConfig);
// Create the data source
final DataSource dataSource = DataSourceFactory.create(
jdbcConfig.has(JdbcUtils.USERNAME_KEY) ? jdbcConfig.get(JdbcUtils.USERNAME_KEY).asText() : null,
jdbcConfig.has(JdbcUtils.PASSWORD_KEY) ? jdbcConfig.get(JdbcUtils.PASSWORD_KEY).asText() : null,
driverClass,
driverClassName,
jdbcConfig.get(JdbcUtils.JDBC_URL_KEY).asText(),
this.getConnectionProperties(sourceConfig));
connectionProperties,
getConnectionTimeout(connectionProperties, driverClassName));
// Record the data source so that it can be closed.
dataSources.add(dataSource);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.Field;
Expand All @@ -30,11 +28,6 @@ public abstract class AbstractSshMySqlSourceAcceptanceTest extends SourceAccepta
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

private JsonNode config;

public abstract Path getConfigFilePath();
Expand Down Expand Up @@ -90,9 +83,4 @@ protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import com.google.common.collect.Iterables;
import io.airbyte.cdk.db.Database;
import io.airbyte.cdk.integrations.standardtest.source.TestDataHolder;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase.BaseImage;
Expand All @@ -24,11 +22,6 @@ public class CdcBinlogsMySqlSourceDatatypeTest extends AbstractMySqlSourceDataty

private JsonNode stateAfterFirstSync;

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected JsonNode getConfig() {
return testdb.integrationTestConfigBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,11 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.Database;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase.BaseImage;

public class CdcInitialSnapshotMySqlSourceDatatypeTest extends AbstractMySqlSourceDatatypeTest {

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected JsonNode getConfig() {
return testdb.integrationTestConfigBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase.BaseImage;
Expand Down Expand Up @@ -44,11 +42,6 @@ public class CdcMySqlSourceAcceptanceTest extends SourceAcceptanceTest {

protected MySQLTestDatabase testdb;

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected String getImageName() {
return "airbyte/source-mysql:dev";
Expand Down Expand Up @@ -139,7 +132,7 @@ public void testIncrementalSyncShouldNotFailIfBinlogIsDeleted() throws Exception

// when we run incremental sync again there should be no new records. Run a sync with the latest
// state message and assert no records were emitted.
final JsonNode latestState = Jsons.jsonNode(supportsPerStream() ? stateMessages : List.of(Iterables.getLast(stateMessages)));
final JsonNode latestState = Jsons.jsonNode(List.of(Iterables.getLast(stateMessages)));
// RESET MASTER removes all binary log files that are listed in the index file,
// leaving only a single, empty binary log file with a numeric suffix of .000001
testdb.with("RESET MASTER;");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import com.mysql.cj.MysqlType;
import io.airbyte.cdk.db.Database;
import io.airbyte.cdk.integrations.standardtest.source.TestDataHolder;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.integrations.source.mysql.MySQLContainerFactory;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase;
import io.airbyte.protocol.models.JsonSchemaType;
Expand All @@ -20,11 +18,6 @@

public class MySqlDatatypeAccuracyTest extends AbstractMySqlSourceDatatypeTest {

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected JsonNode getConfig() {
return testdb.integrationTestConfigBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import io.airbyte.cdk.integrations.base.ssh.SshHelpers;
import io.airbyte.cdk.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase.BaseImage;
Expand Down Expand Up @@ -42,11 +40,6 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
.with("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
}

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

protected ContainerModifier[] getContainerModifiers() {
return ArrayUtils.toArray();
}
Expand Down Expand Up @@ -102,9 +95,4 @@ protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Override
protected boolean supportsPerStream() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,11 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.Database;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.integrations.source.mysql.MySQLContainerFactory;
import io.airbyte.integrations.source.mysql.MySQLTestDatabase;

public class MySqlSourceDatatypeTest extends AbstractMySqlSourceDatatypeTest {

@Override
protected FeatureFlags featureFlags() {
return FeatureFlagsWrapper.overridingUseStreamCapableState(super.featureFlags(), true);
}

@Override
protected JsonNode getConfig() {
return testdb.integrationTestConfigBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.cdk.db.factory.DSLContextFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.JdbcConnector;
import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.cdk.integrations.standardtest.source.performancetest.AbstractSourceFillDbWithTestData;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -60,7 +61,8 @@ protected Database setupDatabase(final String dbName) throws Exception {
config.get(JdbcUtils.PORT_KEY).asInt(),
config.get(JdbcUtils.DATABASE_KEY).asText()),
SQLDialect.MYSQL,
Map.of("zeroDateTimeBehavior", "convertToNull")));
Map.of("zeroDateTimeBehavior", "convertToNull"),
JdbcConnector.CONNECT_TIMEOUT_DEFAULT));

// It disable strict mode in the DB and allows to insert specific values.
// For example, it's possible to insert date with zero values "2021-00-00"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.cdk.db.factory.DSLContextFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.JdbcConnector;
import io.airbyte.cdk.integrations.standardtest.source.performancetest.AbstractSourcePerformanceTest;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -51,7 +52,8 @@ protected void setupDatabase(final String dbName) throws Exception {
config.get(JdbcUtils.PORT_KEY).asInt(),
config.get(JdbcUtils.DATABASE_KEY).asText()),
SQLDialect.MYSQL,
Map.of("zeroDateTimeBehavior", "convertToNull"))) {
Map.of("zeroDateTimeBehavior", "convertToNull"),
JdbcConnector.CONNECT_TIMEOUT_DEFAULT)) {

final Database database = new Database(dslContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
import io.airbyte.cdk.integrations.debezium.CdcSourceTest;
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
import io.airbyte.cdk.integrations.debezium.internals.mysql.MySqlCdcTargetPosition;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -84,9 +82,7 @@ protected MySQLTestDatabase createTestDatabase() {

@Override
protected MySqlSource source() {
final var source = new MySqlSource();
source.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
return source;
return new MySqlSource();
}

@Override
Expand Down Expand Up @@ -357,7 +353,7 @@ protected void assertStateMessagesForNewTableSnapshotTest(final List<AirbyteStat
assertEquals(7, stateMessages.size());
for (int i = 0; i <= 4; i++) {
final AirbyteStateMessage stateMessage = stateMessages.get(i);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessage.getType());
assertEquals(AirbyteStateType.GLOBAL, stateMessage.getType());
assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
stateMessage.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInSnapshotState = stateMessage.getGlobal().getStreamStates()
Expand All @@ -382,7 +378,7 @@ protected void assertStateMessagesForNewTableSnapshotTest(final List<AirbyteStat
}

final AirbyteStateMessage secondLastSateMessage = stateMessages.get(5);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, secondLastSateMessage.getType());
assertEquals(AirbyteStateType.GLOBAL, secondLastSateMessage.getType());
assertEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
secondLastSateMessage.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInSnapshotState = secondLastSateMessage.getGlobal().getStreamStates()
Expand All @@ -399,7 +395,7 @@ protected void assertStateMessagesForNewTableSnapshotTest(final List<AirbyteStat
});

final AirbyteStateMessage stateMessageEmittedAfterSecondSyncCompletion = stateMessages.get(6);
assertEquals(AirbyteStateMessage.AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType());
assertEquals(AirbyteStateType.GLOBAL, stateMessageEmittedAfterSecondSyncCompletion.getType());
assertNotEquals(stateMessageEmittedAfterFirstSyncCompletion.getGlobal().getSharedState(),
stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getSharedState());
final Set<StreamDescriptor> streamsInSyncCompletionState = stateMessageEmittedAfterSecondSyncCompletion.getGlobal().getStreamStates()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
package io.airbyte.integrations.source.mysql;

import io.airbyte.cdk.integrations.debug.DebugUtil;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlagsWrapper;

public class MySqlDebugger {

@SuppressWarnings({"unchecked", "deprecation", "resource"})
public static void main(final String[] args) throws Exception {
final MySqlSource mysqlSource = new MySqlSource();
mysqlSource.setFeatureFlags(FeatureFlagsWrapper.overridingUseStreamCapableState(new EnvVariableFeatureFlags(), true));
DebugUtil.debug(mysqlSource);
}

Expand Down
Loading

0 comments on commit 5b915c6

Please sign in to comment.