Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bulk copy cursor issue #270

Merged
merged 9 commits into from
May 9, 2017
4 changes: 4 additions & 0 deletions src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -7015,6 +7015,10 @@ final void log(Level level,
// Volatile ensures visibility to execution thread and interrupt thread
private volatile TDSWriter tdsWriter;
private volatile TDSReader tdsReader;

protected TDSWriter getTDSWriter(){
return tdsWriter;
}

// Lock to ensure atomicity when manipulating more than one of the following
// shared interrupt state variables below.
Expand Down
101 changes: 81 additions & 20 deletions src/main/java/com/microsoft/sqlserver/jdbc/SQLServerBulkCopy.java
Original file line number Diff line number Diff line change
Expand Up @@ -1529,23 +1529,42 @@ private boolean doInsertBulk(TDSCommand command) throws SQLServerException {
// Begin a manual transaction for this batch.
connection.setAutoCommit(false);
}

boolean insertRowByRow = false;

// Create and send the initial command for bulk copy ("INSERT BULK ...").
TDSWriter tdsWriter = command.startRequest(TDS.PKT_QUERY);
String bulkCmd = createInsertBulkCommand(tdsWriter);
tdsWriter.writeString(bulkCmd);
TDSParser.parse(command.startResponse(), command.getLogContext());
if (null != sourceResultSet && sourceResultSet instanceof SQLServerResultSet) {
SQLServerStatement src_stmt = (SQLServerStatement) ((SQLServerResultSet) sourceResultSet).getStatement();
int resultSetServerCursorId = ((SQLServerResultSet) sourceResultSet).getServerCursorId();

// Send the bulk data. This is the BulkLoadBCP TDS stream.
tdsWriter = command.startRequest(TDS.PKT_BULK);
if (connection.equals(src_stmt.getConnection()) && 0 != resultSetServerCursorId) {
insertRowByRow = true;
}

if (((SQLServerResultSet) sourceResultSet).isForwardOnly()) {
try {
sourceResultSet.setFetchSize(1);
}
catch (SQLException e) {
SQLServerException.makeFromDriverError(connection, sourceResultSet, e.getMessage(), e.getSQLState(), true);
}
}
}

TDSWriter tdsWriter = null;
boolean moreDataAvailable = false;

try {
// Write the COLUMNMETADATA token in the stream.
writeColumnMetaData(tdsWriter);
if (!insertRowByRow) {
tdsWriter = sendBulkCopyCommand(command);
}

// Write all ROW tokens in the stream.
moreDataAvailable = writeBatchData(tdsWriter);
try {
// Write all ROW tokens in the stream.
moreDataAvailable = writeBatchData(tdsWriter, command, insertRowByRow);
}
finally {
tdsWriter = command.getTDSWriter();
}
}
catch (SQLServerException ex) {
// Close the TDS packet before handling the exception
Expand All @@ -1564,15 +1583,18 @@ private boolean doInsertBulk(TDSCommand command) throws SQLServerException {
// reset the cryptoMeta in IOBuffer
tdsWriter.setCryptoMetaData(null);
}
// Write the DONE token in the stream. We may have to append the DONE token with every packet that is sent.
// For the current packets the driver does not generate a DONE token, but the BulkLoadBCP stream needs a DONE token
// after every packet. For now add it manually here for one packet.
// Note: This may break if more than one packet is sent.
// This is an example from https://msdn.microsoft.com/en-us/library/dd340549.aspx
writePacketDataDone(tdsWriter);

if (!insertRowByRow) {
// Write the DONE token in the stream. We may have to append the DONE token with every packet that is sent.
// For the current packets the driver does not generate a DONE token, but the BulkLoadBCP stream needs a DONE token
// after every packet. For now add it manually here for one packet.
// Note: This may break if more than one packet is sent.
// This is an example from https://msdn.microsoft.com/en-us/library/dd340549.aspx
writePacketDataDone(tdsWriter);

// Send to the server and read response.
TDSParser.parse(command.startResponse(), command.getLogContext());
// Send to the server and read response.
TDSParser.parse(command.startResponse(), command.getLogContext());
}

if (copyOptions.isUseInternalTransaction()) {
// Commit the transaction for this batch.
Expand All @@ -1582,6 +1604,28 @@ private boolean doInsertBulk(TDSCommand command) throws SQLServerException {
return moreDataAvailable;
}

private TDSWriter sendBulkCopyCommand(TDSCommand command) {
// Create and send the initial command for bulk copy ("INSERT BULK ...").
TDSWriter tdsWriter = null;
try {
tdsWriter = command.startRequest(TDS.PKT_QUERY);
String bulkCmd = createInsertBulkCommand(tdsWriter);
tdsWriter.writeString(bulkCmd);
TDSParser.parse(command.startResponse(), command.getLogContext());

// Send the bulk data. This is the BulkLoadBCP TDS stream.
tdsWriter = command.startRequest(TDS.PKT_BULK);

// Write the COLUMNMETADATA token in the stream.
writeColumnMetaData(tdsWriter);
}
catch (SQLServerException e) {
return tdsWriter;
}

return tdsWriter;
}

private void writePacketDataDone(TDSWriter tdsWriter) throws SQLServerException {
// This is an example from https://msdn.microsoft.com/en-us/library/dd340549.aspx
tdsWriter.writeByte((byte) 0xFD);
Expand Down Expand Up @@ -3151,7 +3195,9 @@ private boolean goToNextRow() throws SQLServerException {
* Writes data for a batch of rows to the TDSWriter object. Writes the following part in the BulkLoadBCP stream
* (https://msdn.microsoft.com/en-us/library/dd340549.aspx) <ROW> ... </ROW>
*/
private boolean writeBatchData(TDSWriter tdsWriter) throws SQLServerException {
private boolean writeBatchData(TDSWriter tdsWriter,
TDSCommand command,
boolean insertRowByRow) throws SQLServerException {
int batchsize = copyOptions.getBatchSize();
int row = 0;
while (true) {
Expand All @@ -3163,6 +3209,13 @@ private boolean writeBatchData(TDSWriter tdsWriter) throws SQLServerException {
// No more data available, return false so we do not execute any more batches.
if (!goToNextRow())
return false;

if (insertRowByRow) {
// read response gotten from goToNextRow()
((SQLServerResultSet) sourceResultSet).getTDSReader().readPacket();

tdsWriter = sendBulkCopyCommand(command);
}

// Write row header for each row.
tdsWriter.writeByte((byte) TDS.TDS_ROW);
Expand Down Expand Up @@ -3195,6 +3248,14 @@ private boolean writeBatchData(TDSWriter tdsWriter) throws SQLServerException {
}
}
row++;

if (insertRowByRow) {
writePacketDataDone(tdsWriter);
tdsWriter.setCryptoMetaData(null);

// Send to the server and read response.
TDSParser.parse(command.startResponse(), command.getLogContext());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ private void skipColumns(int columnsToSkip,

/** TDS reader from which row values are read */
private TDSReader tdsReader;

protected TDSReader getTDSReader() {
return tdsReader;
}

private final FetchBuffer fetchBuffer;

Expand Down
Loading