Skip to content

Commit 9e98ecb

Browse files
authored
Merge branch 'main' into lei/scale_ray_sink
2 parents b83161b + 1b8adb5 commit 9e98ecb

17 files changed

+653
-378
lines changed

java/core/lance-jni/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,5 @@ tokio.workspace = true
2323
jni = "0.21.1"
2424
snafu.workspace = true
2525
lazy_static.workspace = true
26+
serde = { version = "^1" }
27+
serde_json = { version = "1" }

java/core/lance-jni/src/blocking_dataset.rs

+36-21
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use crate::{traits::IntoJava, Result, RT};
15+
use crate::{traits::IntoJava, Error, Result, RT};
1616
use arrow::array::RecordBatchReader;
1717
use arrow::ffi::FFI_ArrowSchema;
1818
use arrow_schema::Schema;
1919
use jni::sys::jlong;
2020
use jni::{objects::JObject, JNIEnv};
21+
use lance::dataset::fragment::FileFragment;
22+
use lance::dataset::transaction::Operation;
2123
use lance::dataset::{Dataset, WriteParams};
22-
24+
use snafu::{location, Location};
2325
pub const NATIVE_DATASET: &str = "nativeDatasetHandle";
2426

2527
#[derive(Clone)]
@@ -42,6 +44,11 @@ impl BlockingDataset {
4244
Ok(Self { inner })
4345
}
4446

47+
pub fn commit(uri: &str, operation: Operation, read_version: Option<u64>) -> Result<Self> {
48+
let inner = RT.block_on(Dataset::commit(uri, operation, read_version, None, None))?;
49+
Ok(Self { inner })
50+
}
51+
4552
pub fn count_rows(&self, filter: Option<String>) -> Result<usize> {
4653
Ok(RT.block_on(self.inner.count_rows(filter))?)
4754
}
@@ -100,7 +107,7 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_releaseNativeDataset(
100107
}
101108

102109
#[no_mangle]
103-
pub extern "system" fn Java_com_lancedb_lance_Dataset_getFragmentsIds<'a>(
110+
pub extern "system" fn Java_com_lancedb_lance_Dataset_getJsonFragments<'a>(
104111
mut env: JNIEnv<'a>,
105112
jdataset: JObject,
106113
) -> JObject<'a> {
@@ -111,13 +118,7 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_getFragmentsIds<'a>(
111118
dataset.inner.get_fragments()
112119
};
113120

114-
let array_list = env
115-
.new_int_array(fragments.len() as i32)
116-
.expect("Failed to create int array");
117-
let fragment_ids = fragments.iter().map(|f| f.id() as i32).collect::<Vec<_>>();
118-
env.set_int_array_region(&array_list, 0, &fragment_ids)
119-
.expect("Failed to set int array region");
120-
array_list.into()
121+
ok_or_throw!(env, create_json_fragment_list(&mut env, fragments))
121122
}
122123

123124
#[no_mangle]
@@ -133,20 +134,34 @@ pub extern "system" fn Java_com_lancedb_lance_Dataset_importFfiSchema(
133134
Schema::from(dataset.inner.schema())
134135
};
135136
let out_c_schema = arrow_schema_addr as *mut FFI_ArrowSchema;
136-
let c_schema = match FFI_ArrowSchema::try_from(&schema) {
137-
Ok(schema) => schema,
138-
Err(err) => {
139-
env.throw_new(
140-
"java/lang/RuntimeException",
141-
format!("Failed to convert Arrow schema: {}", err),
142-
)
143-
.expect("Error throwing exception");
144-
return;
145-
}
146-
};
137+
let c_schema = ok_or_throw_without_return!(env, FFI_ArrowSchema::try_from(&schema));
147138

148139
unsafe {
149140
std::ptr::copy(std::ptr::addr_of!(c_schema), out_c_schema, 1);
150141
std::mem::forget(c_schema);
151142
};
152143
}
144+
145+
fn create_json_fragment_list<'a>(
146+
env: &mut JNIEnv<'a>,
147+
fragments: Vec<FileFragment>,
148+
) -> Result<JObject<'a>> {
149+
let array_list_class = env.find_class("java/util/ArrayList")?;
150+
151+
let array_list = env.new_object(array_list_class, "()V", &[])?;
152+
153+
for fragment in fragments {
154+
let json_string = serde_json::to_string(fragment.metadata()).map_err(|e| Error::JSON {
155+
message: e.to_string(),
156+
location: location!(),
157+
})?;
158+
let jstring = env.new_string(json_string)?;
159+
env.call_method(
160+
&array_list,
161+
"add",
162+
"(Ljava/lang/Object;)Z",
163+
&[(&jstring).into()],
164+
)?;
165+
}
166+
Ok(array_list)
167+
}

java/core/lance-jni/src/error.rs

+24-1
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414

1515
use std::str::Utf8Error;
1616

17+
use arrow_schema::ArrowError;
1718
use jni::errors::Error as JniError;
18-
use snafu::{Location, Snafu};
19+
use serde_json::Error as JsonError;
20+
use snafu::{location, Location, Snafu};
1921

