From e16dff4b3c3b14bdce7acd9945d8bd1e2570624a Mon Sep 17 00:00:00 2001 From: ZhangCheng Date: Tue, 16 Aug 2022 09:14:50 +0800 Subject: [PATCH] =?UTF-8?q?Support=20distributed=20transactions=20across?= =?UTF-8?q?=20multiple=20logical=20database(#19=E2=80=A6=20(#20114)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Support distributed transactions across multiple logical database(#19894) * Fix test case * Generate data source name * Add test case * Remove final * JDBC does not support operations across multiple logical databases in transaction * Fix equals usage --- .../statement/ShardingSphereStatement.java | 21 +++++++++++++++++++ .../transaction/ConnectionTransaction.java | 5 ++++- .../transaction/core/ResourceDataSource.java | 5 ++++- .../transaction/rule/TransactionRule.java | 5 ++++- .../spi/ShardingSphereTransactionManager.java | 3 ++- .../core/ResourceDataSourceTest.java | 16 +++++++++++--- ...ardingSphereTransactionManagerFixture.java | 2 +- ...ardingSphereTransactionManagerFixture.java | 2 +- ...ataATShardingSphereTransactionManager.java | 4 ++-- ...TShardingSphereTransactionManagerTest.java | 8 ++++--- .../XAShardingSphereTransactionManager.java | 4 ++-- ...AShardingSphereTransactionManagerTest.java | 18 ++++++++-------- .../datasource/JDBCBackendDataSource.java | 2 +- .../backend/session/ConnectionSession.java | 4 ---- .../session/ConnectionSessionTest.java | 4 ++-- 15 files changed, 71 insertions(+), 32 deletions(-) diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java index e513f29f18aae..bfbf283526f8f 100644 --- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java +++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSphereStatement.java @@ -37,6 +37,7 @@ import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext; import org.apache.shardingsphere.infra.binder.statement.dml.InsertStatementContext; import org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext; +import org.apache.shardingsphere.infra.binder.type.TableAvailable; import org.apache.shardingsphere.infra.config.props.ConfigurationProperties; import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey; import org.apache.shardingsphere.infra.context.kernel.KernelProcessor; @@ -78,6 +79,7 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.dal.DALStatement; import org.apache.shardingsphere.traffic.engine.TrafficEngine; import org.apache.shardingsphere.traffic.rule.TrafficRule; +import org.apache.shardingsphere.transaction.TransactionHolder; import java.sql.Connection; import java.sql.ResultSet; @@ -156,6 +158,7 @@ public ResultSet executeQuery(final String sql) throws SQLException { ResultSet result; try { LogicSQL logicSQL = createLogicSQL(sql); + checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(), connection.getDatabaseName()); trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, logicSQL); @@ -235,6 +238,7 @@ private DriverExecutionPrepareEngine createDriver public int executeUpdate(final String sql) throws SQLException { try { LogicSQL logicSQL = createLogicSQL(sql); + checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(), connection.getDatabaseName()); trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, logicSQL); @@ -263,6 +267,7 @@ public int executeUpdate(final String sql, final int autoGeneratedKeys) throws S } try { LogicSQL logicSQL = createLogicSQL(sql); + checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(), connection.getDatabaseName()); trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, logicSQL); @@ -289,6 +294,7 @@ public int executeUpdate(final String sql, final int[] columnIndexes) throws SQL returnGeneratedKeys = true; try { LogicSQL logicSQL = createLogicSQL(sql); + checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(), connection.getDatabaseName()); trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, logicSQL); @@ -315,6 +321,7 @@ public int executeUpdate(final String sql, final String[] columnNames) throws SQ returnGeneratedKeys = true; try { LogicSQL logicSQL = createLogicSQL(sql); + checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(), connection.getDatabaseName()); trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, logicSQL); @@ -431,6 +438,7 @@ protected Optional getSaneResult(final SQLStatement sqlStatement, final private boolean execute0(final String sql, final ExecuteCallback callback) throws SQLException { try { LogicSQL logicSQL = createLogicSQL(sql); + checkSameDatabaseNameInTransaction(logicSQL.getSqlStatementContext(), connection.getDatabaseName()); trafficInstanceId = getInstanceIdAndSet(logicSQL).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, logicSQL); @@ -455,6 +463,19 @@ private boolean execute0(final String sql, final ExecuteCallback callback) throw } } + private void checkSameDatabaseNameInTransaction(final SQLStatementContext sqlStatementContext, final String connectionDatabaseName) { + if (!TransactionHolder.isTransaction()) { + return; + } + if (sqlStatementContext instanceof TableAvailable) { + ((TableAvailable) sqlStatementContext).getTablesContext().getDatabaseName().ifPresent(databaseName -> { + if (!databaseName.equals(connectionDatabaseName)) { + throw new ShardingSphereException("JDBC does not support operations across multiple logical databases in transaction."); + } + }); + } + } + private JDBCExecutionUnit createTrafficExecutionUnit(final String trafficInstanceId, final LogicSQL logicSQL) throws SQLException { DriverExecutionPrepareEngine prepareEngine = createDriverExecutionPrepareEngine(); ExecutionUnit executionUnit = new ExecutionUnit(trafficInstanceId, new SQLUnit(logicSQL.getSql(), logicSQL.getParameters())); diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java index a7bb4d30a0504..5d8fc65377181 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/ConnectionTransaction.java @@ -35,6 +35,8 @@ public final class ConnectionTransaction { private final TransactionType transactionType; + private final String databaseName; + @Setter @Getter private volatile boolean rollbackOnly; @@ -46,6 +48,7 @@ public ConnectionTransaction(final String databaseName, final TransactionRule ru } public ConnectionTransaction(final String databaseName, final TransactionType transactionType, final TransactionRule rule) { + this.databaseName = databaseName; this.transactionType = transactionType; transactionManager = rule.getResource().getTransactionManager(transactionType); TransactionTypeHolder.set(transactionType); @@ -87,7 +90,7 @@ public boolean isHoldTransaction(final boolean autoCommit) { * @throws SQLException SQL exception */ public Optional getConnection(final String dataSourceName) throws SQLException { - return isInTransaction() ? Optional.of(transactionManager.getConnection(dataSourceName)) : Optional.empty(); + return isInTransaction() ? Optional.of(transactionManager.getConnection(this.databaseName, dataSourceName)) : Optional.empty(); } /** diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/ResourceDataSource.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/ResourceDataSource.java index 886290b21755f..d85fb9f7a8a66 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/ResourceDataSource.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/core/ResourceDataSource.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.transaction.core; +import com.google.common.base.Preconditions; import lombok.Getter; import javax.sql.DataSource; @@ -34,8 +35,10 @@ public final class ResourceDataSource { private final DataSource dataSource; public ResourceDataSource(final String originalName, final DataSource dataSource) { + String[] databaseAndDataSourceName = originalName.split("\\."); + Preconditions.checkState(2 == databaseAndDataSourceName.length, String.format("Database and data source name must be provided,`%s`.", originalName)); this.originalName = originalName; this.dataSource = dataSource; - uniqueResourceName = ResourceIDGenerator.getInstance().nextId() + originalName; + this.uniqueResourceName = ResourceIDGenerator.getInstance().nextId() + databaseAndDataSourceName[1]; } } diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java index 9749083bc2fcb..d1e542c51be75 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/rule/TransactionRule.java @@ -72,7 +72,10 @@ private synchronized ShardingSphereTransactionManagerEngine createTransactionMan Map dataSourceMap = new HashMap<>(databases.size()); Set databaseTypes = new HashSet<>(); for (Entry entry : databases.entrySet()) { - dataSourceMap.putAll(entry.getValue().getResource().getDataSources()); + ShardingSphereDatabase database = entry.getValue(); + database.getResource().getDataSources().forEach((key, value) -> { + dataSourceMap.put(database.getName() + "." + key, value); + }); if (null != entry.getValue().getResource().getDatabaseType()) { databaseTypes.add(entry.getValue().getResource().getDatabaseType()); } diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereTransactionManager.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereTransactionManager.java index e56bd238456e7..702a5208c0966 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereTransactionManager.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/main/java/org/apache/shardingsphere/transaction/spi/ShardingSphereTransactionManager.java @@ -56,11 +56,12 @@ public interface ShardingSphereTransactionManager extends AutoCloseable { /** * Get transactional connection. * + * @param databaseName database name * @param dataSourceName data source name * @return connection * @throws SQLException SQL exception */ - Connection getConnection(String dataSourceName) throws SQLException; + Connection getConnection(String databaseName, String dataSourceName) throws SQLException; /** * Begin transaction. diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/ResourceDataSourceTest.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/ResourceDataSourceTest.java index eb88846db62bb..f376edd652e1f 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/ResourceDataSourceTest.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/ResourceDataSourceTest.java @@ -27,12 +27,22 @@ public final class ResourceDataSourceTest { + private static final String DATABASE_NAME = "sharding_db"; + + private static final String DATA_SOURCE_NAME = "fooDataSource"; + @Test public void assertNewInstance() { - ResourceDataSource actual = new ResourceDataSource("fooDataSource", new MockedDataSource()); - assertThat(actual.getOriginalName(), is("fooDataSource")); + String originalName = DATABASE_NAME + "." + DATA_SOURCE_NAME; + ResourceDataSource actual = new ResourceDataSource(originalName, new MockedDataSource()); + assertThat(actual.getOriginalName(), is(originalName)); assertThat(actual.getDataSource(), instanceOf(MockedDataSource.class)); assertTrue(actual.getUniqueResourceName().startsWith("resource")); - assertTrue(actual.getUniqueResourceName().endsWith("fooDataSource")); + assertTrue(actual.getUniqueResourceName().endsWith(DATA_SOURCE_NAME)); + } + + @Test(expected = IllegalStateException.class) + public void assertDataSourceNameOnlyFailure() { + new ResourceDataSource(DATA_SOURCE_NAME, new MockedDataSource()); } } diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/OtherShardingSphereTransactionManagerFixture.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/OtherShardingSphereTransactionManagerFixture.java index 807486266c5e5..effa0645d1531 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/OtherShardingSphereTransactionManagerFixture.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/OtherShardingSphereTransactionManagerFixture.java @@ -42,7 +42,7 @@ public boolean isInTransaction() { } @Override - public Connection getConnection(final String dataSourceName) { + public Connection getConnection(final String databaseName, final String dataSourceName) { return null; } diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/ShardingSphereTransactionManagerFixture.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/ShardingSphereTransactionManagerFixture.java index 1acabde96a42b..5f52e4b56912b 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/ShardingSphereTransactionManagerFixture.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-core/src/test/java/org/apache/shardingsphere/transaction/core/fixture/ShardingSphereTransactionManagerFixture.java @@ -49,7 +49,7 @@ public boolean isInTransaction() { } @Override - public Connection getConnection(final String dataSourceName) { + public Connection getConnection(final String databaseName, final String dataSourceName) { return null; } diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/main/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManager.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/main/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManager.java index 7e1d3710566f8..6b075db19053b 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/main/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManager.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/main/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManager.java @@ -90,9 +90,9 @@ public boolean isInTransaction() { } @Override - public Connection getConnection(final String dataSourceName) throws SQLException { + public Connection getConnection(final String databaseName, final String dataSourceName) throws SQLException { Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled."); - return dataSourceMap.get(dataSourceName).getConnection(); + return dataSourceMap.get(databaseName + "." + dataSourceName).getConnection(); } @Override diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/test/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManagerTest.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/test/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManagerTest.java index 6fba061a6c56b..033b85c60f303 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/test/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManagerTest.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-base/shardingsphere-transaction-base-seata-at/src/test/java/org/apache/shardingsphere/transaction/base/seata/at/SeataATShardingSphereTransactionManagerTest.java @@ -61,6 +61,8 @@ public final class SeataATShardingSphereTransactionManagerTest { private static final MockSeataServer MOCK_SEATA_SERVER = new MockSeataServer(); + private static final String DATA_SOURCE_UNIQUE_NAME = "sharding_db.foo_ds"; + private final SeataATShardingSphereTransactionManager seataTransactionManager = new SeataATShardingSphereTransactionManager(); private final Queue requestQueue = MOCK_SEATA_SERVER.getMessageHandler().getRequestQueue(); @@ -84,7 +86,7 @@ public static void after() { @Before public void setUp() { - seataTransactionManager.init(DatabaseTypeFactory.getInstance("MySQL"), Collections.singletonList(new ResourceDataSource("foo_ds", new MockedDataSource())), "Seata"); + seataTransactionManager.init(DatabaseTypeFactory.getInstance("MySQL"), Collections.singletonList(new ResourceDataSource(DATA_SOURCE_UNIQUE_NAME, new MockedDataSource())), "Seata"); } @After @@ -102,13 +104,13 @@ public void tearDown() { public void assertInit() { Map actual = getDataSourceMap(); assertThat(actual.size(), is(1)); - assertThat(actual.get("foo_ds"), instanceOf(DataSourceProxy.class)); + assertThat(actual.get(DATA_SOURCE_UNIQUE_NAME), instanceOf(DataSourceProxy.class)); assertThat(seataTransactionManager.getTransactionType(), is(TransactionType.BASE)); } @Test public void assertGetConnection() throws SQLException { - Connection actual = seataTransactionManager.getConnection("foo_ds"); + Connection actual = seataTransactionManager.getConnection("sharding_db", "foo_ds"); assertThat(actual, instanceOf(ConnectionProxy.class)); } diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java index 2b196d25c5486..61153058a032f 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/main/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManager.java @@ -74,9 +74,9 @@ public boolean isInTransaction() { } @Override - public Connection getConnection(final String dataSourceName) throws SQLException { + public Connection getConnection(final String databaseName, final String dataSourceName) throws SQLException { try { - return cachedDataSources.get(dataSourceName).getConnection(); + return cachedDataSources.get(databaseName + "." + dataSourceName).getConnection(); } catch (final SystemException | RollbackException ex) { throw new SQLException(ex); } diff --git a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/test/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManagerTest.java b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/test/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManagerTest.java index 57a23e2b9c3e9..a0159a960b5eb 100644 --- a/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/test/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManagerTest.java +++ b/shardingsphere-kernel/shardingsphere-transaction/shardingsphere-transaction-type/shardingsphere-transaction-xa/shardingsphere-transaction-xa-core/src/test/java/org/apache/shardingsphere/transaction/xa/XAShardingSphereTransactionManagerTest.java @@ -82,9 +82,9 @@ public void assertIsInTransaction() { @Test public void assertGetConnection() throws SQLException { xaTransactionManager.begin(); - Connection actual1 = xaTransactionManager.getConnection("ds1"); - Connection actual2 = xaTransactionManager.getConnection("ds2"); - Connection actual3 = xaTransactionManager.getConnection("ds3"); + Connection actual1 = xaTransactionManager.getConnection("demo_ds_1", "ds1"); + Connection actual2 = xaTransactionManager.getConnection("demo_ds_2", "ds2"); + Connection actual3 = xaTransactionManager.getConnection("demo_ds_3", "ds3"); assertThat(actual1, instanceOf(Connection.class)); assertThat(actual2, instanceOf(Connection.class)); assertThat(actual3, instanceOf(Connection.class)); @@ -93,10 +93,10 @@ public void assertGetConnection() throws SQLException { @Test public void assertGetConnectionOfNestedTransaction() throws SQLException { - ThreadLocal> transactions = getEnlistedTransactions(getCachedDataSources().get("ds1")); + ThreadLocal> transactions = getEnlistedTransactions(getCachedDataSources().get("demo_ds_1.ds1")); xaTransactionManager.begin(); assertTrue(transactions.get().isEmpty()); - xaTransactionManager.getConnection("ds1"); + xaTransactionManager.getConnection("demo_ds_1", "ds1"); assertThat(transactions.get().size(), is(1)); executeNestedTransaction(transactions); assertThat(transactions.get().size(), is(1)); @@ -106,7 +106,7 @@ public void assertGetConnectionOfNestedTransaction() throws SQLException { private void executeNestedTransaction(final ThreadLocal> transactions) throws SQLException { xaTransactionManager.begin(); - xaTransactionManager.getConnection("ds1"); + xaTransactionManager.getConnection("demo_ds_1", "ds1"); assertThat(transactions.get().size(), is(2)); xaTransactionManager.commit(false); assertThat(transactions.get().size(), is(1)); @@ -153,9 +153,9 @@ private ThreadLocal> getEnlistedTransactions(final private Collection createResourceDataSources(final DatabaseType databaseType) { List result = new LinkedList<>(); - result.add(new ResourceDataSource("ds1", DataSourceUtils.build(HikariDataSource.class, databaseType, "demo_ds_1"))); - result.add(new ResourceDataSource("ds2", DataSourceUtils.build(HikariDataSource.class, databaseType, "demo_ds_2"))); - result.add(new ResourceDataSource("ds3", DataSourceUtils.build(AtomikosDataSourceBean.class, databaseType, "demo_ds_3"))); + result.add(new ResourceDataSource("demo_ds_1.ds1", DataSourceUtils.build(HikariDataSource.class, databaseType, "demo_ds_1"))); + result.add(new ResourceDataSource("demo_ds_2.ds2", DataSourceUtils.build(HikariDataSource.class, databaseType, "demo_ds_2"))); + result.add(new ResourceDataSource("demo_ds_3.ds3", DataSourceUtils.build(AtomikosDataSourceBean.class, databaseType, "demo_ds_3"))); return result; } } diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/datasource/JDBCBackendDataSource.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/datasource/JDBCBackendDataSource.java index ed631b4b91f34..ae329b3366f2c 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/datasource/JDBCBackendDataSource.java +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/datasource/JDBCBackendDataSource.java @@ -106,7 +106,7 @@ private List createConnections(final String databaseName, final Stri private Connection createConnection(final String databaseName, final String dataSourceName, final DataSource dataSource, final TransactionType transactionType) throws SQLException { TransactionRule transactionRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class); ShardingSphereTransactionManager transactionManager = transactionRule.getResource().getTransactionManager(transactionType); - Connection result = isInTransaction(transactionManager) ? transactionManager.getConnection(dataSourceName) : dataSource.getConnection(); + Connection result = isInTransaction(transactionManager) ? transactionManager.getConnection(databaseName, dataSourceName) : dataSource.getConnection(); if (dataSourceName.contains(".")) { String catalog = dataSourceName.split("\\.")[1]; result.setCatalog(catalog); diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java index f9cba6704fee0..a86edd93b1046 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSession.java @@ -23,7 +23,6 @@ import lombok.Setter; import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey; import org.apache.shardingsphere.infra.database.type.DatabaseType; -import org.apache.shardingsphere.infra.exception.ShardingSphereException; import org.apache.shardingsphere.infra.executor.sql.prepare.driver.ExecutorStatementManager; import org.apache.shardingsphere.infra.metadata.user.Grantee; import org.apache.shardingsphere.infra.session.ConnectionContext; @@ -104,9 +103,6 @@ public void setCurrentDatabase(final String databaseName) { if (null != databaseName && databaseName.equals(this.databaseName)) { return; } - if (transactionStatus.isInTransaction()) { - throw new ShardingSphereException("Failed to switch database, please terminate current transaction."); - } if (statementManager instanceof JDBCBackendStatement) { ((JDBCBackendStatement) statementManager).setDatabaseName(databaseName); } diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java index cb9739aa703dd..2dc57a7c3804f 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/session/ConnectionSessionTest.java @@ -78,8 +78,8 @@ public void assertFailedSwitchTransactionTypeWhileBegin() throws SQLException { connectionSession.getTransactionStatus().setTransactionType(TransactionType.XA); } - @Test(expected = ShardingSphereException.class) - public void assertFailedSwitchSchemaWhileBegin() throws SQLException { + @Test + public void assertSwitchSchemaWhileBegin() throws SQLException { connectionSession.setCurrentDatabase("db"); JDBCBackendTransactionManager transactionManager = new JDBCBackendTransactionManager(backendConnection); transactionManager.begin();