Skip to content

Commit

Permalink
Add support for explicit open and close on LOBs (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovetodream authored Jun 30, 2024
1 parent 8459fe2 commit 73c09e8
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -825,9 +825,10 @@ struct ConnectionStateMachine {
mutating func lobParameterReceived(parameter: OracleBackendMessage.LOBParameter)
-> ConnectionAction
{
guard case .lobOperation = self.state else {
guard case .lobOperation(let context) = self.state else {
preconditionFailure("How can we receive LOB data in \(self.state)")
}
context.boolFlag = parameter.boolFlag
return .wait // waiting for error
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/OracleNIO/Constants.swift
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ enum Constants {
static let TNS_LOB_LOCATOR_FLAGS_VAR_LENGTH_CHARSET = 0x80

// MARK: Other LOB constants
static let TNS_LOB_OPEN_READ_WRITE = 2
static let TNS_LOB_OPEN_READ_WRITE: UInt64 = 2
static let TNS_LOB_PREFETCH_FLAG: UInt32 = 0x2000000

// MARK: Base JSON constants
Expand Down
65 changes: 65 additions & 0 deletions Sources/OracleNIO/Data/LOB.swift
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,71 @@ extension LOB {
}
}

/// Open the LOB for multiple ``write(_:at:on:)`` for improved performance.
///
/// If this is not called before writing, every write operation opens and closes the LOB internally.
///
/// Call ``close(on:)`` after you are done writing to the LOB.
///
/// - Parameter connection: The connection used to open the LOB.
/// This has to be the same one the LOB reference was created on.
public func open(on connection: OracleConnection) async throws {
let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self)
connection.channel.write(
OracleTask.lobOperation(
.init(
sourceLOB: self,
sourceOffset: 0,
destinationLOB: nil,
destinationOffset: 0,
operation: .open,
sendAmount: true,
amount: Constants.TNS_LOB_OPEN_READ_WRITE,
promise: promise
)), promise: nil)
_ = try await promise.futureResult.get()
}
/// Checks if the LOB is currently open for ``write(_:at:on:)`` operations.
///
/// - Parameter connection: The connection used to check the status of the LOB on.
/// This has to be the same one the LOB reference was created on.
public func isOpen(on connection: OracleConnection) async throws -> Bool {
let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self)
let context = LOBOperationContext(
sourceLOB: self,
sourceOffset: 0,
destinationLOB: nil,
destinationOffset: 0,
operation: .isOpen,
sendAmount: false,
amount: 0,
promise: promise
)
connection.channel.write(OracleTask.lobOperation(context), promise: nil)
_ = try await promise.futureResult.get()
return context.boolFlag ?? false
}
/// Closes the LOB if it is currently open for ``write(_:at:on:)`` operations.
///
/// - Parameter connection: The connection used to close the LOB.
/// This has to be the same one the LOB reference was created on.
public func close(on connection: OracleConnection) async throws {
let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self)
connection.channel.write(
OracleTask.lobOperation(
.init(
sourceLOB: self,
sourceOffset: 0,
destinationLOB: nil,
destinationOffset: 0,
operation: .close,
sendAmount: false,
amount: 0,
promise: promise
)), promise: nil)
_ = try await promise.futureResult.get()
}

/// Write data to the LOB starting on the specified offset.
/// - Parameters:
/// - buffer: The chunk of data which should be written to the LOB.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ extension OracleBackendMessage {
if context.lobContext?.operation == .createTemp
|| context.lobContext?.operation == .isOpen
{
let temp16 = try buffer.throwingReadUB2() // flag
boolFlag = temp16 > 0
// flag
boolFlag = try buffer.throwingReadInteger(as: UInt8.self) > 0
} else {
boolFlag = nil
}
Expand Down
5 changes: 2 additions & 3 deletions Sources/OracleNIO/OracleChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,9 @@ final class OracleChannelHandler: ChannelDuplexHandler {
switch lobContext.operation {
case .read:
lobContext.promise.succeed(lobContext.data)
case .write, .trim:
case .open, .isOpen, .close, .write, .trim:
lobContext.promise.succeed(nil)
case .getLength, .getChunkSize, .createTemp, .freeTemp, .open, .close,
.isOpen, .array:
case .getLength, .getChunkSize, .createTemp, .freeTemp, .array:
fatalError("not yet supported")
}
self.run(self.state.readyForStatementReceived(), with: context)
Expand Down
1 change: 1 addition & 0 deletions Sources/OracleNIO/OracleTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ final class LOBOperationContext {
let amount: UInt64
let promise: EventLoopPromise<ByteBuffer?>

var boolFlag: Bool?
var data: ByteBuffer?

init(
Expand Down
29 changes: 29 additions & 0 deletions Tests/IntegrationTests/LOBTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,35 @@ final class LOBTests: XCTIntegrationTest {
try await validateLOB(expected: buffer, on: connection)
}

func testWriteLOBStreamWithExplicitOpenAndClose() async throws {
let data = try Data(contentsOf: fileURL)
var buffer = ByteBuffer(data: data)
let lobRef = OracleRef(dataType: .blob, isReturnBind: true)
try await connection.execute(
"INSERT INTO test_simple_blob (id, content) VALUES (1, empty_blob()) RETURNING content INTO \(lobRef)",
options: .init(fetchLOBs: true)
)
let lob = try lobRef.decode(of: LOB.self)
var offset: UInt64 = 1
let chunkSize = 65536
try await lob.open(on: connection)
while buffer.readableBytes > 0,
let slice =
buffer
.readSlice(length: min(chunkSize, buffer.readableBytes))
{
try await lob.write(slice, at: offset, on: connection)
offset += UInt64(slice.readableBytes)
}
let isOpen = try await lob.isOpen(on: connection)
XCTAssertTrue(isOpen)
if isOpen {
try await lob.close(on: connection)
}
buffer.moveReaderIndex(to: 0)
try await validateLOB(expected: buffer, on: connection)
}

func testTrimLOB() async throws {
let data = try Data(contentsOf: fileURL)
let buffer = ByteBuffer(data: data)
Expand Down

0 comments on commit 73c09e8

Please sign in to comment.