2022
/// Java Exception types
2123
pub enum JavaException {
@@ -47,6 +49,8 @@ pub enum Error {
4749
Arrow { message: String, location: Location },
4850
#[snafu(display("Index error: {}, location", message))]
4951
Index { message: String, location: Location },
52+
#[snafu(display("JSON error: {}, location: {}", message, location))]
53+
JSON { message: String, location: Location },
5054
#[snafu(display("Dataset not found error: {}, location {}", path, location))]
5155
DatasetNotFound { path: String, location: Location },
5256
#[snafu(display("Dataset already exists error: {}, location {}", uri, location))]
@@ -66,6 +70,7 @@ impl Error {
6670
Self::Arrow { .. }
6771
| Self::DatasetNotFound { .. }
6872
| Self::DatasetAlreadyExists { .. }
73+
| Self::JSON { .. }
6974
| Self::Other { .. }
7075
| Self::Jni { .. } => self.throw_as(env, JavaException::RuntimeException),
7176
}
@@ -96,6 +101,24 @@ impl From<Utf8Error> for Error {
96101
}
97102
}
98103

104+
impl From<ArrowError> for Error {
105+
fn from(source: ArrowError) -> Self {
106+
Self::Arrow {
107+
message: source.to_string(),
108+
location: location!(),
109+
}
110+
}
111+
}
112+
113+
impl From<JsonError> for Error {
114+
fn from(source: JsonError) -> Self {
115+
Self::JSON {
116+
message: source.to_string(),
117+
location: location!(),
118+
}
119+
}
120+
}
121+
99122
impl From<lance::Error> for Error {
100123
fn from(source: lance::Error) -> Self {
101124
match source {

java/core/lance-jni/src/ffi.rs

+28
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ use crate::Result;
1919

2020
/// Extend JNIEnv with helper functions.
2121
pub trait JNIEnvExt {
22+
/// Get integers from Java List<Integer> object.
23+
fn get_integers(&mut self, obj: &JObject) -> Result<Vec<i32>>;
24+
2225
/// Get strings from Java List<String> object.
2326
fn get_strings(&mut self, obj: &JObject) -> Result<Vec<String>>;
2427

@@ -34,12 +37,27 @@ pub trait JNIEnvExt {
3437
/// Get Option<i64> from Java Optional<Long>.
3538
fn get_long_opt(&mut self, obj: &JObject) -> Result<Option<i64>>;
3639

40+
/// Get Option<u64> from Java Optional<Long>.
41+
fn get_u64_opt(&mut self, obj: &JObject) -> Result<Option<u64>>;
42+
3743
fn get_optional<T, F>(&mut self, obj: &JObject, f: F) -> Result<Option<T>>
3844
where
3945
F: FnOnce(&mut JNIEnv, &JObject) -> Result<T>;
4046
}
4147

4248
impl JNIEnvExt for JNIEnv<'_> {
49+
/// Reads every element of a Java `List<Integer>` into a `Vec<i32>`.
///
/// Each element is unboxed by calling `intValue()` on it.
///
/// # Errors
/// Propagates any JNI failure raised while obtaining the list, iterating
/// over it, or invoking `intValue` on an element.
fn get_integers(&mut self, obj: &JObject) -> Result<Vec<i32>> {
    let list = self.get_list(obj)?;
    let mut iter = list.iter(self)?;
    let mut results = Vec::with_capacity(list.size(self)? as usize);
    while let Some(elem) = iter.next(self)? {
        let int_value = self.call_method(&elem, "intValue", "()I", &[])?.i()?;
        // Every element handed out by the iterator is a fresh local
        // reference. Drop it once the primitive has been read so that long
        // lists do not exhaust the JNI local reference table.
        self.delete_local_ref(elem)?;
        results.push(int_value);
    }
    Ok(results)
}
60+
4361
fn get_strings(&mut self, obj: &JObject) -> Result<Vec<String>> {
4462
let list = self.get_list(obj)?;
4563
let mut iter = list.iter(self)?;
@@ -90,6 +108,16 @@ impl JNIEnvExt for JNIEnv<'_> {
90108
})
91109
}
92110

111+
/// Reads a Java `Optional<Long>` as `Option<u64>`.
///
/// The per-value closure passed to `get_optional` calls `get()` on the
/// object it receives, then `longValue()` on the result, and casts the
/// resulting `i64` to `u64`.
///
/// # Errors
/// Propagates any error from `get_optional` or from the JNI calls made while
/// unboxing the value.
fn get_u64_opt(&mut self, obj: &JObject) -> Result<Option<u64>> {
    self.get_optional(obj, |env, inner_obj| {
        let boxed_long = env
            .call_method(inner_obj, "get", "()Ljava/lang/Object;", &[])?
            .l()?;
        let raw = env.call_method(boxed_long, "longValue", "()J", &[])?.j()?;
        // NOTE(review): `as` reinterprets the bits, so a negative Java long
        // turns into a very large u64 instead of an error — confirm callers
        // never pass negative values.
        Ok(raw as u64)
    })
}
120+
93121
fn get_optional<T, F>(&mut self, obj: &JObject, f: F) -> Result<Option<T>>
94122
where
95123
F: FnOnce(&mut JNIEnv, &JObject) -> Result<T>,

0 commit comments

Comments
 (0)