Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[db] Fix inserting value with the same key #671 #672

Merged
merged 2 commits into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions agdb/src/collections/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,15 +340,8 @@ where
}

pub fn insert(&mut self, storage: &mut S, key: &K, value: &T) -> Result<Option<T>, DbError> {
let old_value = self.value(key)?;

if let Some(old) = &old_value {
self.multi_map.replace(storage, key, old, value)?;
} else {
self.multi_map.insert(storage, key, value)?;
}

Ok(old_value)
self.multi_map
.insert_or_replace(storage, key, |_| true, value)
}

#[allow(dead_code)]
Expand Down
120 changes: 64 additions & 56 deletions agdb/src/collections/multi_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,51 @@ where
pub fn insert(&mut self, storage: &mut S, key: &K, value: &T) -> Result<(), DbError> {
let id = self.data.transaction(storage);
let index = self.free_index(storage, key)?;
self.data.set_state(storage, index, MapValueState::Valid)?;
self.data.set_key(storage, index, key)?;
self.data.set_value(storage, index, value)?;
self.data.set_len(storage, self.len() + 1)?;
self.do_insert(storage, index, key, value)?;
self.data.commit(storage, id)
}

pub fn insert_or_replace<P: Fn(&T) -> bool>(
&mut self,
storage: &mut S,
key: &K,
predicate: P,
new_value: &T,
) -> Result<Option<T>, DbError> {
let id = self.data.transaction(storage);

if self.len() >= self.max_len() {
self.rehash(storage, self.capacity() * 2)?;
}

let hash = key.stable_hash();
let mut pos = hash % self.capacity();
let mut ret = None;

loop {
match self.data.state(pos)? {
MapValueState::Empty => {
self.do_insert(storage, pos, key, new_value)?;
break;
}
MapValueState::Valid if self.data.key(pos)? == *key => {
let old_value = self.data.value(pos)?;
if predicate(&old_value) {
self.data.set_value(storage, pos, new_value)?;
ret = Some(old_value);
break;
} else {
pos = self.next_pos(pos)
}
}
MapValueState::Valid | MapValueState::Deleted => pos = self.next_pos(pos),
}
}

self.data.commit(storage, id)?;
Ok(ret)
}

pub fn is_empty(&self) -> bool {
self.len() == 0
}
Expand Down Expand Up @@ -213,38 +251,6 @@ where
self.data.commit(storage, id)
}

pub fn replace(
&mut self,
storage: &mut S,
key: &K,
value: &T,
new_value: &T,
) -> Result<(), DbError> {
if self.capacity() == 0 {
return Ok(());
}

let hash = key.stable_hash();
let mut pos = hash % self.capacity();

let id = self.data.transaction(storage);

loop {
match self.data.state(pos)? {
MapValueState::Empty => break,
MapValueState::Valid
if self.data.key(pos)? == *key && self.data.value(pos)? == *value =>
{
self.data.set_value(storage, pos, new_value)?;
break;
}
MapValueState::Valid | MapValueState::Deleted => pos = self.next_pos(pos),
}
}

self.data.commit(storage, id)
}

