Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into iox-object-store
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 2, 2022
2 parents b6c069b + 88b88d4 commit 54dd6d2
Show file tree
Hide file tree
Showing 80 changed files with 2,241 additions and 869 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ rust-version = "1.59"
readme = "README.md"

[dependencies]
arrow = { version = "16.0.0" }
arrow = { version = "17.0.0" }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "9.0.0" }
dirs = "4.0.0"
Expand Down
2 changes: 1 addition & 1 deletion datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub async fn main() -> Result<()> {
env::set_current_dir(&p).unwrap();
};

let mut session_config = SessionConfig::new().with_information_schema(true);
let mut session_config = SessionConfig::from_env().with_information_schema(true);

if let Some(batch_size) = args.batch_size {
session_config = session_config.with_batch_size(batch_size);
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "16.0.0" }
arrow-flight = { version = "17.0.0" }
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ jit = ["cranelift-module"]
pyarrow = ["pyo3"]

[dependencies]
arrow = { version = "16.0.0", features = ["prettyprint"] }
arrow = { version = "17.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.85.0", optional = true }
object_store = { version = "0.3", optional = true }
ordered-float = "3.0"
parquet = { version = "16.0.0", features = ["arrow"], optional = true }
parquet = { version = "17.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
sqlparser = "0.18"
79 changes: 50 additions & 29 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
use crate::error::{DataFusionError, Result};
use arrow::{
array::*,
compute::kernels::cast::cast,
compute::kernels::cast::{cast, cast_with_options, CastOptions},
datatypes::{
ArrowDictionaryKeyType, ArrowNativeType, DataType, Field, Float32Type,
Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, IntervalUnit, TimeUnit,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
DECIMAL_MAX_PRECISION,
},
util::decimal::{BasicDecimal, Decimal128},
};
use ordered_float::OrderedFloat;
use std::cmp::Ordering;
Expand Down Expand Up @@ -625,32 +626,36 @@ impl ScalarValue {

/// whether this value is null or not.
pub fn is_null(&self) -> bool {
matches!(
*self,
ScalarValue::Null
| ScalarValue::Boolean(None)
| ScalarValue::UInt8(None)
| ScalarValue::UInt16(None)
| ScalarValue::UInt32(None)
| ScalarValue::UInt64(None)
| ScalarValue::Int8(None)
| ScalarValue::Int16(None)
| ScalarValue::Int32(None)
| ScalarValue::Int64(None)
| ScalarValue::Float32(None)
| ScalarValue::Float64(None)
| ScalarValue::Date32(None)
| ScalarValue::Date64(None)
| ScalarValue::Utf8(None)
| ScalarValue::LargeUtf8(None)
| ScalarValue::List(None, _)
| ScalarValue::TimestampSecond(None, _)
| ScalarValue::TimestampMillisecond(None, _)
| ScalarValue::TimestampMicrosecond(None, _)
| ScalarValue::TimestampNanosecond(None, _)
| ScalarValue::Struct(None, _)
| ScalarValue::Decimal128(None, _, _) // For decimal type, the value is null means ScalarValue::Decimal128 is null.
)
match self {
ScalarValue::Boolean(v) => v.is_none(),
ScalarValue::Null => true,
ScalarValue::Float32(v) => v.is_none(),
ScalarValue::Float64(v) => v.is_none(),
ScalarValue::Decimal128(v, _, _) => v.is_none(),
ScalarValue::Int8(v) => v.is_none(),
ScalarValue::Int16(v) => v.is_none(),
ScalarValue::Int32(v) => v.is_none(),
ScalarValue::Int64(v) => v.is_none(),
ScalarValue::UInt8(v) => v.is_none(),
ScalarValue::UInt16(v) => v.is_none(),
ScalarValue::UInt32(v) => v.is_none(),
ScalarValue::UInt64(v) => v.is_none(),
ScalarValue::Utf8(v) => v.is_none(),
ScalarValue::LargeUtf8(v) => v.is_none(),
ScalarValue::Binary(v) => v.is_none(),
ScalarValue::LargeBinary(v) => v.is_none(),
ScalarValue::List(v, _) => v.is_none(),
ScalarValue::Date32(v) => v.is_none(),
ScalarValue::Date64(v) => v.is_none(),
ScalarValue::TimestampSecond(v, _) => v.is_none(),
ScalarValue::TimestampMillisecond(v, _) => v.is_none(),
ScalarValue::TimestampMicrosecond(v, _) => v.is_none(),
ScalarValue::TimestampNanosecond(v, _) => v.is_none(),
ScalarValue::IntervalYearMonth(v) => v.is_none(),
ScalarValue::IntervalDayTime(v) => v.is_none(),
ScalarValue::IntervalMonthDayNano(v) => v.is_none(),
ScalarValue::Struct(v, _) => v.is_none(),
}
}

/// Converts a scalar value into an 1-row array.
Expand Down Expand Up @@ -1271,7 +1276,11 @@ impl ScalarValue {
if array.is_null(index) {
ScalarValue::Decimal128(None, *precision, *scale)
} else {
ScalarValue::Decimal128(Some(array.value(index)), *precision, *scale)
ScalarValue::Decimal128(
Some(array.value(index).as_i128()),
*precision,
*scale,
)
}
}

Expand Down Expand Up @@ -1433,6 +1442,14 @@ impl ScalarValue {
})
}

