Skip to content

Commit

Permalink
Merge pull request #270 from v-xiangs/BulkCopy_Cursor_Issue
Browse files Browse the repository at this point in the history
fix bulk copy cursor issue
  • Loading branch information
xiangyushawn authored May 9, 2017
2 parents 5b2868c + 655c696 commit 75ecbcb
Show file tree
Hide file tree
Showing 4 changed files with 350 additions and 20 deletions.
4 changes: 4 additions & 0 deletions src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7023,6 +7023,10 @@ final void log(Level level,
// Volatile ensures visibility to execution thread and interrupt thread
private volatile TDSWriter tdsWriter;
private volatile TDSReader tdsReader;

protected TDSWriter getTDSWriter(){
return tdsWriter;
}

// Lock to ensure atomicity when manipulating more than one of the following
// shared interrupt state variables below.
Expand Down
103 changes: 83 additions & 20 deletions src/main/java/com/microsoft/sqlserver/jdbc/SQLServerBulkCopy.java
Original file line number Diff line number Diff line change
Expand Up @@ -1529,25 +1529,48 @@ private boolean doInsertBulk(TDSCommand command) throws SQLServerException {
// Begin a manual transaction for this batch.
connection.setAutoCommit(false);
}

boolean insertRowByRow = false;

// Create and send the initial command for bulk copy ("INSERT BULK ...").
TDSWriter tdsWriter = command.startRequest(TDS.PKT_QUERY);
String bulkCmd = createInsertBulkCommand(tdsWriter);
tdsWriter.writeString(bulkCmd);
TDSParser.parse(command.startResponse(), command.getLogContext());
if (null != sourceResultSet && sourceResultSet instanceof SQLServerResultSet) {
SQLServerStatement src_stmt = (SQLServerStatement) ((SQLServerResultSet) sourceResultSet).getStatement();
int resultSetServerCursorId = ((SQLServerResultSet) sourceResultSet).getServerCursorId();

// Send the bulk data. This is the BulkLoadBCP TDS stream.
tdsWriter = command.startRequest(TDS.PKT_BULK);
if (connection.equals(src_stmt.getConnection()) && 0 != resultSetServerCursorId) {
insertRowByRow = true;
}

if (((SQLServerResultSet) sourceResultSet).isForwardOnly()) {
try {
sourceResultSet.setFetchSize(1);
}
catch (SQLException e) {
SQLServerException.makeFromDriverError(connection, sourceResultSet, e.getMessage(), e.getSQLState(), true);
}
}
}

TDSWriter tdsWriter = null;
boolean moreDataAvailable = false;

try {
// Write the COLUMNMETADATA token in the stream.
writeColumnMetaData(tdsWriter);
if (!insertRowByRow) {
tdsWriter = sendBulkCopyCommand(command);
}

// Write all ROW tokens in the stream.
moreDataAvailable = writeBatchData(tdsWriter);
try {
// Write all ROW tokens in the stream.
moreDataAvailable = writeBatchData(tdsWriter, command, insertRowByRow);
}
finally {
tdsWriter = command.getTDSWriter();
}
}
catch (SQLServerException ex) {
if (null == tdsWriter) {
tdsWriter = command.getTDSWriter();
}

// Close the TDS packet before handling the exception
writePacketDataDone(tdsWriter);

Expand All @@ -1561,18 +1584,25 @@ private boolean doInsertBulk(TDSCommand command) throws SQLServerException {
throw ex;
}
finally {
if (null == tdsWriter) {
tdsWriter = command.getTDSWriter();
}

// reset the cryptoMeta in IOBuffer
tdsWriter.setCryptoMetaData(null);
}
// Write the DONE token in the stream. We may have to append the DONE token with every packet that is sent.
// For the current packets the driver does not generate a DONE token, but the BulkLoadBCP stream needs a DONE token
// after every packet. For now add it manually here for one packet.
// Note: This may break if more than one packet is sent.
// This is an example from https://msdn.microsoft.com/en-us/library/dd340549.aspx
writePacketDataDone(tdsWriter);

if (!insertRowByRow) {
// Write the DONE token in the stream. We may have to append the DONE token with every packet that is sent.
// For the current packets the driver does not generate a DONE token, but the BulkLoadBCP stream needs a DONE token
// after every packet. For now add it manually here for one packet.
// Note: This may break if more than one packet is sent.
// This is an example from https://msdn.microsoft.com/en-us/library/dd340549.aspx
writePacketDataDone(tdsWriter);

// Send to the server and read response.
TDSParser.parse(command.startResponse(), command.getLogContext());
// Send to the server and read response.
TDSParser.parse(command.startResponse(), command.getLogContext());
}

if (copyOptions.isUseInternalTransaction()) {
// Commit the transaction for this batch.
Expand All @@ -1582,6 +1612,22 @@ private boolean doInsertBulk(TDSCommand command) throws SQLServerException {
return moreDataAvailable;
}

private TDSWriter sendBulkCopyCommand(TDSCommand command) throws SQLServerException {
// Create and send the initial command for bulk copy ("INSERT BULK ...").
TDSWriter tdsWriter = command.startRequest(TDS.PKT_QUERY);
String bulkCmd = createInsertBulkCommand(tdsWriter);
tdsWriter.writeString(bulkCmd);
TDSParser.parse(command.startResponse(), command.getLogContext());

// Send the bulk data. This is the BulkLoadBCP TDS stream.
tdsWriter = command.startRequest(TDS.PKT_BULK);

// Write the COLUMNMETADATA token in the stream.
writeColumnMetaData(tdsWriter);

return tdsWriter;
}

private void writePacketDataDone(TDSWriter tdsWriter) throws SQLServerException {
// This is an example from https://msdn.microsoft.com/en-us/library/dd340549.aspx
tdsWriter.writeByte((byte) 0xFD);
Expand Down Expand Up @@ -3151,7 +3197,9 @@ private boolean goToNextRow() throws SQLServerException {
* Writes data for a batch of rows to the TDSWriter object. Writes the following part in the BulkLoadBCP stream
* (https://msdn.microsoft.com/en-us/library/dd340549.aspx) <ROW> ... </ROW>
*/
private boolean writeBatchData(TDSWriter tdsWriter) throws SQLServerException {
private boolean writeBatchData(TDSWriter tdsWriter,
TDSCommand command,
boolean insertRowByRow) throws SQLServerException {
int batchsize = copyOptions.getBatchSize();
int row = 0;
while (true) {
Expand All @@ -3163,6 +3211,13 @@ private boolean writeBatchData(TDSWriter tdsWriter) throws SQLServerException {
// No more data available, return false so we do not execute any more batches.
if (!goToNextRow())
return false;

if (insertRowByRow) {
// read response gotten from goToNextRow()
((SQLServerResultSet) sourceResultSet).getTDSReader().readPacket();

tdsWriter = sendBulkCopyCommand(command);
}

// Write row header for each row.
tdsWriter.writeByte((byte) TDS.TDS_ROW);
Expand Down Expand Up @@ -3195,6 +3250,14 @@ private boolean writeBatchData(TDSWriter tdsWriter) throws SQLServerException {
}
}
row++;

if (insertRowByRow) {
writePacketDataDone(tdsWriter);
tdsWriter.setCryptoMetaData(null);

// Send to the server and read response.
TDSParser.parse(command.startResponse(), command.getLogContext());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ private void skipColumns(int columnsToSkip,

/** TDS reader from which row values are read */
private TDSReader tdsReader;

protected TDSReader getTDSReader() {
return tdsReader;
}

private final FetchBuffer fetchBuffer;

Expand Down
Loading

0 comments on commit 75ecbcb

Please sign in to comment.