Skip to content

Commit 5d1c84f

Browse files
authored
fix: prevent despecialization of object store methods (#3506)
Adding enforcement that wrappers implement all ObjectStore methods. If they don't, then the specialized implementations in the base `ObjectStore` could be bypasses, causing key optimizations to be missed out on. I didn't see any real cases here, but adding this to be cautious. `delete_stream()` a common one we might worry about it, but it seems like tracing store already had this.
1 parent 87f055f commit 5d1c84f

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

rust/lance-io/src/object_store/tracing.rs

+31
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ impl std::fmt::Display for TracedObjectStore {
5555
}
5656

5757
#[async_trait::async_trait]
58+
#[deny(clippy::missing_trait_methods)]
5859
impl object_store::ObjectStore for TracedObjectStore {
5960
#[instrument(level = "debug", skip(self, bytes))]
6061
async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult<PutResult> {
@@ -71,6 +72,17 @@ impl object_store::ObjectStore for TracedObjectStore {
7172
self.target.put_opts(location, bytes, opts).await
7273
}
7374

75+
async fn put_multipart(
76+
&self,
77+
location: &Path,
78+
) -> OSResult<Box<dyn object_store::MultipartUpload>> {
79+
let upload = self.target.put_multipart(location).await?;
80+
Ok(Box::new(TracedMultipartUpload {
81+
target: upload,
82+
write_span: debug_span!("put_multipart"),
83+
}))
84+
}
85+
7486
async fn put_multipart_opts(
7587
&self,
7688
location: &Path,
@@ -83,6 +95,11 @@ impl object_store::ObjectStore for TracedObjectStore {
8395
}))
8496
}
8597

98+
#[instrument(level = "debug", skip(self, location))]
99+
async fn get(&self, location: &Path) -> OSResult<GetResult> {
100+
self.target.get(location).await
101+
}
102+
86103
#[instrument(level = "debug", skip(self, options))]
87104
async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
88105
self.target.get_opts(location, options).await
@@ -121,6 +138,15 @@ impl object_store::ObjectStore for TracedObjectStore {
121138
self.target.list(prefix)
122139
}
123140

141+
#[instrument(level = "debug", skip(self))]
142+
fn list_with_offset(
143+
&self,
144+
prefix: Option<&Path>,
145+
offset: &Path,
146+
) -> BoxStream<'_, OSResult<ObjectMeta>> {
147+
self.target.list_with_offset(prefix, offset)
148+
}
149+
124150
#[instrument(level = "debug", skip(self))]
125151
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
126152
self.target.list_with_delimiter(prefix).await
@@ -136,6 +162,11 @@ impl object_store::ObjectStore for TracedObjectStore {
136162
self.target.rename(from, to).await
137163
}
138164

165+
#[instrument(level = "debug", skip(self))]
166+
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
167+
self.target.rename_if_not_exists(from, to).await
168+
}
169+
139170
#[instrument(level = "debug", skip(self))]
140171
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
141172
self.target.copy_if_not_exists(from, to).await

rust/lance/src/utils/test.rs

+37
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,13 @@ impl IoTrackingStore {
325325
}
326326

327327
#[async_trait::async_trait]
328+
#[deny(clippy::missing_trait_methods)]
328329
impl ObjectStore for IoTrackingStore {
330+
async fn put(&self, location: &Path, bytes: PutPayload) -> OSResult<PutResult> {
331+
self.record_write(bytes.content_length() as u64);
332+
self.target.put(location, bytes).await
333+
}
334+
329335
async fn put_opts(
330336
&self,
331337
location: &Path,
@@ -336,6 +342,14 @@ impl ObjectStore for IoTrackingStore {
336342
self.target.put_opts(location, bytes, opts).await
337343
}
338344

345+
async fn put_multipart(&self, location: &Path) -> OSResult<Box<dyn MultipartUpload>> {
346+
let target = self.target.put_multipart(location).await?;
347+
Ok(Box::new(IoTrackingMultipartUpload {
348+
target,
349+
stats: self.stats.clone(),
350+
}))
351+
}
352+
339353
async fn put_multipart_opts(
340354
&self,
341355
location: &Path,
@@ -348,6 +362,15 @@ impl ObjectStore for IoTrackingStore {
348362
}))
349363
}
350364

365+
async fn get(&self, location: &Path) -> OSResult<GetResult> {
366+
let result = self.target.get(location).await;
367+
if let Ok(result) = &result {
368+
let num_bytes = result.range.end - result.range.start;
369+
self.record_read(num_bytes as u64);
370+
}
371+
result
372+
}
373+
351374
async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult<GetResult> {
352375
let result = self.target.get_opts(location, options).await;
353376
if let Ok(result) = &result {
@@ -394,6 +417,15 @@ impl ObjectStore for IoTrackingStore {
394417
self.target.list(prefix)
395418
}
396419

420+
fn list_with_offset(
421+
&self,
422+
prefix: Option<&Path>,
423+
offset: &Path,
424+
) -> BoxStream<'_, OSResult<ObjectMeta>> {
425+
self.record_read(0);
426+
self.target.list_with_offset(prefix, offset)
427+
}
428+
397429
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult<ListResult> {
398430
self.record_read(0);
399431
self.target.list_with_delimiter(prefix).await
@@ -409,6 +441,11 @@ impl ObjectStore for IoTrackingStore {
409441
self.target.rename(from, to).await
410442
}
411443

444+
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
445+
self.record_write(0);
446+
self.target.rename_if_not_exists(from, to).await
447+
}
448+
412449
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> {
413450
self.record_write(0);
414451
self.target.copy_if_not_exists(from, to).await

0 commit comments

Comments
 (0)