Skip to content

Commit

Permalink
Add support for temporary LOBs and streamline public LOB API (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
lovetodream authored Jun 30, 2024
1 parent 73c09e8 commit 4a2274a
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,7 @@ struct ConnectionStateMachine {
guard case .lobOperation(let context) = self.state else {
preconditionFailure("How can we receive LOB data in \(self.state)")
}
context.fetchedAmount = parameter.amount
context.boolFlag = parameter.boolFlag
return .wait // waiting for error
}
Expand Down Expand Up @@ -1039,7 +1040,8 @@ extension ConnectionStateMachine {
.unexpectedBackendMessage,
.serverVersionNotSupported,
.sidNotSupported,
.uncleanShutdown:
.uncleanShutdown,
.unsupportedDataType:
return true
case .statementCancelled, .nationalCharsetNotSupported:
return false
Expand Down
2 changes: 1 addition & 1 deletion Sources/OracleNIO/Constants.swift
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ enum Constants {
static let TNS_MAX_ROWID_LENGTH = 18
static let TNS_DURATION_MID: UInt32 = 0x8000_0000
static let TNS_DURATION_OFFSET: UInt8 = 60
static let TNS_DURATION_SESSION: Int64 = 10
static let TNS_DURATION_SESSION: UInt64 = 10
@usableFromInline
static let TNS_MIN_LONG_LENGTH = 0x8000
static let TNS_MAX_LONG_LENGTH: UInt32 = 0x7fff_ffff
Expand Down
195 changes: 129 additions & 66 deletions Sources/OracleNIO/Data/LOB.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import NIOCore
/// options: .init(fetchLOBs: true)
/// )
/// let lob = try lobRef.decode(of: LOB.self)
/// var offset: UInt64 = 1
/// var offset = 1
/// let chunkSize = 65536
/// while
/// buffer.readableBytes > 0,
Expand All @@ -81,71 +81,31 @@ import NIOCore
/// }
/// ```
public final class LOB: Sendable {
/// The total size of the data in the LOB.
///
/// Bytes for BLOBs and USC-2 code points for CLOBs.
/// USC-2 code points are equivalent to characters for all but supplemental characters.
public let size: UInt64
/// Reading and writing to the LOB in chunks of multiples of this size will improve performance.
public let chunkSize: UInt32
private let _size: UInt64
private let _chunkSize: UInt32

let locator: NIOLockedValueBox<[UInt8]>
private let hasMetadata: Bool

public let dbType: OracleDataType
public let oracleType: OracleDataType

init(
size: UInt64,
chunkSize: UInt32,
locator: [UInt8],
hasMetadata: Bool,
dbType: OracleDataType
oracleType: OracleDataType
) {
self.size = size
self.chunkSize = chunkSize
self._size = size
self._chunkSize = chunkSize
self.locator = .init(locator)
self.hasMetadata = hasMetadata
self.dbType = dbType
}

static func create(dbType: OracleDataType, locator: [UInt8]?) -> Self {
if let locator {
return self.init(
size: 0,
chunkSize: 0,
locator: locator,
hasMetadata: false,
dbType: dbType
)
} else {
let locator = [UInt8](repeating: 0, count: 40)
let lob = self.init(
size: 0,
chunkSize: 0,
locator: locator,
hasMetadata: false,
dbType: dbType
)
// TODO: create temp lob on db
return lob
}
}

func encoding() -> String {
let locator = self.locator.withLockedValue { $0 }
if dbType.csfrm == Constants.TNS_CS_NCHAR
|| (locator.count >= Constants.TNS_LOB_LOCATOR_OFFSET_FLAG_3
&& (locator[Constants.TNS_LOB_LOCATOR_OFFSET_FLAG_3]
& Constants.TNS_LOB_LOCATOR_VAR_LENGTH_CHARSET) != 0)
{
return Constants.TNS_ENCODING_UTF16
}
return Constants.TNS_ENCODING_UTF8
self.oracleType = oracleType
}

func _read(
offset: UInt64 = 1,
amount: UInt64? = nil,
offset: UInt64,
amount: UInt64,
on connection: OracleConnection
) async throws -> ByteBuffer? {
let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self)
Expand All @@ -158,7 +118,7 @@ public final class LOB: Sendable {
destinationOffset: 0,
operation: .read,
sendAmount: true,
amount: amount ?? .init(self.chunkSize),
amount: amount,
promise: promise
)), promise: nil)
return try await promise.futureResult.get()
Expand Down Expand Up @@ -195,13 +155,13 @@ extension LOB {
/// - Returns: An async sequence used to iterate over
/// the chunks of data read from the connection.
public func readChunks(
ofSize chunkSize: UInt64? = nil,
ofSize chunkSize: Int? = nil,
on connection: OracleConnection
) -> ReadSequence {
ReadSequence(
self,
connection: connection,
chunkSize: chunkSize ?? .init(self.chunkSize)
chunkSize: UInt64(chunkSize ?? .init(self._chunkSize))
)
}

Expand Down Expand Up @@ -236,7 +196,6 @@ extension LOB {
var chunkSize: UInt64

public mutating func next() async throws -> ByteBuffer? {
if self.offset >= self.base.size { return nil }
guard
let chunk = try await self.base._read(
offset: self.offset,
Expand Down Expand Up @@ -325,15 +284,15 @@ extension LOB {
/// This has to be the same one the LOB reference was created on.
public func write(
_ buffer: ByteBuffer,
at offset: UInt64 = 1,
at offset: Int = 1,
on connection: OracleConnection
) async throws {
let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self)
connection.channel.write(
OracleTask.lobOperation(
.init(
sourceLOB: self,
sourceOffset: offset,
sourceOffset: UInt64(offset),
destinationLOB: nil,
destinationOffset: 0,
operation: .write,
Expand All @@ -351,7 +310,7 @@ extension LOB {
/// - connection: The connection used to trim the LOB.
/// This has to be the same one the LOB reference was created on.
public func trim(
to newSize: UInt64,
to newSize: Int,
on connection: OracleConnection
) async throws {
let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self)
Expand All @@ -364,31 +323,135 @@ extension LOB {
destinationOffset: 0,
operation: .trim,
sendAmount: true,
amount: newSize,
amount: UInt64(newSize),
promise: promise
)), promise: nil)
_ = try await promise.futureResult.get()
}

/// Create a temporary LOB on the given connection.
///
/// The temporary LOB lives until the connection is closed or explicitly freed by calling
/// ``free(on:)``.
///
/// It can be inserted in a table at a later point as long as the connection lives.
public static func create(
_ oracleType: OracleDataType,
on connection: OracleConnection
) async throws -> LOB {
switch oracleType {
case .blob, .clob, .nCLOB:
let locator = [UInt8](repeating: 0, count: 40)
let lob = self.init(
size: 0,
chunkSize: 0,
locator: locator,
hasMetadata: false,
oracleType: oracleType
)
let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self)
connection.channel.write(
OracleTask.lobOperation(
.init(
sourceLOB: lob,
sourceOffset: UInt64(oracleType.csfrm),
destinationLOB: nil,
destinationOffset: UInt64(oracleType._oracleType?.rawValue ?? 0),
operation: .createTemp,
sendAmount: true,
amount: Constants.TNS_DURATION_SESSION,
promise: promise
)), promise: nil)
_ = try await promise.futureResult.get()
return lob
default:
throw OracleSQLError.unsupportedDataType
}
}

/// Frees/removes a temporary LOB from the given connection
/// with the next round trip to the database.
public func free(on connection: OracleConnection) async throws {
let handler = try await connection.channel.pipeline
.handler(type: OracleChannelHandler.self).get()
self.free(from: handler.cleanupContext)
}

/// Retrieve the total size of the data in the LOB.
///
/// Bytes for BLOBs and USC-2 code points for CLOBs.
/// USC-2 code points are equivalent to characters for all but supplemental characters.
public func size(on connection: OracleConnection) async throws -> Int {
let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self)
let context = LOBOperationContext(
sourceLOB: self,
sourceOffset: 0,
destinationLOB: nil,
destinationOffset: 0,
operation: .getLength,
sendAmount: true,
amount: 0,
promise: promise
)
connection.channel.write(OracleTask.lobOperation(context), promise: nil)
_ = try await promise.futureResult.get()
return Int(context.fetchedAmount ?? 0)
}

/// The total size of the LOB data when it was first received from the database.
///
/// It might have changed already. To get the up-to-date size use ``size(on:)``.
public var estimatedSize: Int { Int(self._size) }


/// Reading and writing to the LOB in chunks of multiples of this size will improve performance.
public func chunkSize(on connection: OracleConnection) async throws -> Int {
let promise = connection.eventLoop.makePromise(of: ByteBuffer?.self)
let context = LOBOperationContext(
sourceLOB: self,
sourceOffset: 0,
destinationLOB: nil,
destinationOffset: 0,
operation: .getChunkSize,
sendAmount: true,
amount: 0,
promise: promise
)
connection.channel.write(OracleTask.lobOperation(context), promise: nil)
_ = try await promise.futureResult.get()
return Int(context.fetchedAmount ?? Int64(self._chunkSize))
}

/// Reading and writing to the LOB in chunks of multiples of this size will improve performance.
///
/// This is the ideal chunk size at the time of fetching the LOB initially,
/// it falls back to a sensible default if the underlying value is `0`.
/// It might have changed in the meantime, to get the current chunk size use ``chunkSize(on:)``.
public var estimatedChunkSize: Int {
if self._chunkSize == 0 {
8060
} else {
Int(self._chunkSize)
}
}
}

extension LOB: OracleEncodable {
public var oracleType: OracleDataType { .blob }

public func encode<JSONEncoder: OracleJSONEncoder>(
into buffer: inout ByteBuffer,
context: OracleEncodingContext<JSONEncoder>
) {
preconditionFailure("This should not be called")
let locator = self.locator.withLockedValue { $0 }
let length = locator.count
buffer.writeUB4(UInt32(length))
ByteBuffer(bytes: locator)._encodeRaw(into: &buffer, context: context)
}

public func _encodeRaw<JSONEncoder: OracleJSONEncoder>(
into buffer: inout ByteBuffer,
context: OracleEncodingContext<JSONEncoder>
) {
let locator = self.locator.withLockedValue { $0 }
let length = locator.count
buffer.writeUB4(UInt32(length))
ByteBuffer(bytes: locator)._encodeRaw(into: &buffer, context: context)
self.encode(into: &buffer, context: context)
}
}

Expand All @@ -408,7 +471,7 @@ extension LOB: OracleDecodable {
chunkSize: chunkSize,
locator: Array(buffer: locator),
hasMetadata: true,
dbType: type
oracleType: type
)
default:
throw OracleDecodingError.Code.typeMismatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,9 @@ struct OracleFrontendMessageEncoder {
self.buffer.writeBytes(destinationLOB.locator.withLockedValue({ $0 }))
}
if context.operation == .createTemp {
if let sourceLOB = context.sourceLOB, sourceLOB.dbType.csfrm == Constants.TNS_CS_NCHAR {
if let sourceLOB = context.sourceLOB,
sourceLOB.oracleType.csfrm == Constants.TNS_CS_NCHAR
{
try self.capabilities.checkNCharsetID()
self.buffer.writeUB4(UInt32(Constants.TNS_CHARSET_UTF16))
} else {
Expand Down
12 changes: 6 additions & 6 deletions Sources/OracleNIO/Messages/OracleBackendMessage+Parameter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -149,19 +149,19 @@ extension OracleBackendMessage {
$0 = newLocator
}
}
let amount: Int64?
if context.lobContext?.operation == .createTemp {
buffer.skipUB2() // skip character set
}
let amount: Int64?
if context.lobContext?.sendAmount == true {
// skip trailing flags, amount
buffer.moveReaderIndex(forwardBy: 3)
amount = nil
} else if context.lobContext?.sendAmount == true {
amount = try buffer.throwingReadSB8()
} else {
amount = nil
}
let boolFlag: Bool?
if context.lobContext?.operation == .createTemp
|| context.lobContext?.operation == .isOpen
{
if context.lobContext?.operation == .isOpen {
// flag
boolFlag = try buffer.throwingReadInteger(as: UInt8.self) > 0
} else {
Expand Down
9 changes: 6 additions & 3 deletions Sources/OracleNIO/OracleChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,13 @@ final class OracleChannelHandler: ChannelDuplexHandler {
switch lobContext.operation {
case .read:
lobContext.promise.succeed(lobContext.data)
case .open, .isOpen, .close, .write, .trim:
case .open, .isOpen, .close, .write, .trim, .createTemp, .getLength,
.getChunkSize:
lobContext.promise.succeed(nil)
case .getLength, .getChunkSize, .createTemp, .freeTemp, .array:
fatalError("not yet supported")
case .freeTemp, .array:
preconditionFailure(
"Invalid lob operation: \(lobContext.operation)"
)
}
self.run(self.state.readyForStatementReceived(), with: context)
case .failLOBOperation(let promise, let error):
Expand Down
Loading

0 comments on commit 4a2274a

Please sign in to comment.