Skip to content

Commit

Permalink
feat: draft decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
lovetodream committed Jul 5, 2024
1 parent 4a2274a commit a3213b5
Show file tree
Hide file tree
Showing 10 changed files with 424 additions and 125 deletions.
172 changes: 98 additions & 74 deletions Sources/OracleNIO/ConnectionStateMachine/StatementStateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ struct StatementStateMachine {
// This state might occur, if the client cancelled the statement,
// but the server did not yet receive/process the cancellation
// marker. Due to that it might send more data without knowing yet.
// TODO: check if we need to forward row header here too
return .wait

case .initialized, .streamingAndWaiting, .error, .commandComplete:
Expand All @@ -216,82 +217,104 @@ struct StatementStateMachine {
) -> Action {
switch self.state {
case .initialized(let context):
let outBinds = context.statement.binds.metadata.compactMap(\.outContainer)
guard !outBinds.isEmpty else { preconditionFailure() }
var buffer = rowData.slice
if context.isReturning {
for outBind in outBinds {
outBind.storage.withLockedValue { $0 = nil }
let rowCount = buffer.readUB4() ?? 0
guard rowCount > 0 else {
continue
}

do {
for _ in 0..<rowCount {
try self.processBindData(
from: &buffer,
outBind: outBind,
capabilities: capabilities
)
}
} catch {
guard let error = error as? OracleSQLError else {
preconditionFailure("Unexpected error: \(error)")
}
return self.setAndFireError(error)
}
}
// let outBinds = context.statement.binds.metadata.compactMap(\.outContainer)
// guard !outBinds.isEmpty else { preconditionFailure() }
// var buffer = rowData.slice
// if context.isReturning {
// for outBind in outBinds {
// outBind.storage.withLockedValue { $0 = nil }
// let rowCount = buffer.readUB4() ?? 0
// guard rowCount > 0 else {
// continue
// }
//
// do {
// for _ in 0..<rowCount {
// try self.processBindData(
// from: &buffer,
// outBind: outBind,
// capabilities: capabilities
// )
// }
// } catch {
// guard let error = error as? OracleSQLError else {
// preconditionFailure("Unexpected error: \(error)")
// }
// return self.setAndFireError(error)
// }
// }
//
// return self.moreDataReceived(
// &buffer,
// capabilities: capabilities,
// context: context,
// describeInfo: nil
// )
// } else {
// for outBind in outBinds {
// outBind.storage.withLockedValue { $0 = nil }
// do {
// try self.processBindData(
// from: &buffer,
// outBind: outBind,
// capabilities: capabilities
// )
// } catch {
// guard let error = error as? OracleSQLError else {
// preconditionFailure("Unexpected error: \(error)")
// }
// return self.setAndFireError(error)
// }
// }
//
// return self.moreDataReceived(
// &buffer,
// capabilities: capabilities,
// context: context,
// describeInfo: nil
// )
// }
// TODO: needs stuff here
fatalError("TODO: assign bindings")
return .wait

return self.moreDataReceived(
&buffer,
capabilities: capabilities,
context: context,
describeInfo: nil
)
} else {
for outBind in outBinds {
outBind.storage.withLockedValue { $0 = nil }
do {
try self.processBindData(
from: &buffer,
outBind: outBind,
capabilities: capabilities
)
} catch {
guard let error = error as? OracleSQLError else {
preconditionFailure("Unexpected error: \(error)")
}
return self.setAndFireError(error)
}
case .streaming(let context, let describeInfo, let rowHeader, var demandStateMachine):
var out = ByteBuffer()
for column in rowData.columns {
switch column {
case .data(var buffer):
out.writeBuffer(&buffer)
case .duplicate(let index):
var data = demandStateMachine.receivedDuplicate(at: index)
try! out.writeLengthPrefixed(as: UInt8.self) { buffer in
buffer.writeBuffer(&data)
} // must work
}

return self.moreDataReceived(
&buffer,
capabilities: capabilities,
context: context,
describeInfo: nil
)
}

case .streaming(let context, let describeInfo, _, _):
var buffer = rowData.slice
let action = self.rowDataReceived0(
buffer: &buffer, capabilities: capabilities
)

switch action {
case .wait:
return self.moreDataReceived(
&buffer,
capabilities: capabilities,
context: context,
describeInfo: describeInfo
let row = DataRow(columnCount: describeInfo.columns.count, bytes: out)
demandStateMachine.receivedRow(row)
self.avoidingStateMachineCoWVoid { state in
state = .streaming(
context, describeInfo, rowHeader, demandStateMachine
)

default:
return action
}
return .wait
// let action = self.rowDataReceived0(
// buffer: &buffer, capabilities: capabilities
// )
//
// switch action {
// case .wait:
// return self.moreDataReceived(
// &buffer,
// capabilities: capabilities,
// context: context,
// describeInfo: describeInfo
// )
//
// default:
// return action
// }

case .drain:
// This state might occur, if the client cancelled the statement,
Expand Down Expand Up @@ -811,8 +834,8 @@ struct StatementStateMachine {
do {
let decodingContext = OracleBackendMessageDecoder.Context(
capabilities: capabilities)
decodingContext.statementOptions = context.options
decodingContext.columnsCount = describeInfo?.columns.count
// TODO: move this out of here
decodingContext.statementContext = context
var messages: TinySequence<OracleBackendMessage> = []
try OracleBackendMessage.decodeData(
from: &slice,
Expand All @@ -830,7 +853,8 @@ struct StatementStateMachine {
case .rowHeader(let rowHeader):
action = self.rowHeaderReceived(rowHeader)
case .rowData(let rowData):
buffer = rowData.slice
fatalError("TODO")
buffer = ByteBuffer()
action = self.rowDataReceived0(
buffer: &buffer, capabilities: capabilities
)
Expand Down
25 changes: 13 additions & 12 deletions Sources/OracleNIO/Messages/Coding/OracleBackendMessageDecoder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,22 @@ struct OracleBackendMessageDecoder: ByteToMessageDecoder {

final class Context {
var capabilities: Capabilities
var performingChunkedRead = false
var statementOptions: StatementOptions? = nil
var columnsCount: Int? = nil
var performingChunkedRead = false // TODO: remove

var statementContext: StatementContext?
var bitVector: [UInt8]?
var describeInfo: DescribeInfo?

var lobContext: LOBOperationContext?

init(
capabilities: Capabilities,
performingChunkedRead: Bool = false,
statementOptions: StatementOptions? = nil,
columnsCount: Int? = nil
) {
init(capabilities: Capabilities) {
self.capabilities = capabilities
self.performingChunkedRead = performingChunkedRead
self.statementOptions = statementOptions
self.columnsCount = columnsCount
}

func clearStatementContext() {
self.statementContext = nil
self.bitVector = nil
self.describeInfo = nil
}
}

Expand Down
1 change: 0 additions & 1 deletion Sources/OracleNIO/Messages/DescribeInfo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ struct DescribeInfo: OracleBackendMessage.PayloadDecodable, Sendable, Hashable {
) throws -> DescribeInfo {
buffer.skipUB4() // max row size
let columnCount = try buffer.throwingReadUB4()
context.columnsCount = Int(columnCount)

if columnCount > 0 {
buffer.moveReaderIndex(forwardBy: 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ extension OracleBackendMessage {
context: OracleBackendMessageDecoder.Context
) throws -> OracleBackendMessage.BitVector {
let columnsCountSent = try buffer.throwingReadUB2()
guard let columnsCount = context.columnsCount else {
guard let columnsCount = context.describeInfo?.columns.count else {
preconditionFailure(
"How can we receive a bit vector without an active statement?"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ extension OracleBackendMessage {
{
buffer.moveReaderIndex(forwardBy: bytesCount)
}
if context.statementOptions!.arrayDMLRowCounts == true {
if context.statementContext!.options.arrayDMLRowCounts == true {
let numberOfRows = buffer.readUB4() ?? 0
rowCounts = []
for _ in 0..<numberOfRows {
Expand Down
Loading

0 comments on commit a3213b5

Please sign in to comment.