Commit

workaround for invalid indices
pacman82 committed Dec 16, 2023
1 parent 3322aa3 commit 65ffe85
Showing 7 changed files with 147 additions and 14 deletions.
3 changes: 2 additions & 1 deletion src/lib.rs
@@ -44,8 +44,8 @@
mod date_time;
mod decimal;
mod error;

mod odbc_writer;
mod quirks;
mod reader;
mod schema;

@@ -57,6 +57,7 @@ pub use odbc_api;
pub use self::{
error::Error,
odbc_writer::{insert_into_table, insert_statement_from_schema, OdbcWriter, WriterError},
quirks::Quirks,
reader::{
BufferAllocationOptions, ColumnFailure, ConcurrentOdbcReader, OdbcReader, OdbcReaderBuilder,
},
34 changes: 34 additions & 0 deletions src/quirks.rs
@@ -0,0 +1,34 @@
/// A (non-exhaustive) description of ways in which ODBC drivers deviate from the ODBC API
/// specification. These workarounds are intended to help application developers separate the
/// decision of how to deal with non-conformity from the knowledge of which driver misbehaves in
/// exactly which way.
///
/// For example, this avoids an if statement stating "if the database is DB2, please use
/// terminating zeroes instead of indicators to determine string lengths". Instead, one part of
/// the application states "IBM DB2 returns memory garbage for indicators" and another part
/// decides how to deal with garbage indicators.
#[non_exhaustive]
#[derive(Clone, PartialEq, Eq)]
pub struct Quirks {
/// IBM DB2 has been observed to return memory garbage in the length indicators for strings. In
/// that case it is preferable to rely exclusively on the terminating zero to determine string
/// length. So far this behavior has only manifested with variable-length string fields.
/// See: <https://github.com/pacman82/arrow-odbc-py/issues/68> and also
/// <https://github.com/pacman82/odbc-api/issues/398>
pub indicators_returned_from_bulk_fetch_are_memory_garbage: bool,
}

impl Quirks {
/// A new instance describing an ODBC driver without quirks
pub fn new() -> Self {
Quirks {
indicators_returned_from_bulk_fetch_are_memory_garbage: false,
}
}
}

impl Default for Quirks {
fn default() -> Self {
Self::new()
}
}
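
The doc comment above describes a separation of concerns: one place in an application records
which driver misbehaves, another decides how to react. As a minimal sketch of the first half
(not part of this commit; quirks_for_driver is a hypothetical helper, and matching on the
driver name is an assumption), an application might write:

use arrow_odbc::Quirks;

/// Hypothetical helper: knowledge about which driver misbehaves lives here;
/// how to compensate is decided wherever the reader is built.
fn quirks_for_driver(driver_name: &str) -> Quirks {
    let mut quirks = Quirks::new();
    if driver_name.contains("DB2") {
        // IBM DB2 has been observed to return garbage indicators in bulk fetch.
        quirks.indicators_returned_from_bulk_fetch_are_memory_garbage = true;
    }
    quirks
}

The second half, reacting to the recorded quirk, is activated via OdbcReaderBuilder::with_shims,
introduced below.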
9 changes: 7 additions & 2 deletions src/reader.rs
@@ -27,8 +27,11 @@ mod to_record_batch;

use self::{decimal::Decimal, map_odbc_to_arrow::MapOdbcToArrow};

use crate::date_time::{
days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
use crate::{
date_time::{
days_since_epoch, ms_since_epoch, ns_since_epoch, seconds_since_epoch, us_since_epoch,
},
Quirks,
};

pub use self::{
@@ -120,6 +123,7 @@ pub fn choose_column_strategy(
query_metadata: &mut impl ResultSetMetadata,
col_index: u16,
buffer_allocation_options: BufferAllocationOptions,
quirks: &Quirks,
) -> Result<Box<dyn ReadStrategy>, ColumnFailure> {
let strat: Box<dyn ReadStrategy> = match field.data_type() {
ArrowDataType::Boolean => {
@@ -152,6 +156,7 @@
sql_type,
lazy_display_size,
buffer_allocation_options.max_text_size,
quirks.indicators_returned_from_bulk_fetch_are_memory_garbage,
)?
}
ArrowDataType::Decimal128(precision, scale @ 0..) => {
19 changes: 16 additions & 3 deletions src/reader/odbc_reader.rs
@@ -7,7 +7,7 @@ use arrow::{
};
use odbc_api::{buffers::ColumnarAnyBuffer, BlockCursor, Cursor};

use crate::{BufferAllocationOptions, ConcurrentOdbcReader, Error};
use crate::{BufferAllocationOptions, ConcurrentOdbcReader, Error, Quirks};

use super::{odbc_batch_stream::OdbcBatchStream, to_record_batch::ToRecordBatch};

@@ -359,6 +359,7 @@ pub struct OdbcReaderBuilder {
max_text_size: Option<usize>,
max_binary_size: Option<usize>,
fallibale_allocations: bool,
quirks: Quirks,
}

impl OdbcReaderBuilder {
@@ -379,6 +380,7 @@ impl OdbcReaderBuilder {
max_text_size: None,
max_binary_size: None,
fallibale_allocations: false,
quirks: Quirks::new(),
}
}

@@ -458,6 +460,13 @@ impl OdbcReaderBuilder {
self
}

/// Shims are workarounds which make arrow-odbc choose alternative implementations in order to
/// compensate for ODBC drivers which violate the ODBC specification.
pub fn with_shims(&mut self, quirks: Quirks) -> &mut Self {
self.quirks = quirks;
self
}

/// No matter if the user explicitly specified a limit in row size, a memory limit, both or
/// neither. In order to construct a reader we need to decide on the buffer size in rows.
fn buffer_size_in_rows(&self, bytes_per_row: usize) -> Result<usize, Error> {
@@ -495,8 +504,12 @@ impl OdbcReaderBuilder {
max_binary_size: self.max_binary_size,
fallibale_allocations: self.fallibale_allocations,
};
let converter =
ToRecordBatch::new(&mut cursor, self.schema.clone(), buffer_allocation_options)?;
let converter = ToRecordBatch::new(
&mut cursor,
self.schema.clone(),
buffer_allocation_options,
&self.quirks,
)?;
let bytes_per_row = converter.row_size_in_bytes();
let buffer_size_in_rows = self.buffer_size_in_rows(bytes_per_row)?;
let row_set_buffer =
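
Continuing the sketch above, the quirks value would then be handed to the builder roughly like
this (a fragment assuming a cursor obtained elsewhere, inside a function returning Result; the
driver name is a made-up example):

let quirks = quirks_for_driver("IBM DB2 ODBC DRIVER");
let reader = OdbcReaderBuilder::new()
    .with_shims(quirks)
    .build(cursor)?;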
57 changes: 52 additions & 5 deletions src/reader/text.rs
@@ -1,4 +1,4 @@
use std::{char::decode_utf16, cmp::min, num::NonZeroUsize, sync::Arc};
use std::{char::decode_utf16, cmp::min, ffi::CStr, num::NonZeroUsize, sync::Arc};

use arrow::array::{ArrayRef, StringBuilder};
use odbc_api::{
@@ -16,6 +16,7 @@ pub fn choose_text_strategy(
sql_type: OdbcDataType,
lazy_display_size: impl FnOnce() -> Result<Option<NonZeroUsize>, odbc_api::Error>,
max_text_size: Option<usize>,
assume_indicators_are_memory_garbage: bool,
) -> Result<Box<dyn ReadStrategy>, ColumnFailure> {
let apply_buffer_limit = |len| match (len, max_text_size) {
(None, None) => Err(ColumnFailure::ZeroSizedColumn { sql_type }),
@@ -40,7 +41,10 @@
.transpose()
.map_err(|source| ColumnFailure::UnknownStringLength { sql_type, source })?;
let octet_len = apply_buffer_limit(octet_len.map(NonZeroUsize::get))?;
narrow_text_strategy(octet_len)
// So far only Linux users seem to have complained about panics due to garbage indicators.
// Linux usually uses UTF-8, so we only invest in working around this for the narrow
// strategy.
narrow_text_strategy(octet_len, assume_indicators_are_memory_garbage)
};

Ok(strategy)
@@ -50,8 +54,15 @@ fn wide_text_strategy(u16_len: usize) -> Box<dyn ReadStrategy> {
Box::new(WideText::new(u16_len))
}

fn narrow_text_strategy(octet_len: usize) -> Box<dyn ReadStrategy> {
Box::new(NarrowText::new(octet_len))
fn narrow_text_strategy(
octet_len: usize,
assume_indicators_are_memory_garbage: bool,
) -> Box<dyn ReadStrategy> {
if assume_indicators_are_memory_garbage {
Box::new(NarrowUseTerminatingZero::new(octet_len))
} else {
Box::new(NarrowText::new(octet_len))
}
}

/// Strategy requesting the text from the database as UTF-16 (wide characters) and emitting it as
@@ -125,9 +136,45 @@ impl ReadStrategy for NarrowText {
for value in view.iter() {
builder.append_option(value.map(|bytes| {
std::str::from_utf8(bytes)
.expect("ODBC column had been expected to return valid utf8, but did not.")
.expect("ODBC driver had been expected to return valid utf8, but did not.")
}));
}
Ok(Arc::new(builder.finish()))
}
}

pub struct NarrowUseTerminatingZero {
/// Maximum string length in bytes (u8), excluding the terminating zero
max_str_len: usize,
}

impl NarrowUseTerminatingZero {
pub fn new(max_str_len: usize) -> Self {
Self { max_str_len }
}
}

impl ReadStrategy for NarrowUseTerminatingZero {
fn buffer_desc(&self) -> BufferDesc {
BufferDesc::Text {
max_str_len: self.max_str_len,
}
}

fn fill_arrow_array(&self, column_view: AnySlice) -> Result<ArrayRef, MappingError> {
let view = column_view.as_text_view().unwrap();
let mut builder = StringBuilder::with_capacity(view.len(), self.max_str_len * view.len());
// We cannot use view.iter() since its implementation relies on the indicator buffer being
// correct. This read strategy exists precisely because the indicators are incorrect.
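// Each value occupies max_str_len + 1 bytes in the raw buffer; the extra byte holds the
// terminating zero.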
for bytes in view.raw_value_buffer().chunks_exact(self.max_str_len + 1) {
let c_str = CStr::from_bytes_until_nul(bytes)
.expect("ODBC driver must return strings terminated by zero");
let str = c_str
.to_str()
.expect("ODBC driver had been expected to return valid utf8, but did not.");
let option = (!str.is_empty()).then_some(str);
builder.append_option(option);
}
Ok(Arc::new(builder.finish()))
}
}
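
To illustrate the decoding performed by this strategy, here is a tiny self-contained sketch (not
part of the commit; the buffer contents are invented): each value occupies a slot of
max_str_len + 1 bytes, and the string is read up to its terminating zero, ignoring any garbage
that follows.

use std::ffi::CStr;

fn main() {
    // Two values with max_str_len = 7, i.e. 8 bytes per slot including the
    // terminating zero. The 0xAA and 0xBB bytes stand in for garbage.
    let raw: &[u8] = b"Hello\0\xAA\xBBBonjour\0";
    for slot in raw.chunks_exact(8) {
        let text = CStr::from_bytes_until_nul(slot)
            .expect("slot must contain a terminating zero")
            .to_str()
            .expect("string must be valid UTF-8");
        println!("{text}"); // prints "Hello", then "Bonjour"
    }
}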
5 changes: 3 additions & 2 deletions src/reader/to_record_batch.rs
@@ -7,7 +7,7 @@ use arrow::{
use log::info;
use odbc_api::{buffers::ColumnarAnyBuffer, ResultSetMetadata};

use crate::{arrow_schema_from, BufferAllocationOptions, ColumnFailure, Error};
use crate::{arrow_schema_from, BufferAllocationOptions, ColumnFailure, Error, Quirks};

use super::{choose_column_strategy, MappingError, ReadStrategy};

@@ -27,6 +27,7 @@ impl ToRecordBatch {
cursor: &mut impl ResultSetMetadata,
schema: Option<SchemaRef>,
buffer_allocation_options: BufferAllocationOptions,
quirks: &Quirks,
) -> Result<Self, Error> {
// Infer schema if not given by the user
let schema = if let Some(schema) = schema {
@@ -41,7 +42,7 @@
.enumerate()
.map(|(index, field)| {
let col_index = (index + 1).try_into().unwrap();
choose_column_strategy(field, cursor, col_index, buffer_allocation_options)
choose_column_strategy(field, cursor, col_index, buffer_allocation_options, quirks)
.map_err(|cause| cause.into_crate_error(field.name().clone(), index))
})
.collect::<Result<_, _>>()?;
34 changes: 33 additions & 1 deletion tests/integration.rs
@@ -33,7 +33,7 @@ use arrow_odbc::{
Connection, ConnectionOptions, Cursor, CursorImpl, Environment, IntoParameter,
StatementConnection,
},
ColumnFailure, Error, OdbcReaderBuilder, OdbcWriter, WriterError,
ColumnFailure, Error, OdbcReaderBuilder, OdbcWriter, Quirks, WriterError,
};

use stdext::function_name;
@@ -300,6 +300,38 @@ fn fetch_varchar() {
assert!(array_vals.is_null(2));
}

/// The IBM DB2 ODBC driver reports memory garbage instead of valid indicators. We want to
/// activate a workaround which uses the terminating zeroes instead of the indicators to determine
/// string length and NULL.
/// Ideally this test would run against an actual DB2 database, yet that test setup would be a lot
/// of effort. Instead this test verifies that the workaround also works with Microsoft SQL Server.
#[test]
fn fetch_varchar_using_terminating_zeroes_to_indicate_string_length() {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let cursor = cursor_over(table_name, "VARCHAR(50)", "('Hello'),('Bonjour'),(NULL)");

let mut quirks = Quirks::new();
quirks.indicators_returned_from_bulk_fetch_are_memory_garbage = true;
let mut reader = OdbcReaderBuilder::new()
.with_max_num_rows_per_batch(3)
.with_shims(quirks)
.build(cursor)
.unwrap()
.into_concurrent()
.unwrap();
let record_batch = reader.next().unwrap().unwrap();
let array_vals = record_batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

// Assert that the correct values are found within the arrow batch
assert_eq!("Hello", array_vals.value(0));
assert_eq!("Bonjour", array_vals.value(1));
assert!(array_vals.is_null(2));
}

/// Fill a record batch of Strings from a nvarchar source column
#[test]
fn fetch_nvarchar() {
