Skip to content

Commit 351cf3a

Browse files
authored
feat: upgrade arrow to 50 and datafusion to 35 (#1868)
1 parent 9c74ef3 commit 351cf3a

File tree

8 files changed

+74
-44
lines changed

8 files changed

+74
-44
lines changed

python/Cargo.toml

+4-4
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ name = "lance"
1111
crate-type = ["cdylib"]
1212

1313
[dependencies]
14-
arrow = { version = "49.0.0", features = ["pyarrow"] }
15-
arrow-array = "49.0"
16-
arrow-data = "49.0"
17-
arrow-schema = "49.0"
14+
arrow = { version = "50.0.0", features = ["pyarrow"] }
15+
arrow-array = "50.0"
16+
arrow-data = "50.0"
17+
arrow-schema = "50.0"
1818
object_store = "0.9.0"
1919
async-trait = "0.1"
2020
chrono = "0.4.31"

python/src/arrow.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17-
use arrow::ffi_stream::{export_reader_into_raw, FFI_ArrowArrayStream};
17+
use arrow::ffi_stream::FFI_ArrowArrayStream;
1818
use arrow::pyarrow::*;
1919
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchReader};
2020
use arrow_schema::{DataType, Field, Schema};
@@ -105,8 +105,7 @@ pub fn reader_to_pyarrow(
105105
py: Python,
106106
reader: Box<dyn RecordBatchReader + Send>,
107107
) -> PyResult<PyObject> {
108-
let mut stream = FFI_ArrowArrayStream::empty();
109-
unsafe { export_reader_into_raw(reader, &mut stream) };
108+
let mut stream = FFI_ArrowArrayStream::new(reader);
110109

111110
let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
112111
let module = py.import("pyarrow")?;

rust/Cargo.toml

+16-16
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,17 @@ lance-test-macros = { version = "=0.9.9", path = "./lance-test-macros" }
5353
lance-testing = { version = "=0.9.9", path = "./lance-testing" }
5454
approx = "0.5.1"
5555
# Note that this one does not include pyarrow
56-
arrow = { version = "49.0.0", optional = false, features = ["prettyprint"] }
57-
arrow-arith = "49.0"
58-
arrow-array = "49.0"
59-
arrow-buffer = "49.0"
60-
arrow-cast = "49.0"
61-
arrow-data = "49.0"
62-
arrow-ipc = { version = "49.0", features = ["zstd"] }
63-
arrow-ord = "49.0"
64-
arrow-row = "49.0"
65-
arrow-schema = "49.0"
66-
arrow-select = "49.0"
56+
arrow = { version = "50.0.0", optional = false, features = ["prettyprint"] }
57+
arrow-arith = "50.0"
58+
arrow-array = "50.0"
59+
arrow-buffer = "50.0"
60+
arrow-cast = "50.0"
61+
arrow-data = "50.0"
62+
arrow-ipc = { version = "50.0", features = ["zstd"] }
63+
arrow-ord = "50.0"
64+
arrow-row = "50.0"
65+
arrow-schema = "50.0"
66+
arrow-select = "50.0"
6767
async-recursion = "1.0"
6868
async-trait = "0.1"
6969
aws-config = "0.56"
@@ -77,13 +77,13 @@ bytes = "1.4"
7777
byteorder = "1.5"
7878
chrono = "0.4.23"
7979
criterion = { version = "0.5", features = ["async", "async_tokio"] }
80-
datafusion = { version = "34.0.0", default-features = false, features = [
80+
datafusion = { version = "35.0.0", default-features = false, features = [
8181
"regex_expressions",
8282
] }
83-
datafusion-common = "34.0"
84-
datafusion-sql = "34.0"
85-
datafusion-expr = "34.0"
86-
datafusion-physical-expr = "34.0"
83+
datafusion-common = "35.0"
84+
datafusion-sql = "35.0"
85+
datafusion-expr = "35.0"
86+
datafusion-physical-expr = "35.0"
8787
either = "1.0"
8888
futures = "0.3"
8989
http = "0.2.9"

rust/lance-datafusion/src/expr.rs

+43-17
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use std::sync::Arc;
1818

1919
use arrow::compute::cast;
20-
use arrow_array::{cast::AsArray, FixedSizeListArray};
20+
use arrow_array::{cast::AsArray, ArrayRef};
2121
use arrow_schema::DataType;
2222
use datafusion_common::ScalarValue;
2323

@@ -233,25 +233,51 @@ pub fn safe_coerce_scalar(value: &ScalarValue, ty: &DataType) -> Option<ScalarVa
233233
_ => None,
234234
},
235235
ScalarValue::Null => Some(value.clone()),
236-
// In Arrow 50.0.0, we will have support to cast from list -> FSL, and we
237-
// can remove the special case below.
238-
ScalarValue::List(values) if matches!(ty, DataType::FixedSizeList(_, _)) => {
239-
let values = values.as_list::<i32>().values();
240-
if let DataType::FixedSizeList(field, size) = ty {
241-
let new_values = cast(values, field.data_type()).ok()?;
242-
Some(ScalarValue::FixedSizeList(Arc::new(
243-
FixedSizeListArray::new(field.clone(), *size, new_values, None),
244-
)))
245-
} else {
246-
unreachable!()
236+
ScalarValue::List(values) => {
237+
let values = values.clone() as ArrayRef;
238+
let new_values = cast(&values, ty).ok()?;
239+
match ty {
240+
DataType::List(_) => {
241+
Some(ScalarValue::List(Arc::new(new_values.as_list().clone())))
242+
}
243+
DataType::LargeList(_) => Some(ScalarValue::LargeList(Arc::new(
244+
new_values.as_list().clone(),
245+
))),
246+
DataType::FixedSizeList(_, _) => Some(ScalarValue::FixedSizeList(Arc::new(
247+
new_values.as_fixed_size_list().clone(),
248+
))),
249+
_ => None,
250+
}
251+
}
252+
ScalarValue::LargeList(values) => {
253+
let values = values.clone() as ArrayRef;
254+
let new_values = cast(&values, ty).ok()?;
255+
match ty {
256+
DataType::List(_) => {
257+
Some(ScalarValue::List(Arc::new(new_values.as_list().clone())))
258+
}
259+
DataType::LargeList(_) => Some(ScalarValue::LargeList(Arc::new(
260+
new_values.as_list().clone(),
261+
))),
262+
DataType::FixedSizeList(_, _) => Some(ScalarValue::FixedSizeList(Arc::new(
263+
new_values.as_fixed_size_list().clone(),
264+
))),
265+
_ => None,
247266
}
248267
}
249-
ScalarValue::List(values) | ScalarValue::FixedSizeList(values) => {
250-
let new_values = cast(values, ty).ok()?;
268+
ScalarValue::FixedSizeList(values) => {
269+
let values = values.clone() as ArrayRef;
270+
let new_values = cast(&values, ty).ok()?;
251271
match ty {
252-
DataType::List(_) => Some(ScalarValue::List(new_values)),
253-
DataType::LargeList(_) => Some(ScalarValue::LargeList(new_values)),
254-
DataType::FixedSizeList(_, _) => Some(ScalarValue::FixedSizeList(new_values)),
272+
DataType::List(_) => {
273+
Some(ScalarValue::List(Arc::new(new_values.as_list().clone())))
274+
}
275+
DataType::LargeList(_) => Some(ScalarValue::LargeList(Arc::new(
276+
new_values.as_list().clone(),
277+
))),
278+
DataType::FixedSizeList(_, _) => Some(ScalarValue::FixedSizeList(Arc::new(
279+
new_values.as_fixed_size_list().clone(),
280+
))),
255281
_ => None,
256282
}
257283
}

rust/lance-index/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ datafusion.workspace = true
2323
datafusion-common.workspace = true
2424
datafusion-expr.workspace = true
2525
datafusion-physical-expr.workspace = true
26+
datafusion-sql.workspace = true
2627
futures.workspace = true
2728
half.workspace = true
2829
lance-arrow.workspace = true

rust/lance/src/dataset/scanner.rs

-1
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,6 @@ impl Scanner {
560560
PhysicalGroupBy::new_single(Vec::new()),
561561
vec![count_expr],
562562
vec![None],
563-
vec![None],
564563
plan,
565564
plan_schema,
566565
)?);

rust/lance/src/io/exec/planner.rs

-1
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,6 @@ impl Planner {
561561
Ok(datafusion::physical_expr::create_physical_expr(
562562
expr,
563563
df_schema.as_ref(),
564-
&self.schema,
565564
&Default::default(),
566565
)?)
567566
}

rust/lance/src/io/exec/scan.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -305,9 +305,15 @@ impl ExecutionPlan for LanceScanExec {
305305

306306
fn with_new_children(
307307
self: Arc<Self>,
308-
_children: Vec<Arc<dyn ExecutionPlan>>,
308+
children: Vec<Arc<dyn ExecutionPlan>>,
309309
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
310-
todo!()
310+
if children.is_empty() {
311+
Ok(self)
312+
} else {
313+
Err(DataFusionError::Internal(
314+
"LanceScanExec cannot be assigned children".to_string(),
315+
))
316+
}
311317
}
312318

313319
fn execute(

0 commit comments

Comments
 (0)