From 73c09e8b172df20258a09f41df1d98c6b95e6c8e Mon Sep 17 00:00:00 2001 From: Timo <38291523+lovetodream@users.noreply.github.com> Date: Sun, 30 Jun 2024 14:48:28 +0200 Subject: [PATCH] Add support for explicit open and close on LOBs (#50) --- .../ConnectionStateMachine.swift | 3 +- Sources/OracleNIO/Constants.swift | 2 +- Sources/OracleNIO/Data/LOB.swift | 65 +++++++++++++++++++ .../OracleBackendMessage+Parameter.swift | 4 +- Sources/OracleNIO/OracleChannelHandler.swift | 5 +- Sources/OracleNIO/OracleTask.swift | 1 + Tests/IntegrationTests/LOBTests.swift | 29 +++++++++ 7 files changed, 102 insertions(+), 7 deletions(-) diff --git a/Sources/OracleNIO/ConnectionStateMachine/ConnectionStateMachine.swift b/Sources/OracleNIO/ConnectionStateMachine/ConnectionStateMachine.swift index d8d95b3..9638298 100644 --- a/Sources/OracleNIO/ConnectionStateMachine/ConnectionStateMachine.swift +++ b/Sources/OracleNIO/ConnectionStateMachine/ConnectionStateMachine.swift @@ -825,9 +825,10 @@ struct ConnectionStateMachine { mutating func lobParameterReceived(parameter: OracleBackendMessage.LOBParameter) -> ConnectionAction { - guard case .lobOperation = self.state else { + guard case .lobOperation(let context) = self.state else { preconditionFailure("How can we receive LOB data in \(self.state)") } + context.boolFlag = parameter.boolFlag return .wait // waiting for error } diff --git a/Sources/OracleNIO/Constants.swift b/Sources/OracleNIO/Constants.swift index 41ee236..6e79f6b 100644 --- a/Sources/OracleNIO/Constants.swift +++ b/Sources/OracleNIO/Constants.swift @@ -159,7 +159,7 @@ enum Constants { static let TNS_LOB_LOCATOR_FLAGS_VAR_LENGTH_CHARSET = 0x80 // MARK: Other LOB constants - static let TNS_LOB_OPEN_READ_WRITE = 2 + static let TNS_LOB_OPEN_READ_WRITE: UInt64 = 2 static let TNS_LOB_PREFETCH_FLAG: UInt32 = 0x2000000 // MARK: Base JSON constants diff --git a/Sources/OracleNIO/Data/LOB.swift b/Sources/OracleNIO/Data/LOB.swift index 759bd82..0b02436 100644 --- a/Sources/OracleNIO/Data/LOB.swift +++ b/Sources/OracleNIO/Data/LOB.swift @@ -252,6 +252,71 @@ extension LOB { } } + /// Open the LOB for multiple ``write(_:at:on:)`` for improved performance. + /// + /// If this is not called before writing, every write operation opens and closes the LOB internally. + /// + /// Call ``close(on:)`` after you are done writing to the LOB. + /// + /// - Parameter connection: The connection used to open the LOB. + /// This has to be the same one the LOB reference was created on. + public func open(on connection: OracleConnection) async throws { + let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self) + connection.channel.write( + OracleTask.lobOperation( + .init( + sourceLOB: self, + sourceOffset: 0, + destinationLOB: nil, + destinationOffset: 0, + operation: .open, + sendAmount: true, + amount: Constants.TNS_LOB_OPEN_READ_WRITE, + promise: promise + )), promise: nil) + _ = try await promise.futureResult.get() + } + /// Checks if the LOB is currently open for ``write(_:at:on:)`` operations. + /// + /// - Parameter connection: The connection used to check the status of the LOB on. + /// This has to be the same one the LOB reference was created on. + public func isOpen(on connection: OracleConnection) async throws -> Bool { + let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self) + let context = LOBOperationContext( + sourceLOB: self, + sourceOffset: 0, + destinationLOB: nil, + destinationOffset: 0, + operation: .isOpen, + sendAmount: false, + amount: 0, + promise: promise + ) + connection.channel.write(OracleTask.lobOperation(context), promise: nil) + _ = try await promise.futureResult.get() + return context.boolFlag ?? false + } + /// Closes the LOB if it is currently open for ``write(_:at:on:)`` operations. + /// + /// - Parameter connection: The connection used to close the LOB. + /// This has to be the same one the LOB reference was created on. + public func close(on connection: OracleConnection) async throws { + let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self) + connection.channel.write( + OracleTask.lobOperation( + .init( + sourceLOB: self, + sourceOffset: 0, + destinationLOB: nil, + destinationOffset: 0, + operation: .close, + sendAmount: false, + amount: 0, + promise: promise + )), promise: nil) + _ = try await promise.futureResult.get() + } + /// Write data to the LOB starting on the specified offset. /// - Parameters: /// - buffer: The chunk of data which should be written to the LOB. diff --git a/Sources/OracleNIO/Messages/OracleBackendMessage+Parameter.swift b/Sources/OracleNIO/Messages/OracleBackendMessage+Parameter.swift index 146c6b5..f8f21a0 100644 --- a/Sources/OracleNIO/Messages/OracleBackendMessage+Parameter.swift +++ b/Sources/OracleNIO/Messages/OracleBackendMessage+Parameter.swift @@ -162,8 +162,8 @@ extension OracleBackendMessage { if context.lobContext?.operation == .createTemp || context.lobContext?.operation == .isOpen { - let temp16 = try buffer.throwingReadUB2() // flag - boolFlag = temp16 > 0 + // flag + boolFlag = try buffer.throwingReadInteger(as: UInt8.self) > 0 } else { boolFlag = nil } diff --git a/Sources/OracleNIO/OracleChannelHandler.swift b/Sources/OracleNIO/OracleChannelHandler.swift index d1b58e4..a819766 100644 --- a/Sources/OracleNIO/OracleChannelHandler.swift +++ b/Sources/OracleNIO/OracleChannelHandler.swift @@ -514,10 +514,9 @@ final class OracleChannelHandler: ChannelDuplexHandler { switch lobContext.operation { case .read: lobContext.promise.succeed(lobContext.data) - case .write, .trim: + case .open, .isOpen, .close, .write, .trim: lobContext.promise.succeed(nil) - case .getLength, .getChunkSize, .createTemp, .freeTemp, .open, .close, - .isOpen, .array: + case .getLength, .getChunkSize, .createTemp, .freeTemp, .array: fatalError("not yet supported") } self.run(self.state.readyForStatementReceived(), with: context) diff --git a/Sources/OracleNIO/OracleTask.swift b/Sources/OracleNIO/OracleTask.swift index a0acefd..1a558bd 100644 --- a/Sources/OracleNIO/OracleTask.swift +++ b/Sources/OracleNIO/OracleTask.swift @@ -53,6 +53,7 @@ final class LOBOperationContext { let amount: UInt64 let promise: EventLoopPromise + var boolFlag: Bool? var data: ByteBuffer? init( diff --git a/Tests/IntegrationTests/LOBTests.swift b/Tests/IntegrationTests/LOBTests.swift index 3ad54bf..2e9b177 100644 --- a/Tests/IntegrationTests/LOBTests.swift +++ b/Tests/IntegrationTests/LOBTests.swift @@ -135,6 +135,35 @@ final class LOBTests: XCTIntegrationTest { try await validateLOB(expected: buffer, on: connection) } + func testWriteLOBStreamWithExplicitOpenAndClose() async throws { + let data = try Data(contentsOf: fileURL) + var buffer = ByteBuffer(data: data) + let lobRef = OracleRef(dataType: .blob, isReturnBind: true) + try await connection.execute( + "INSERT INTO test_simple_blob (id, content) VALUES (1, empty_blob()) RETURNING content INTO \(lobRef)", + options: .init(fetchLOBs: true) + ) + let lob = try lobRef.decode(of: LOB.self) + var offset: UInt64 = 1 + let chunkSize = 65536 + try await lob.open(on: connection) + while buffer.readableBytes > 0, + let slice = + buffer + .readSlice(length: min(chunkSize, buffer.readableBytes)) + { + try await lob.write(slice, at: offset, on: connection) + offset += UInt64(slice.readableBytes) + } + let isOpen = try await lob.isOpen(on: connection) + XCTAssertTrue(isOpen) + if isOpen { + try await lob.close(on: connection) + } + buffer.moveReaderIndex(to: 0) + try await validateLOB(expected: buffer, on: connection) + } + func testTrimLOB() async throws { let data = try Data(contentsOf: fileURL) let buffer = ByteBuffer(data: data)