pub fn reserve(&mut self, storage: &mut S, capacity: u64) -> Result<(), DbError> {
if self.capacity() < capacity {
self.rehash(storage, capacity)?;
Expand Down Expand Up @@ -322,6 +328,19 @@ where
Ok(result)
}

fn do_insert(
&mut self,
storage: &mut S,
index: u64,
key: &K,
value: &T,
) -> Result<(), DbError> {
self.data.set_state(storage, index, MapValueState::Valid)?;
self.data.set_key(storage, index, key)?;
self.data.set_value(storage, index, value)?;
self.data.set_len(storage, self.len() + 1)
}

fn drop_value(&mut self, storage: &mut S, pos: u64) -> Result<(), DbError> {
self.data.set_state(storage, pos, MapValueState::Deleted)?;
self.data.set_key(storage, pos, &K::default())?;
Expand Down Expand Up @@ -628,31 +647,23 @@ mod tests {
let test_file = TestFile::new();
let mut storage = FileStorage::new(test_file.file_name()).unwrap();
let mut map = MultiMapStorage::<u64, String>::new(&mut storage).unwrap();

let p = |v: &String| v == "Hello";
assert!(map
.replace(
&mut storage,
&10,
&"Hello".to_string(),
&"World".to_string()
)
.insert_or_replace(&mut storage, &10, p, &"World".to_string())
.is_ok());
p(&"".to_string());
}

#[test]
fn replace_missing_value() {
let test_file = TestFile::new();
let mut storage = FileStorage::new(test_file.file_name()).unwrap();
let mut map = MultiMapStorage::<u64, String>::new(&mut storage).unwrap();
map.insert(&mut storage, &10, &"World".to_string()).unwrap();
map.insert(&mut storage, &11, &"Hello".to_string()).unwrap();

assert!(map
.replace(
&mut storage,
&10,
&"Hello".to_string(),
&"World".to_string()
)
.insert_or_replace(&mut storage, &10, |v| v == "Hello", &"World".to_string())
.is_ok());
}

Expand All @@ -662,15 +673,12 @@ mod tests {
let mut storage = FileStorage::new(test_file.file_name()).unwrap();
let mut map = MultiMapStorage::<u64, String>::new(&mut storage).unwrap();
map.insert(&mut storage, &10, &"Hello".to_string()).unwrap();
map.remove_key(&mut storage, &10).unwrap();
map.insert(&mut storage, &10, &"World".to_string()).unwrap();
map.remove_value(&mut storage, &10, &"Hello".to_string())
.unwrap();

assert!(map
.replace(
&mut storage,
&10,
&"Hello".to_string(),
&"World".to_string()
)
.insert_or_replace(&mut storage, &10, |v| v == "Hello", &"World".to_string())
.is_ok());
}

Expand Down
1 change: 1 addition & 0 deletions agdb/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ pub(crate) enum Command {
RemoveEdge { index: GraphIndex },
RemoveKeyValue { id: DbId, key_value: DbKeyValue },
RemoveNode { index: GraphIndex },
ReplaceKeyValue { id: DbId, key_value: DbKeyValue },
}
34 changes: 34 additions & 0 deletions agdb/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,15 @@ impl Db {
Command::RemoveNode { index } => {
self.graph.remove_node(&mut self.storage, *index)?
}
Command::ReplaceKeyValue { id, key_value } => {
self.values.insert_or_replace(
&mut self.storage,
id,
|v| v.key == key_value.key,
key_value,
)?;
return Ok(());
}
}
}

Expand Down Expand Up @@ -401,6 +410,31 @@ impl Db {
Ok(())
}

pub(crate) fn insert_or_replace_key_value(
&mut self,
db_id: DbId,
key_value: &DbKeyValue,
) -> Result<(), QueryError> {
if let Some(old) = self.values.insert_or_replace(
&mut self.storage,
&db_id,
|kv| kv.key == key_value.key,
key_value,
)? {
self.undo_stack.push(Command::ReplaceKeyValue {
id: db_id,
key_value: old,
});
} else {
self.undo_stack.push(Command::RemoveKeyValue {
id: db_id,
key_value: key_value.clone(),
});
}

Ok(())
}

pub(crate) fn keys(&self, db_id: DbId) -> Result<Vec<DbKeyValue>, DbError> {
Ok(self
.values
Expand Down
9 changes: 5 additions & 4 deletions agdb/src/query/insert_values_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ impl QueryMut for InsertValuesQuery {
QueryValues::Single(values) => {
for id in ids {
let db_id = db.db_id(id)?;

for key_value in values {
db.insert_key_value(db_id, key_value)?;
db.insert_or_replace_key_value(db_id, key_value)?;
result.result += 1;
}
}
Expand All @@ -45,7 +46,7 @@ impl QueryMut for InsertValuesQuery {
for (id, values) in ids.iter().zip(values) {
let db_id = db.db_id(id)?;
for key_value in values {
db.insert_key_value(db_id, key_value)?;
db.insert_or_replace_key_value(db_id, key_value)?;
result.result += 1;
}
}
Expand All @@ -58,7 +59,7 @@ impl QueryMut for InsertValuesQuery {
QueryValues::Single(values) => {
for db_id in db_ids {
for key_value in values {
db.insert_key_value(db_id, key_value)?;
db.insert_or_replace_key_value(db_id, key_value)?;
result.result += 1;
}
}
Expand All @@ -70,7 +71,7 @@ impl QueryMut for InsertValuesQuery {

for (db_id, values) in db_ids.iter().zip(values) {
for key_value in values {
db.insert_key_value(*db_id, key_value)?;
db.insert_or_replace_key_value(*db_id, key_value)?;
result.result += 1;
}
}
Expand Down
59 changes: 59 additions & 0 deletions agdb/tests/insert_values_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,62 @@ fn insert_values_search_invalid_length() {
"Ids and values length do not match",
);
}

#[test]
fn insert_values_overwrite() {
let mut db = TestDb::new();
db.exec_mut(
QueryBuilder::insert()
.nodes()
.values(vec![vec![("key", 10).into()]])
.query(),
1,
);
db.exec_mut(
QueryBuilder::insert()
.values_uniform(vec![("key", 20).into(), ("key2", 30).into()])
.ids(1)
.query(),
2,
);
db.exec_elements(
QueryBuilder::select().ids(1).query(),
&[DbElement {
id: DbId(1),
values: vec![("key", 20).into(), ("key2", 30).into()],
}],
)
}

#[test]
fn insert_values_overwrite_transaction() {
let mut db = TestDb::new();
db.exec_mut(
QueryBuilder::insert()
.nodes()
.values(vec![vec![("key", 10).into()]])
.query(),
1,
);

db.transaction_mut_error(
|t| -> Result<(), QueryError> {
t.exec_mut(
&QueryBuilder::insert()
.values_uniform(vec![("key", 20).into(), ("key2", 30).into()])
.ids(1)
.query(),
)?;
Err(QueryError::from("error"))
},
QueryError::from("error"),
);

db.exec_elements(
QueryBuilder::select().ids(1).query(),
&[DbElement {
id: DbId(1),
values: vec![("key", 10).into()],
}],
)
}