Skip to content

Commit

Permalink
Fix decoding issue when reading a huge amount of rows with a lot of d…
Browse files Browse the repository at this point in the history
…ata in each one (#75)
  • Loading branch information
lovetodream authored Nov 13, 2024
1 parent 6d72308 commit e9f8439
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 81 deletions.
17 changes: 11 additions & 6 deletions Sources/OracleNIO/Extensions/ByteBuffer/ByteBuffer+OSON.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
import NIOCore

extension ByteBuffer {
mutating func readOSON() throws -> ByteBuffer? {
guard let length = self.readUB4(), length > 0 else {
mutating func throwingReadOSON() throws -> ByteBuffer? {
let length = try self.throwingReadUB4()
guard length > 0 else {
return ByteBuffer(bytes: [0])
}
self.skipUB8() // size (unused)
self.skipUB4() // chunk size (unused)
let data = self.readOracleSlice()
self.skipRawBytesChunked() // lob locator (unused)
try self.throwingSkipUB8() // size (unused)
try self.throwingSkipUB4() // chunk size (unused)
guard let data = self.readOracleSlice() else {
return nil
}
if !self.skipRawBytesChunked() { // lob locator (unused)
return nil
}
return data
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the OracleNIO open source project
//
// Copyright (c) 2024 Timo Zacherl and the OracleNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE for license information
// See CONTRIBUTORS.md for the list of OracleNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

extension ByteBuffer {
@inline(__always)
mutating func throwingMoveReaderIndex(forwardBy: Int, file: String = #fileID, line: Int = #line) throws {
if self.readableBytes < forwardBy {
throw OraclePartialDecodingError.expectedAtLeastNRemainingBytes(
forwardBy,
actual: self.readableBytes,
file: file,
line: line
)
}
self.moveReaderIndex(forwardBy: forwardBy)
}
}
48 changes: 39 additions & 9 deletions Sources/OracleNIO/Extensions/ByteBuffer/ByteBuffer+UB.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
import struct NIOCore.ByteBuffer

extension ByteBuffer {
mutating func throwingSkipUB1(file: String = #fileID, line: Int = #line) throws {
try self.throwingMoveReaderIndex(forwardBy: 1, file: file, line: line)
}

mutating func skipUB2() {
guard let length = readUBLength() else { return }
guard length <= 2 else { preconditionFailure() }
self.moveReaderIndex(forwardBy: Int(length))
skipUB(2)
}

mutating func throwingSkipUB2(file: String = #fileID, line: Int = #line) throws {
try throwingSkipUB(4, file: file, line: line)
}

mutating func readUB2() -> UInt16? {
Expand Down Expand Up @@ -77,9 +83,11 @@ extension ByteBuffer {
}

mutating func skipUB4() {
guard let length = readUBLength() else { return }
guard length <= 4 else { preconditionFailure() }
self.moveReaderIndex(forwardBy: Int(length))
skipUB(4)
}

mutating func throwingSkipUB4(file: String = #fileID, line: Int = #line) throws {
try throwingSkipUB(4, file: file, line: line)
}

mutating func readUB8() -> UInt64? {
Expand Down Expand Up @@ -115,9 +123,11 @@ extension ByteBuffer {
}

mutating func skipUB8() {
guard let length = readUBLength() else { return }
guard length <= 8 else { fatalError() }
self.moveReaderIndex(forwardBy: Int(length))
skipUB(8)
}

mutating func throwingSkipUB8(file: String = #fileID, line: Int = #line) throws {
try throwingSkipUB(8, file: file, line: line)
}

mutating func readUBLength() -> UInt8? {
Expand Down Expand Up @@ -175,6 +185,26 @@ extension ByteBuffer {
self.writeInteger(integer)
}
}

@inline(__always)
private mutating func skipUB(_ maxLength: Int) {
guard let length = readUBLength() else { return }
guard length <= maxLength else { preconditionFailure() }
self.moveReaderIndex(forwardBy: Int(length))
}

@inline(__always)
private mutating func throwingSkipUB(_ maxLength: Int, file: String = #fileID, line: Int = #line) throws {
guard let length = readUBLength().flatMap(Int.init) else {
throw OraclePartialDecodingError.expectedAtLeastNRemainingBytes(
MemoryLayout<UInt8>.size,
actual: self.readableBytes,
file: file, line: line
)
}
guard length <= maxLength else { preconditionFailure() }
try self.throwingMoveReaderIndex(forwardBy: length, file: file, line: line)
}
}

extension ByteBuffer {
Expand Down
19 changes: 17 additions & 2 deletions Sources/OracleNIO/Messages/Coding/OraclePartialDecodingError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@
//===----------------------------------------------------------------------===//

struct OraclePartialDecodingError: Error {
enum Category {
case expectedAtLeastNRemainingBytes
case fieldNotDecodable
case unsupportedDataType
case unknownMessageID
case unknownControlType
}

let category: Category

/// A textual description of the error.
let description: String

Expand All @@ -27,6 +37,7 @@ struct OraclePartialDecodingError: Error {
file: String = #fileID, line: Int = #line
) -> Self {
OraclePartialDecodingError(
category: .expectedAtLeastNRemainingBytes,
description: "Expected at least '\(expected)' remaining bytes. But found \(actual).",
file: file, line: line
)
Expand All @@ -36,24 +47,27 @@ struct OraclePartialDecodingError: Error {
type: Any.Type, file: String = #fileID, line: Int = #line
) -> Self {
OraclePartialDecodingError(
category: .fieldNotDecodable,
description: "Could not read '\(type)' from ByteBuffer.", file: file, line: line)
}

static func unsupportedDataType(
type: _TNSDataType, file: String = #fileID, line: Int = #line
) -> Self {
OraclePartialDecodingError(
category: .unsupportedDataType,
description: "Could not process unsupported data type '\(type)'.",
file: file, line: line
)
}

static func unknownMessageIDReceived(
static func unknownMessageID(
messageID: UInt8,
file: String = #fileID,
line: Int = #line
) -> Self {
OraclePartialDecodingError(
category: .unknownMessageID,
description: """
Received a message with messageID '\(messageID)'. There is no \
message type associated with this message identifier.
Expand All @@ -63,12 +77,13 @@ struct OraclePartialDecodingError: Error {
)
}

static func unknownControlTypeReceived(
static func unknownControlType(
controlType: UInt16,
file: String = #fileID,
line: Int = #line
) -> Self {
OraclePartialDecodingError(
category: .unknownControlType,
description: """
Received a control packet with control type '\(controlType)'.
This is unhandled and should be reported, please file an issue.
Expand Down
99 changes: 47 additions & 52 deletions Sources/OracleNIO/Messages/OracleBackendMessage+BackendError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ extension OracleBackendMessage {
// error number
let length = try buffer.throwingReadInteger(as: UInt16.self)
// length of error message
buffer.moveReaderIndex(forwardBy: 2) // skip flags
let errorMessage: String?
if number != 0 && length > 0 {
errorMessage = buffer.readString(length: Int(length))
} else {
errorMessage = nil
}
try buffer.throwingMoveReaderIndex(forwardBy: 2) // skip flags
let errorMessage: String? =
if number != 0 && length > 0 {
try buffer.throwingReadString(length: Int(length))
} else {
nil
}
return .init(
number: UInt32(number),
isWarning: true,
Expand All @@ -52,99 +52,94 @@ extension OracleBackendMessage {
from buffer: inout ByteBuffer,
context: OracleBackendMessageDecoder.Context
) throws -> OracleBackendMessage.BackendError {
_ = try buffer.throwingReadUB4() // end of call status
buffer.skipUB2() // end to end seq#
buffer.skipUB4() // current row number
buffer.skipUB2() // error number
buffer.skipUB2() // array elem error
buffer.skipUB2() // array elem error
let cursorID = buffer.readUB2() // cursor id
let errorPosition = buffer.readUB2() // error position
buffer.moveReaderIndex(forwardBy: 1) // sql type
buffer.moveReaderIndex(forwardBy: 1) // fatal?
buffer.moveReaderIndex(forwardBy: 1) // flags
buffer.moveReaderIndex(forwardBy: 1) // user cursor options
buffer.moveReaderIndex(forwardBy: 1) // UDI parameter
buffer.moveReaderIndex(forwardBy: 1) // warning flag
try buffer.throwingSkipUB4() // end of call status
try buffer.throwingSkipUB2() // end to end seq#
try buffer.throwingSkipUB4() // current row number
try buffer.throwingSkipUB2() // error number
try buffer.throwingSkipUB2() // array elem error
try buffer.throwingSkipUB2() // array elem error
let cursorID = try buffer.throwingReadUB2() // cursor id
let errorPosition = try buffer.throwingReadUB2() // error position
try buffer.throwingSkipUB1() // sql type
try buffer.throwingSkipUB1() // fatal?
try buffer.throwingSkipUB1() // flags
try buffer.throwingSkipUB1() // user cursor options
try buffer.throwingSkipUB1() // UDI parameter
try buffer.throwingSkipUB1() // warning flag
let rowID = try RowID(from: &buffer, type: .rowID, context: .default)
buffer.skipUB4() // OS error
buffer.moveReaderIndex(forwardBy: 1) // statement number
buffer.moveReaderIndex(forwardBy: 1) // call number
buffer.skipUB2() // padding
buffer.skipUB4() // success iters
if let byteCount = buffer.readUB4(), byteCount > 0 {
try buffer.throwingSkipUB4() // OS error
try buffer.throwingSkipUB1() // statement number
try buffer.throwingSkipUB1() // call number
try buffer.throwingSkipUB2() // padding
try buffer.throwingSkipUB4() // success iters
let byteCount = try buffer.throwingReadUB4()
if byteCount > 0 {
buffer.skipRawBytesChunked() // oerrdd (logical rowid)
}

// batch error codes
let numberOfCodes =
try buffer.throwingReadUB2() // batch error codes array
let numberOfCodes = try buffer.throwingReadUB2() // batch error codes array
var batch = [OracleError]()
if numberOfCodes > 0 {
let firstByte = try buffer.throwingReadInteger(as: UInt8.self)
for _ in 0..<numberOfCodes {
if firstByte == Constants.TNS_LONG_LENGTH_INDICATOR {
buffer.skipUB4() // chunk length ignored
try buffer.throwingSkipUB4() // chunk length ignored
}
let errorCode = try buffer.throwingReadUB2()
batch.append(.init(code: Int(errorCode)))
}
if firstByte == Constants.TNS_LONG_LENGTH_INDICATOR {
buffer.moveReaderIndex(forwardBy: 1) // ignore end marker
try buffer.throwingSkipUB1() // ignore end marker
}
}

// batch error offsets
let numberOfOffsets =
try buffer.throwingReadUB2() // batch error row offset array
let numberOfOffsets = try buffer.throwingReadUB2() // batch error row offset array
if numberOfOffsets > 0 {
let firstByte = try buffer.throwingReadInteger(as: UInt8.self)
for i in 0..<numberOfOffsets {
if firstByte == Constants.TNS_LONG_LENGTH_INDICATOR {
buffer.skipUB4() // chunked length ignored
try buffer.throwingSkipUB4() // chunked length ignored
}
let offset = try buffer.throwingReadUB4()
batch[Int(i)].offset = Int(offset)
}
if firstByte == Constants.TNS_LONG_LENGTH_INDICATOR {
buffer.moveReaderIndex(forwardBy: 1) // ignore end marker
try buffer.throwingSkipUB1() // ignore end marker
}
}

// batch error messages
let numberOfMessages =
try buffer.throwingReadUB2() // batch error messages array
let numberOfMessages = try buffer.throwingReadUB2() // batch error messages array
if numberOfMessages > 0 {
buffer.moveReaderIndex(forwardBy: 1) // ignore packet size
try buffer.throwingSkipUB1() // ignore packet size
for i in 0..<numberOfMessages {
buffer.skipUB2() // skip chunk length
try buffer.throwingSkipUB2() // skip chunk length
let errorMessage =
try buffer
.readString()
.trimmingCharacters(in: .whitespaces)
batch[Int(i)].message = errorMessage
buffer.moveReaderIndex(forwardBy: 2) // ignore end marker
try buffer.throwingMoveReaderIndex(forwardBy: 2) // ignore end marker
}
}

let number = try buffer.throwingReadUB4()
let rowCount = buffer.readUB8()
let rowCount = try buffer.throwingReadUB8()

// fields added with 20c
if context.capabilities.ttcFieldVersion >= Constants.TNS_CCAP_FIELD_VERSION_20_1 {
buffer.skipUB4() // sql type
buffer.skipUB4() // server checksum
try buffer.throwingSkipUB4() // sql type
try buffer.throwingSkipUB4() // server checksum
}

let errorMessage: String?
if number != 0 {
errorMessage =
try buffer
.readString()
.trimmingCharacters(in: .whitespaces)
} else {
errorMessage = nil
}
let errorMessage: String? =
if number != 0 {
try buffer.readString().trimmingCharacters(in: .whitespaces)
} else {
nil
}

return .init(
number: number,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ extension OracleBackendMessage {
columnValue = .init(bytes: [0]) // empty buffer
}
case .json:
switch try buffer.readOSON() {
switch try buffer.throwingReadOSON() {
case .some(let slice):
columnValue = slice
case .none:
Expand Down
Loading

0 comments on commit e9f8439

Please sign in to comment.