/// Try to parse `value` into a ScalarValue of type `target_type`
pub fn try_from_string(value: String, target_type: &DataType) -> Result<Self> {
let value = ScalarValue::Utf8(Some(value));
let cast_options = CastOptions { safe: false };
let cast_arr = cast_with_options(&value.to_array(), target_type, &cast_options)?;
ScalarValue::try_from_array(&cast_arr, 0)
}

fn eq_array_decimal(
array: &ArrayRef,
index: usize,
Expand All @@ -1446,7 +1463,11 @@ impl ScalarValue {
}
match value {
None => array.is_null(index),
Some(v) => !array.is_null(index) && array.value(index) == *v,
Some(v) => {
!array.is_null(index)
&& array.value(index)
== Decimal128::new(precision, scale, &v.to_le_bytes())
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion

[dependencies]
ahash = { version = "0.7", default-features = false }
arrow = { version = "16.0.0", features = ["prettyprint"] }
arrow = { version = "17.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
bytes = "1.1"
Expand All @@ -78,7 +78,7 @@ num_cpus = "1.13.0"
object_store = "0.3.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "16.0.0", features = ["arrow", "async"] }
parquet = { version = "17.0.0", features = ["arrow", "async"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "16.0.0", features = ["prettyprint"] }
arrow = { version = "17.0.0", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
16 changes: 8 additions & 8 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,16 +454,16 @@ impl From<InformationSchemaColumnsBuilder> for MemTable {
Field::new("table_name", DataType::Utf8, false),
Field::new("column_name", DataType::Utf8, false),
Field::new("ordinal_position", DataType::UInt64, false),
Field::new("column_default", DataType::Utf8, false),
Field::new("column_default", DataType::Utf8, true),
Field::new("is_nullable", DataType::Utf8, false),
Field::new("data_type", DataType::Utf8, false),
Field::new("character_maximum_length", DataType::UInt64, false),
Field::new("character_octet_length", DataType::UInt64, false),
Field::new("numeric_precision", DataType::UInt64, false),
Field::new("numeric_precision_radix", DataType::UInt64, false),
Field::new("numeric_scale", DataType::UInt64, false),
Field::new("datetime_precision", DataType::UInt64, false),
Field::new("interval_type", DataType::Utf8, false),
Field::new("character_maximum_length", DataType::UInt64, true),
Field::new("character_octet_length", DataType::UInt64, true),
Field::new("numeric_precision", DataType::UInt64, true),
Field::new("numeric_precision_radix", DataType::UInt64, true),
Field::new("numeric_scale", DataType::UInt64, true),
Field::new("datetime_precision", DataType::UInt64, true),
Field::new("interval_type", DataType::Utf8, true),
]);

let InformationSchemaColumnsBuilder {
Expand Down
69 changes: 62 additions & 7 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,24 @@
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use itertools::Itertools;
use log::warn;
use std::collections::HashMap;
use std::env;

/// Configuration option "datafusion.optimizer.filter_null_join_keys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";

/// Configuration option "datafusion.execution.batch_size"
pub const OPT_BATCH_SIZE: &str = "datafusion.execution.batch_size";

/// Configuration option "datafusion.execution.coalesce_batches"
pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";

/// Configuration option "datafusion.execution.coalesce_target_batch_size"
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
"datafusion.execution.coalesce_target_batch_size";

/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identifier this configuration option
Expand Down Expand Up @@ -115,18 +125,39 @@ impl BuiltInConfigs {
buffer-in-memory batches since creating tiny batches would results in too much metadata \
memory consumption.",
8192,
),
ConfigDefinition::new_bool(
OPT_COALESCE_BATCHES,
format!("When set to true, record batches will be examined between each operator and \
small batches will be coalesced into larger batches. This is helpful when there \
are highly selective filters or joins that could produce tiny output batches. The \
target batch size is determined by the configuration setting \
'{}'.", OPT_COALESCE_TARGET_BATCH_SIZE),
true,
),
ConfigDefinition::new_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
format!("Target batch size when coalescing batches. Uses in conjunction with the \
configuration setting '{}'.", OPT_COALESCE_BATCHES),
4096,
)],
}
}

/// Generate documentation that can be included int he user guide
pub fn generate_config_markdown() -> String {
use std::fmt::Write as _;
let configs = Self::new();
let mut docs = "| key | type | default | description |\n".to_string();
docs += "|-----|------|---------|-------------|\n";
for config in configs.config_definitions {
docs += &format!(
"| {} | {} | {} | {} |\n",
for config in configs
.config_definitions
.iter()
.sorted_by_key(|c| c.key.as_str())
{
let _ = writeln!(
&mut docs,
"| {} | {} | {} | {} |",
config.key, config.data_type, config.default_value, config.description
);
}
Expand Down Expand Up @@ -157,6 +188,34 @@ impl ConfigOptions {
Self { options }
}

/// Create new ConfigOptions struct, taking values from environment variables where possible.
/// For example, setting DATAFUSION_EXECUTION_BATCH_SIZE to control `datafusion.execution.batch_size`.
pub fn from_env() -> Self {
let mut options = HashMap::new();
let built_in = BuiltInConfigs::new();
for config_def in &built_in.config_definitions {
let config_value = {
let mut env_key = config_def.key.replace('.', "_");
env_key.make_ascii_uppercase();
match env::var(&env_key) {
Ok(value) => match ScalarValue::try_from_string(
value.clone(),
&config_def.data_type,
) {
Ok(parsed) => parsed,
Err(_) => {
warn!("Warning: could not parse environment variable {}={} to type {}.", env_key, value, config_def.data_type);
config_def.default_value.clone()
}
},
Err(_) => config_def.default_value.clone(),
}
};
options.insert(config_def.key.clone(), config_value);
}
Self { options }
}

/// set a configuration option
pub fn set(&mut self, key: &str, value: ScalarValue) {
self.options.insert(key.to_string(), value);
Expand Down Expand Up @@ -206,10 +265,6 @@ mod test {
#[test]
fn docs() {
let docs = BuiltInConfigs::generate_config_markdown();
// uncomment this println to see the docs so they can be copy-and-pasted to
// docs/source/user-guide/configs.md until this task is automated
// in https://github.com/apache/arrow-datafusion/issues/2770
//println!("{}", docs);
let mut lines = docs.lines();
assert_eq!(
lines.next().unwrap(),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ fn paths_to_batch(
let mut fields = vec![
Field::new(FILE_PATH_COLUMN_NAME, DataType::Utf8, false),
Field::new(FILE_SIZE_COLUMN_NAME, DataType::UInt64, false),
Field::new(FILE_MODIFIED_COLUMN_NAME, DataType::Date64, false),
Field::new(FILE_MODIFIED_COLUMN_NAME, DataType::Date64, true),
];
for pn in table_partition_cols {
fields.push(Field::new(pn, DataType::Utf8, false));
Expand Down
Loading

0 comments on commit 54dd6d2

Please sign in to comment.