Skip to content

Commit

Permalink
Add support for batch execution (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovetodream authored Oct 18, 2024
1 parent a466539 commit 6ffe70a
Show file tree
Hide file tree
Showing 40 changed files with 1,182 additions and 239 deletions.
29 changes: 19 additions & 10 deletions Package.resolved
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
{
"pins" : [
{
"identity" : "swift-asn1",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-asn1.git",
"state" : {
"revision" : "7faebca1ea4f9aaf0cda1cef7c43aecd2311ddf6",
"version" : "1.3.0"
}
},
{
"identity" : "swift-async-algorithms",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-async-algorithms.git",
"state" : {
"revision" : "6ae9a051f76b81cc668305ceed5b0e0a7fd93d20",
"version" : "1.0.1"
"revision" : "5c8bd186f48c16af0775972700626f0b74588278",
"version" : "1.0.2"
}
},
{
Expand All @@ -32,8 +41,8 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-crypto.git",
"state" : {
"revision" : "9f95b4d033a4edd3814b48608db3f2ca90c7218b",
"version" : "3.7.0"
"revision" : "21f7878f2b39d46fd8ba2b06459ccb431cdf876c",
"version" : "3.8.1"
}
},
{
Expand All @@ -50,26 +59,26 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio.git",
"state" : {
"revision" : "9746cf80e29edfef2a39924a66731249223f42a3",
"version" : "2.72.0"
"revision" : "f7dc3f527576c398709b017584392fb58592e7f5",
"version" : "2.75.0"
}
},
{
"identity" : "swift-nio-ssl",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio-ssl.git",
"state" : {
"revision" : "7b84abbdcef69cc3be6573ac12440220789dcd69",
"version" : "2.27.2"
"revision" : "d7ceaf0e4d8001cd35cdc12e42cdd281e9e564e8",
"version" : "2.28.0"
}
},
{
"identity" : "swift-nio-transport-services",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-nio-transport-services.git",
"state" : {
"revision" : "38ac8221dd20674682148d6451367f89c2652980",
"version" : "1.21.0"
"revision" : "dbace16f126fdcd80d58dc54526c561ca17327d7",
"version" : "1.22.0"
}
},
{
Expand Down
207 changes: 207 additions & 0 deletions Sources/OracleNIO/Connection/OracleConnection+BatchExecution.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the OracleNIO open source project
//
// Copyright (c) 2024 Timo Zacherl and the OracleNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE for license information
// See CONTRIBUTORS.md for the list of OracleNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Logging

extension OracleConnection {
/// Executes the statement multiple times using the specified bind collections without requiring multiple roundtrips to the database.
/// - Parameters:
/// - statement: The raw SQL statement.
/// - binds: A collection of bind parameters to execute the statement with. The statement will execute `binds.count` times.
/// - encodingContext: The ``OracleEncodingContext`` used to encode the binds. A default parameter is provided.
/// - options: A bunch of parameters to optimize the statement in different ways.
/// Normally this can be ignored, but feel free to experiment based on your needs.
/// Every option and its impact is documented.
/// - logger: The `Logger` to log statement related background events into. Defaults to logging disabled.
/// - file: The file, the statement was started in. Used for better error reporting.
/// - line: The line, the statement was started in. Used for better error reporting.
/// - Returns: A ``OracleBatchExecutionResult`` containing the amount of affected rows and other metadata the server sent.
///
/// Batch execution is useful for inserting or updating multiple rows efficiently when working with large data sets. It significally outperforms
/// repeated calls to ``execute(_:options:logger:file:line:)-9uyvp`` by reducing network transfer costs and database overheads.
/// It can also be used to execute PL/SQL statements multiple times at once.
/// ```swift
/// let binds: [(Int, String, Int)] = [
/// (1, "John", 20),
/// (2, "Jane", 30),
/// (3, "Jack", 40),
/// (4, "Jill", 50),
/// (5, "Pete", 60),
/// ]
/// try await connection.executeBatch(
/// "INSERT INTO users (id, name, age) VALUES (:1, :2, :3)",
/// binds: binds
/// )
/// ```
@discardableResult
public func executeBatch<each Bind: OracleThrowingDynamicTypeEncodable>(
_ statement: String,
binds: [(repeat (each Bind)?)],
encodingContext: OracleEncodingContext = .default,
options: StatementOptions = .init(),
logger: Logger? = nil,
file: String = #fileID, line: Int = #line
) async throws -> OracleBatchExecutionResult {
var logger = logger ?? Self.noopLogger
logger[oracleMetadataKey: .connectionID] = "\(self.id)"
logger[oracleMetadataKey: .sessionID] = "\(self.sessionID)"

var collection = OracleBindingsCollection()
for row in binds {
try collection.appendRow(repeat each row, context: encodingContext)
}

return try await _executeBatch(
statement: statement,
collection: collection,
options: options,
logger: logger,
file: file,
line: line
)
}

/// Executes the prepared statements without requiring multiple roundtrips to the database.
/// - Parameters:
/// - statements: The prepared statements to execute.
/// - options: A bunch of parameters to optimize the statement in different ways.
/// Normally this can be ignored, but feel free to experiment based on your needs.
/// Every option and its impact is documented.
/// - logger: The `Logger` to log statement related background events into. Defaults to logging disabled.
/// - file: The file, the statement was started in. Used for better error reporting.
/// - line: The line, the statement was started in. Used for better error reporting.
/// - Returns: A ``OracleBatchExecutionResult`` containing the amount of affected rows and other metadata the server sent.
///
/// Batch execution is useful for inserting or updating multiple rows efficiently when working with large data sets. It significally outperforms
/// repeated calls to ``execute(_:options:logger:file:line:)-9uyvp`` by reducing network transfer costs and database overheads.
/// It can also be used to execute PL/SQL statements multiple times at once.
/// ```swift
/// try await connection.executeBatch([
/// InsertUserStatement(id: 1, name: "John", age: 20),
/// InsertUserStatement(id: 2, name: "Jane", age: 30),
/// InsertUserStatement(id: 3, name: "Jack", age: 40),
/// InsertUserStatement(id: 4, name: "Jill", age: 50),
/// InsertUserStatement(id: 5, name: "Pete", age: 60),
/// ])
/// ```
@discardableResult
public func executeBatch<Statement: OraclePreparedStatement>(
_ statements: [Statement],
options: StatementOptions = .init(),
logger: Logger? = nil,
file: String = #fileID, line: Int = #line
) async throws -> OracleBatchExecutionResult {
if statements.isEmpty {
throw OracleSQLError.missingStatement
}

var logger = logger ?? Self.noopLogger
logger[oracleMetadataKey: .connectionID] = "\(self.id)"
logger[oracleMetadataKey: .sessionID] = "\(self.sessionID)"

var collection = OracleBindingsCollection()
for statement in statements {
try collection.appendRow(statement.makeBindings())
}
return try await _executeBatch(
statement: Statement.sql,
collection: collection,
options: options,
logger: logger,
file: file,
line: line
)
}

private func _executeBatch(
statement: String,
collection: OracleBindingsCollection,
options: StatementOptions,
logger: Logger,
file: String,
line: Int
) async throws -> OracleBatchExecutionResult {
let promise = self.channel.eventLoop.makePromise(
of: OracleRowStream.self
)
let context = StatementContext(
statement: statement,
bindCollection: collection,
options: options,
logger: logger,
promise: promise
)

self.channel.write(OracleTask.statement(context), promise: nil)

do {
let stream = try await promise.futureResult
.map({ $0.asyncSequence() })
.get()
let affectedRows = try await stream.affectedRows
let affectedRowsPerStatement = options.arrayDMLRowCounts ? stream.rowCounts : nil
let batchErrors = options.batchErrors ? stream.batchErrors : nil
let result = OracleBatchExecutionResult(
affectedRows: affectedRows,
affectedRowsPerStatement: affectedRowsPerStatement
)
if let batchErrors {
throw OracleBatchExecutionError(
result: result,
errors: batchErrors,
statement: statement,
file: file,
line: line
)
}
return result
} catch var error as OracleSQLError {
error.file = file
error.line = line
error.statement = .init(unsafeSQL: statement)
throw error // rethrow with more metadata
}
}
}

/// The result of a batch execution.
public struct OracleBatchExecutionResult: Sendable {
/// The total amount of affected rows.
public let affectedRows: Int
/// The amount of affected rows per statement.
///
/// - Note: Only available if ``StatementOptions/arrayDMLRowCounts`` is set to `true`.
///
/// For example, if five single row `INSERT` statements are executed and the fifth one fails, the following array would be returned.
/// ```swift
/// [1, 1, 1, 1, 0]
/// ```
public let affectedRowsPerStatement: [Int]?
}

/// An error that is thrown when a batch execution contains both successful and failed statements.
///
/// - Note: This error is only thrown when ``StatementOptions/batchErrors`` is set to `true`.
/// Otherwise ``OracleSQLError`` will be thrown as usual. Be aware that all the statements
/// executed before the error is thrown won't be reverted regardless of this setting.
/// They can still be reverted using a ``OracleConnection/rollback()``.
public struct OracleBatchExecutionError: Error, Sendable {
/// The result of the partially finished batch execution.
public let result: OracleBatchExecutionResult
/// A collection of errors thrown by statements in the batch execution.
public let errors: [OracleSQLError.BatchError]
public let statement: String
public let file: String
public let line: Int
}
9 changes: 9 additions & 0 deletions Sources/OracleNIO/Connection/OracleConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,15 @@ extension OracleConnection {
}

/// Execute a prepared statement.
/// - Parameters:
/// - statement: The statement to be executed.
/// - options: A bunch of parameters to optimize the statement in different ways.
/// Normally this can be ignored, but feel free to experiment based on your needs.
/// Every option and its impact is documented.
/// - logger: The `Logger` to log statement related background events into. Defaults to logging disabled.
/// - file: The file, the statement was started in. Used for better error reporting.
/// - line: The line, the statement was started in. Used for better error reporting.
/// - Returns: An async sequence of `Row`s. The result sequence can be discarded if the statement has no result.
public func execute<Statement: OraclePreparedStatement, Row>(
_ statement: Statement,
options: StatementOptions = .init(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ struct ConnectionStateMachine {

// Statement streaming
case forwardRows([DataRow])
case forwardStreamComplete([DataRow], cursorID: UInt16)
case forwardStreamComplete([DataRow], cursorID: UInt16, affectedRows: Int)
case forwardStreamError(
OracleSQLError, read: Bool, cursorID: UInt16?, clientCancelled: Bool
)
Expand Down Expand Up @@ -639,7 +639,17 @@ struct ConnectionStateMachine {
mutating func queryParameterReceived(
_ parameter: OracleBackendMessage.QueryParameter
) -> ConnectionAction {
return .wait
switch self.state {
case .statement(var statement):
return self.avoidingStateMachineCoW { machine in
let action = statement.queryParameterReceived(parameter)
machine.state = .statement(statement)
return machine.modify(with: action)
}
default:
assertionFailure("Invalid state: \(self.state)")
return self.closeConnectionAndCleanup(.unexpectedBackendMessage(.queryParameter(parameter)))
}
}

mutating func bitVectorReceived(
Expand Down Expand Up @@ -1016,7 +1026,7 @@ extension ConnectionStateMachine {
.uncleanShutdown,
.unsupportedDataType:
return true
case .statementCancelled, .nationalCharsetNotSupported:
case .statementCancelled, .nationalCharsetNotSupported, .missingStatement:
return false
case .server:
switch error.serverInfo?.number {
Expand Down Expand Up @@ -1147,8 +1157,8 @@ extension ConnectionStateMachine {
return .succeedStatement(promise, columns)
case .forwardRows(let rows):
return .forwardRows(rows)
case .forwardStreamComplete(let rows, let cursorID):
return .forwardStreamComplete(rows, cursorID: cursorID)
case .forwardStreamComplete(let rows, let cursorID, let affectedRows):
return .forwardStreamComplete(rows, cursorID: cursorID, affectedRows: affectedRows)
case .forwardStreamError(
let error, let read, let cursorID, let clientCancelled
):
Expand Down
Loading

0 comments on commit 6ffe70a

Please sign in to comment.