Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add | Add support in SQLServerBulkCopy to allow Pooled/XA Connection instances during object creation. #968

Merged
merged 10 commits into from
Mar 11, 2019
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ install:

before_script:
- docker pull mcr.microsoft.com/mssql/server:latest
- export SQLPWD=$(</dev/urandom tr -dc 'A-Za-z0-9#!' | head -c 15;)
- export SQLPWD=$(</dev/urandom tr -dc 'A-Za-z0-9' | head -c 15;)
- export mssql_jdbc_test_connection_properties="jdbc:sqlserver://localhost:1433;databaseName=master;username=sa;password=$SQLPWD"
- docker run -e 'ACCEPT_EULA=Y' -e "SA_PASSWORD=$SQLPWD" -p 1433:1433 -d mcr.microsoft.com/mssql/server:latest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.logging.Level;

import javax.sql.RowSet;
import javax.sql.XAConnection;

import microsoft.sql.DateTimeOffset;

Expand Down Expand Up @@ -270,13 +271,15 @@ class BulkColumnMetaData {
public SQLServerBulkCopy(Connection connection) throws SQLServerException {
loggerExternal.entering(loggerClassName, "SQLServerBulkCopy", connection);

if (null == connection || !(connection instanceof SQLServerConnection)) {
if (null == connection || !(connection instanceof ISQLServerConnection)) {
SQLServerException.makeFromDriverError(null, null,
SQLServerException.getErrString("R_invalidDestConnection"), null, false);
}

if (connection instanceof SQLServerConnection) {
this.connection = (SQLServerConnection) connection;
} else if (connection instanceof SQLServerConnectionPoolProxy) {
this.connection = ((SQLServerConnectionPoolProxy) connection).getWrappedConnection();
} else {
SQLServerException.makeFromDriverError(null, null,
SQLServerException.getErrString("R_invalidDestConnection"), null, false);
Expand Down Expand Up @@ -1962,9 +1965,9 @@ private void writeColumnToTdsWriter(TDSWriter tdsWriter, int bulkPrecision, int
bulkJdbcType = java.sql.Types.VARBINARY;
}
/*
* if source is encrypted and destination is unencrypted, use destination sql type to send since there is no
* way of finding if source is encrypted without accessing the resultset, send destination type if source
* resultset set is of type SQLServer and encryption is enabled
* if source is encrypted and destination is unencrypted, use destination sql type to send since there is no way
* of finding if source is encrypted without accessing the resultset, send destination type if source resultset
* set is of type SQLServer and encryption is enabled
*/
else if (null != sourceCryptoMeta) {
bulkJdbcType = destColumnMetadata.get(destColOrdinal).jdbcType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public String toString() {
bIsOpen = true;
}

SQLServerConnection getWrappedConnection() {
return wrappedConnection;
}

void checkClosed() throws SQLServerException {
if (!bIsOpen) {
SQLServerException.makeFromDriverError(null, null, SQLServerException.getErrString("R_connectionIsClosed"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ private void testBulkCopyResultSet(boolean setSelectMethod, Integer resultSetTyp
bcOperation.writeToServer(rs);
bcOperation.close();

ComparisonUtil.compareSrcTableAndDestTableIgnoreRowOrder(new DBConnection(connection), tableSrc,
tableDest);
ComparisonUtil.compareSrcTableAndDestTableIgnoreRowOrder(new DBConnection(connection), tableSrc, tableDest);
} finally {
terminateVariation();
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Stream;

Expand Down Expand Up @@ -153,7 +152,9 @@ public void execute() throws SQLException {
@DisplayName("BulkCopy:test null SQLServerBulkCopyOptions")
public void testEmptyBulkCopyOptions() {
BulkCopyTestWrapper bulkWrapper = new BulkCopyTestWrapper(connectionString);
bulkWrapper.setUsingConnection((0 == ThreadLocalRandom.current().nextInt(2)) ? true : false);
bulkWrapper.setUsingConnection((0 == random.nextInt(2)) ? true : false, ds);
bulkWrapper.setUsingXAConnection((0 == random.nextInt(2)) ? true : false, dsXA);
bulkWrapper.setUsingPooledConnection((0 == random.nextInt(2)) ? true : false, dsPool);
SQLServerBulkCopyOptions option = null;
bulkWrapper.useBulkCopyOptions(true);
bulkWrapper.setBulkOptions(option);
Expand All @@ -170,14 +171,32 @@ List<BulkCopyTestWrapper> createTestDatatestBulkCopyConstructor() {
List<BulkCopyTestWrapper> testData = new ArrayList<>();
BulkCopyTestWrapper bulkWrapper1 = new BulkCopyTestWrapper(connectionString);
bulkWrapper1.testName = testCaseName;
bulkWrapper1.setUsingConnection(true);
bulkWrapper1.setUsingConnection(true, ds);
bulkWrapper1.setUsingXAConnection(true, dsXA);
bulkWrapper1.setUsingPooledConnection(true, dsPool);
testData.add(bulkWrapper1);

BulkCopyTestWrapper bulkWrapper2 = new BulkCopyTestWrapper(connectionString);
bulkWrapper2.testName = testCaseName;
bulkWrapper2.setUsingConnection(false);
bulkWrapper2.setUsingConnection(true, ds);
bulkWrapper2.setUsingXAConnection(true, dsXA);
bulkWrapper2.setUsingPooledConnection(false, dsPool);
testData.add(bulkWrapper2);

BulkCopyTestWrapper bulkWrapper3 = new BulkCopyTestWrapper(connectionString);
bulkWrapper3.testName = testCaseName;
bulkWrapper3.setUsingConnection(false, ds);
bulkWrapper3.setUsingXAConnection(false, dsXA);
bulkWrapper3.setUsingPooledConnection(false, dsPool);
testData.add(bulkWrapper3);

BulkCopyTestWrapper bulkWrapper4 = new BulkCopyTestWrapper(connectionString);
bulkWrapper4.testName = testCaseName;
bulkWrapper4.setUsingConnection(true, ds);
bulkWrapper4.setUsingXAConnection(false, dsXA);
bulkWrapper4.setUsingPooledConnection(true, dsPool);
testData.add(bulkWrapper4);

return testData;
}

Expand All @@ -200,7 +219,9 @@ private List<BulkCopyTestWrapper> createTestDatatestBulkCopyOption() {

BulkCopyTestWrapper bulkWrapper = new BulkCopyTestWrapper(connectionString);
bulkWrapper.testName = testCaseName;
bulkWrapper.setUsingConnection((0 == ThreadLocalRandom.current().nextInt(2)) ? true : false);
bulkWrapper.setUsingConnection((0 == random.nextInt(2)) ? true : false, ds);
bulkWrapper.setUsingXAConnection((0 == random.nextInt(2)) ? true : false, dsXA);
bulkWrapper.setUsingPooledConnection((0 == random.nextInt(2)) ? true : false, dsPool);

SQLServerBulkCopyOptions option = new SQLServerBulkCopyOptions();
if (!(method.getName()).equalsIgnoreCase("setUseInternalTransaction")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -44,7 +43,9 @@ public void testISQLServerBulkRecord() throws SQLException {
BulkData Bdata = new BulkData(dstTable);

BulkCopyTestWrapper bulkWrapper = new BulkCopyTestWrapper(connectionString);
bulkWrapper.setUsingConnection((0 == ThreadLocalRandom.current().nextInt(2)) ? true : false);
bulkWrapper.setUsingConnection((0 == random.nextInt(2)) ? true : false, ds);
bulkWrapper.setUsingXAConnection((0 == random.nextInt(2)) ? true : false, dsXA);
bulkWrapper.setUsingPooledConnection((0 == random.nextInt(2)) ? true : false, dsPool);
BulkCopyTestUtil.performBulkCopy(bulkWrapper, Bdata, dstTable);
} finally {
if (null != dstTable) {
Expand All @@ -57,6 +58,8 @@ public void testISQLServerBulkRecord() throws SQLException {

class BulkData implements ISQLServerBulkRecord {

private static final long serialVersionUID = 1L;

private class ColumnMetadata {
String columnName;
int columnType;
Expand Down Expand Up @@ -101,7 +104,7 @@ private class ColumnMetadata {
for (int j = 0; j < totalColumn; j++) {
SqlType sqlType = dstTable.getSqlType(j);
if (JDBCType.BIT == sqlType.getJdbctype()) {
CurrentRow[j] = ((0 == ThreadLocalRandom.current().nextInt(2)) ? Boolean.FALSE : Boolean.TRUE);
CurrentRow[j] = ((0 == random.nextInt(2)) ? Boolean.FALSE : Boolean.TRUE);
} else {
if (j == 0) {
CurrentRow[j] = i + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,16 @@ static void performBulkCopy(BulkCopyTestWrapper wrapper, DBTable sourceTable, DB
static void performBulkCopy(BulkCopyTestWrapper wrapper, DBTable sourceTable, boolean validateResult) {
DBTable destinationTable = null;
try (DBConnection con = new DBConnection(wrapper.getConnectionString());
DBConnection conBulkCopy = new DBConnection(wrapper.getDataSource());
DBStatement stmt = con.createStatement()) {

destinationTable = sourceTable.cloneSchema();
stmt.createTable(destinationTable);

try (DBResultSet srcResultSet = stmt.executeQuery("SELECT * FROM " + sourceTable.getEscapedTableName()
+ " ORDER BY " + sourceTable.getEscapedColumnName(0));
SQLServerBulkCopy bulkCopy = wrapper.isUsingConnection() ? new SQLServerBulkCopy(
(Connection) con.product()) : new SQLServerBulkCopy(wrapper.getConnectionString())) {
SQLServerBulkCopy bulkCopy = wrapper
.isUsingConnection() ? new SQLServerBulkCopy((Connection) conBulkCopy.product())
: new SQLServerBulkCopy(wrapper.getConnectionString())) {
if (wrapper.isUsingBulkCopyOptions()) {
bulkCopy.setBulkCopyOptions(wrapper.getBulkOptions());
}
Expand Down Expand Up @@ -112,11 +114,12 @@ static void performBulkCopy(BulkCopyTestWrapper wrapper, DBTable sourceTable, bo
static void performBulkCopy(BulkCopyTestWrapper wrapper, DBTable sourceTable, DBTable destinationTable,
boolean validateResult) {
try (DBConnection con = new DBConnection(wrapper.getConnectionString());
DBConnection conBulkCopy = new DBConnection(wrapper.getDataSource());
DBStatement stmt = con.createStatement();
DBResultSet srcResultSet = stmt.executeQuery("SELECT * FROM " + sourceTable.getEscapedTableName()
+ " ORDER BY " + sourceTable.getEscapedColumnName(0));
SQLServerBulkCopy bulkCopy = wrapper.isUsingConnection() ? new SQLServerBulkCopy(
(Connection) con.product()) : new SQLServerBulkCopy(wrapper.getConnectionString())) {
(Connection) conBulkCopy.product()) : new SQLServerBulkCopy(wrapper.getConnectionString())) {
if (wrapper.isUsingBulkCopyOptions()) {
bulkCopy.setBulkCopyOptions(wrapper.getBulkOptions());
}
Expand Down Expand Up @@ -156,11 +159,12 @@ static void performBulkCopy(BulkCopyTestWrapper wrapper, DBTable sourceTable, DB
static void performBulkCopy(BulkCopyTestWrapper wrapper, DBTable sourceTable, DBTable destinationTable,
boolean validateResult, boolean fail) {
try (DBConnection con = new DBConnection(wrapper.getConnectionString());
DBConnection conBulkCopy = new DBConnection(wrapper.getDataSource());
DBStatement stmt = con.createStatement();
DBResultSet srcResultSet = stmt.executeQuery("SELECT * FROM " + sourceTable.getEscapedTableName()
+ " ORDER BY " + sourceTable.getEscapedColumnName(0));
SQLServerBulkCopy bulkCopy = wrapper.isUsingConnection() ? new SQLServerBulkCopy(
(Connection) con.product()) : new SQLServerBulkCopy(wrapper.getConnectionString())) {
(Connection) conBulkCopy.product()) : new SQLServerBulkCopy(wrapper.getConnectionString())) {
try {
if (wrapper.isUsingBulkCopyOptions()) {
bulkCopy.setBulkCopyOptions(wrapper.getBulkOptions());
Expand Down Expand Up @@ -215,11 +219,12 @@ static void performBulkCopy(BulkCopyTestWrapper wrapper, DBTable sourceTable, DB
static void performBulkCopy(BulkCopyTestWrapper wrapper, DBTable sourceTable, DBTable destinationTable,
boolean validateResult, boolean fail, boolean dropDest) {
try (DBConnection con = new DBConnection(wrapper.getConnectionString());
DBConnection conBulkCopy = new DBConnection(wrapper.getDataSource());
DBStatement stmt = con.createStatement();
DBResultSet srcResultSet = stmt.executeQuery("SELECT * FROM " + sourceTable.getEscapedTableName()
+ " ORDER BY " + sourceTable.getEscapedColumnName(0));
SQLServerBulkCopy bulkCopy = wrapper.isUsingConnection() ? new SQLServerBulkCopy(
(Connection) con.product()) : new SQLServerBulkCopy(wrapper.getConnectionString())) {
(Connection) conBulkCopy.product()) : new SQLServerBulkCopy(wrapper.getConnectionString())) {
try {
if (wrapper.isUsingBulkCopyOptions()) {
bulkCopy.setBulkCopyOptions(wrapper.getBulkOptions());
Expand Down Expand Up @@ -312,7 +317,7 @@ static void validateValues(DBConnection con, DBTable sourceTable, DBTable destin
static void performBulkCopy(BulkCopyTestWrapper bulkWrapper, ISQLServerBulkRecord srcData, DBTable dstTable) {
try (DBConnection con = new DBConnection(bulkWrapper.getConnectionString());
DBStatement stmt = con.createStatement();
SQLServerBulkCopy bc = new SQLServerBulkCopy(bulkWrapper.getConnectionString());) {
SQLServerBulkCopy bc = new SQLServerBulkCopy(bulkWrapper.getConnectionString())) {
bc.setDestinationTableName(dstTable.getEscapedTableName());
bc.writeToServer(srcData);
validateValues(con, srcData, dstTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.util.LinkedList;

import com.microsoft.sqlserver.jdbc.ISQLServerDataSource;
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions;


Expand All @@ -21,12 +22,22 @@ class BulkCopyTestWrapper {
/**
* <code>true</code> if SQLServerBulkCopy should use connection object
*/
private boolean isUsingConnection;
private boolean isUsingConnection = false;

/**
* <code>true</code> if SQLServerBulkCopy should use XA connection object
*/
private boolean isUsingXAConnection = false;

/**
* <code>true</code> if SQLServerBulkCopy should use pooled connection object
*/
private boolean isUsingPooledConnection = false;

/**
* <code>true</code> if SQLServerBulkCopy should include SQLServerBulkCopyOptions
*/
private boolean useBulkCopyOptions;
private boolean useBulkCopyOptions = false;

/**
* <code>true</code> if SQLServerBulkCopy should use column mapping
Expand All @@ -39,27 +50,84 @@ class BulkCopyTestWrapper {

private String connectionString;

private ISQLServerDataSource dataSource;

BulkCopyTestWrapper(String connectionString) {
this.connectionString = connectionString;
}

/**
* Returns boolean value <code>true</code> if connection object is used for testing bulk copy
*
* @return
* @return isUsingConnection
*/
public boolean isUsingConnection() {
return isUsingConnection;
}

/**
* Returns boolean value <code>true</code> if XA connection object is used for testing bulk copy
*
* @return isUsingXAConnection
*/
public boolean isUsingXAConnection() {
return isUsingXAConnection;
}

/**
* Returns boolean value <code>true</code> if pooled connection object is used for testing bulk copy
*
* @return isUsingPooledConnection
*/
public boolean isUsingPooledConnection() {
return isUsingPooledConnection;
}

/**
* @param isUsingConnection
* <code>true</code> if connection object should be passed in BulkCopy constructor <code>false</code> if
* connection string is to be passed to constructor
*/
public void setUsingConnection(boolean isUsingConnection) {
public void setUsingConnection(boolean isUsingConnection, ISQLServerDataSource ds) {
this.isUsingConnection = isUsingConnection;
testName += "isUsingConnection=" + isUsingConnection + ";";
setDataSource(ds);
}

/**
* Sets boolean property if XA Connection must be used to test bulk Copy
*
* @param isUsingXAConnection
* <code>true</code> if XA connection object should be passed in BulkCopy constructor <code>false</code> if
* otherwise
*/
public void setUsingXAConnection(boolean isUsingXAConnection, ISQLServerDataSource ds) {
if (!isUsingConnection) {
// Cannot use XA Connection if connection itself is not in use.
isUsingXAConnection = false;
}
this.isUsingXAConnection = isUsingXAConnection;
testName += "isUsingXAConnection=" + isUsingXAConnection + ";";
if (isUsingXAConnection)
setDataSource(ds);
}

/**
* Sets boolean property if Pooled Connection must be used to test bulk Copy
*
* @param isUsingPooledConnection
* <code>true</code> if Pooled connection object should be passed in BulkCopy constructor <code>false</code>
* if otherwise
*/
public void setUsingPooledConnection(boolean isUsingPooledConnection, ISQLServerDataSource ds) {
if (!isUsingConnection || isUsingXAConnection) {
// Cannot use Pooled connection if connection itself is not in use or we are already using XA Connection
isUsingPooledConnection = false;
}
this.isUsingPooledConnection = isUsingPooledConnection;
testName += "isUsingPooledConnection=" + isUsingPooledConnection + ";";
if (isUsingPooledConnection)
setDataSource(ds);
}

public boolean isUsingBulkCopyOptions() {
Expand Down Expand Up @@ -87,6 +155,14 @@ public void setConnectionString(String connectionString) {
this.connectionString = connectionString;
}

public ISQLServerDataSource getDataSource() {
return dataSource;
}

public void setDataSource(ISQLServerDataSource dataSource) {
this.dataSource = dataSource;
}

public void setUsingColumnMapping() {
this.isUsingColumnMapping = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.sql.SQLException;
import java.util.concurrent.ThreadLocalRandom;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -58,7 +57,9 @@ public void execute() throws SQLException {

private void testBulkCopyWithTimeout(int timeout) throws SQLException {
BulkCopyTestWrapper bulkWrapper = new BulkCopyTestWrapper(connectionString);
bulkWrapper.setUsingConnection((0 == ThreadLocalRandom.current().nextInt(2)) ? true : false);
bulkWrapper.setUsingConnection((0 == random.nextInt(2)) ? true : false, ds);
bulkWrapper.setUsingXAConnection((0 == random.nextInt(2)) ? true : false, dsXA);
bulkWrapper.setUsingPooledConnection((0 == random.nextInt(2)) ? true : false, dsPool);
SQLServerBulkCopyOptions option = new SQLServerBulkCopyOptions();
option.setBulkCopyTimeout(timeout);
bulkWrapper.useBulkCopyOptions(true);
Expand Down
Loading