Commit db5281c
fix: temporarily disable spilling when training indices on string columns (#3469)
Until we upgrade to the next DataFusion release (46), we cannot rely on spilling when working with string data. Users continue to hit errors unrelated to the size of the spill pool or the amount of data they have. This change disables spilling entirely on string columns (the typical workaround) until we have a stable solution.
1 parent c69a5a2 commit db5281c
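For context, the sketch below shows how a user would apply the existing LANCE_BYPASS_SPILLING escape hatch by hand when training a BTREE index on a string column. It is not part of this commit; the dataset path is hypothetical and the column contents simply mirror the test modified below.

import os

import lance
import pyarrow as pa

# A large string column of the kind that can trigger spilling during BTREE training.
strings = pa.array([f"string-{i}" * 10 for i in range(100 * 1024)])
table = pa.Table.from_arrays([strings], ["str"])
dataset = lance.write_dataset(table, "/tmp/spill_demo")  # hypothetical path

# Opting out of spilling sidesteps the DataFusion string-spill bug; after this
# commit the same behavior is applied automatically for Utf8/LargeUtf8 columns.
os.environ["LANCE_BYPASS_SPILLING"] = "1"
dataset.create_scalar_index("str", index_type="BTREE")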

File tree (3 files changed: +35 -8 lines changed)

  python/python/tests/test_scalar_index.py
  rust/lance-index/src/scalar/btree.rs
  rust/lance/src/index/scalar.rs

python/python/tests/test_scalar_index.py (+7 -6)
@@ -233,25 +233,26 @@ def gen_string(idx: int):
 # environment variable. This test ensures that the environment variable
 # is respected.
 def test_lance_mem_pool_env_var(tmp_path):
-    strings = pa.array([f"string-{i}" * 10 for i in range(100 * 1024)])
-    table = pa.Table.from_arrays([strings], ["str"])
+    ints = pa.array([i * 10 for i in range(100 * 1024)])
+    table = pa.Table.from_arrays([ints], ["int"])
     dataset = lance.write_dataset(table, tmp_path)

     # Should succeed
-    dataset.create_scalar_index("str", index_type="BTREE")
+    dataset.create_scalar_index("int", index_type="BTREE")

     try:
         # Should fail if we intentionally use a very small memory pool
         os.environ["LANCE_MEM_POOL_SIZE"] = "1024"
         with pytest.raises(Exception):
-            dataset.create_scalar_index("str", index_type="BTREE", replace=True)
+            dataset.create_scalar_index("int", index_type="BTREE", replace=True)

         # Should succeed again since bypassing spilling takes precedence
         os.environ["LANCE_BYPASS_SPILLING"] = "1"
-        dataset.create_scalar_index("str", index_type="BTREE", replace=True)
+        dataset.create_scalar_index("int", index_type="BTREE", replace=True)
     finally:
         del os.environ["LANCE_MEM_POOL_SIZE"]
-        del os.environ["LANCE_BYPASS_SPILLING"]
+        if "LANCE_BYPASS_SPILLING" in os.environ:
+            del os.environ["LANCE_BYPASS_SPILLING"]


 @pytest.mark.parametrize("with_position", [True, False])

rust/lance-index/src/scalar/btree.rs (+9 -1)
@@ -1265,6 +1265,13 @@ impl TrainingSource for BTreeUpdater {
         self: Box<Self>,
         chunk_size: u32,
     ) -> Result<SendableRecordBatchStream> {
+        let data_type = self.new_data.schema().field(0).data_type().clone();
+        // Datafusion currently has bugs with spilling on string columns
+        // See https://github.com/apache/datafusion/issues/10073
+        //
+        // Once we upgrade we can remove this
+        let use_spilling = !matches!(data_type, DataType::Utf8 | DataType::LargeUtf8);
+
         let new_input = Arc::new(OneShotExec::new(self.new_data));
         let old_input = Self::into_old_input(self.index);
         debug_assert_eq!(
@@ -1285,10 +1292,11 @@ impl TrainingSource for BTreeUpdater {
             LexOrdering::new(vec![sort_expr]),
             all_data,
         ));
+
         let unchunked = execute_plan(
             ordered,
             LanceExecutionOptions {
-                use_spilling: true,
+                use_spilling,
                 ..Default::default()
             },
         )?;

rust/lance/src/index/scalar.rs (+19 -1)
@@ -62,6 +62,24 @@ impl TrainingRequest {
     ) -> Result<SendableRecordBatchStream> {
         let mut scan = self.dataset.scan();

+        let column_field =
+            self.dataset
+                .schema()
+                .field(&self.column)
+                .ok_or(Error::InvalidInput {
+                    source: format!("No column with name {}", self.column).into(),
+                    location: location!(),
+                })?;
+
+        // Datafusion currently has bugs with spilling on string columns
+        // See https://github.com/apache/datafusion/issues/10073
+        //
+        // Once we upgrade we can remove this
+        let use_spilling = !matches!(
+            column_field.data_type(),
+            DataType::Utf8 | DataType::LargeUtf8
+        );
+
         let ordering = match sort {
             true => Some(vec![ColumnOrdering::asc_nulls_first(self.column.clone())]),
             false => None,
@@ -74,7 +92,7 @@ impl TrainingRequest {

         let batches = scan
             .try_into_dfstream(LanceExecutionOptions {
-                use_spilling: true,
+                use_spilling,
                 ..Default::default()
             })
             .await?;
