Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[db] Deadlock in the db upon repeated execution of remove queries #1130 #1134

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/agdb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- uses: actions-rust-lang/setup-rust-toolchain@v1
- uses: taiki-e/install-action@cargo-llvm-cov
- run: rustup component add llvm-tools-preview
- run: cargo llvm-cov --package agdb --package agdb_derive --all-features --ignore-filename-regex "agdb_derive" --fail-uncovered-functions 0 --fail-uncovered-lines 1 --show-missing-lines
- run: cargo llvm-cov --package agdb --package agdb_derive --all-features --ignore-filename-regex "agdb_derive" --fail-uncovered-functions 0 --fail-uncovered-lines 0 --show-missing-lines

agdb_format:
runs-on: ubuntu-latest
Expand Down
106 changes: 94 additions & 12 deletions agdb/src/collections/multi_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ where

loop {
match self.data.state(storage, pos)? {
MapValueState::Empty => {
MapValueState::Empty | MapValueState::Deleted => {
self.do_insert(storage, pos, key, new_value)?;
break;
}
Expand All @@ -170,7 +170,7 @@ where
pos = self.next_pos(pos)
}
}
MapValueState::Valid | MapValueState::Deleted => pos = self.next_pos(pos),
MapValueState::Valid => pos = self.next_pos(pos),
}
}

Expand Down Expand Up @@ -222,6 +222,7 @@ where

let hash = key.stable_hash();
let mut pos = hash % self.capacity();
let start_pos = pos;
let mut len = self.len();

let id = self.data.transaction(storage);
Expand All @@ -237,6 +238,13 @@ where
}

pos = self.next_pos(pos);

if pos == start_pos {
if len == self.len() {
self.rehash(storage, self.capacity())?;
}
break;
}
}

if len != self.len() {
Expand All @@ -262,7 +270,7 @@ where

let hash = key.stable_hash();
let mut pos = hash % self.capacity();

let start_pos = pos;
let id = self.data.transaction(storage);

loop {
Expand All @@ -277,6 +285,11 @@ where
}
MapValueState::Valid | MapValueState::Deleted => pos = self.next_pos(pos),
}

if pos == start_pos {
self.rehash(storage, self.capacity())?;
break;
}
}

self.data.commit(storage, id)
Expand Down Expand Up @@ -431,8 +444,9 @@ where
let new_capacity = std::cmp::max(capacity, 64_u64);

match current_capacity.cmp(&new_capacity) {
std::cmp::Ordering::Less => self.grow(storage, current_capacity, new_capacity),
std::cmp::Ordering::Equal => Ok(()),
std::cmp::Ordering::Less | std::cmp::Ordering::Equal => {
self.grow(storage, current_capacity, new_capacity)
}
std::cmp::Ordering::Greater => self.shrink(storage, current_capacity, new_capacity),
}
}
Expand Down Expand Up @@ -519,13 +533,8 @@ where
let mut occupancy = BitSet::with_capacity(new_capacity);

while i != current_capacity {
self.rehash_value(
storage,
self.data.state(storage, i)?,
&mut i,
new_capacity,
&mut occupancy,
)?;
let state = self.data.state(storage, i)?;
self.rehash_value(storage, state, &mut i, new_capacity, &mut occupancy)?;
}

Ok(())
Expand Down Expand Up @@ -845,4 +854,77 @@ mod tests {
assert_eq!(value, Some(i.to_string()));
}
}

#[test]
fn deadlock_test_key() {
let mut storage: Storage<MemoryStorage> = Storage::new("test").unwrap();
let mut map = MultiMapStorage::<u64, u64, MemoryStorage>::new(&mut storage).unwrap();

for i in 0_u64..60 {
map.insert(&mut storage, &i, &i).unwrap();
}

map.remove_key(&mut storage, &0).unwrap();
map.remove_key(&mut storage, &1).unwrap();
map.remove_key(&mut storage, &2).unwrap();
map.remove_key(&mut storage, &3).unwrap();

map.insert(&mut storage, &60, &60).unwrap();
map.insert(&mut storage, &61, &61).unwrap();
map.insert(&mut storage, &62, &62).unwrap();
map.insert(&mut storage, &63, &63).unwrap();

map.remove_key(&mut storage, &32).unwrap();
map.remove_key(&mut storage, &0).unwrap();
}

#[test]
fn deadlock_test_value() {
let mut storage: Storage<MemoryStorage> = Storage::new("test").unwrap();
let mut map = MultiMapStorage::<u64, u64, MemoryStorage>::new(&mut storage).unwrap();

for i in 0_u64..60 {
map.insert(&mut storage, &i, &i).unwrap();
}

map.remove_key(&mut storage, &0).unwrap();
map.remove_key(&mut storage, &1).unwrap();
map.remove_key(&mut storage, &2).unwrap();
map.remove_key(&mut storage, &3).unwrap();

map.insert(&mut storage, &60, &60).unwrap();
map.insert(&mut storage, &61, &61).unwrap();
map.insert(&mut storage, &62, &62).unwrap();
map.insert(&mut storage, &63, &63).unwrap();

map.remove_value(&mut storage, &32, &32).unwrap();
map.remove_value(&mut storage, &32, &0).unwrap();
}

#[test]
fn iterate_over_deleted() {
let mut storage: Storage<MemoryStorage> = Storage::new("test").unwrap();
let mut map = MultiMapStorage::<u64, u64, MemoryStorage>::new(&mut storage).unwrap();

for i in 0_u64..20 {
map.insert(&mut storage, &i, &i).unwrap();
map.insert(&mut storage, &i, &(i + 1)).unwrap();
map.insert(&mut storage, &i, &(i + 2)).unwrap();
}

map.remove_value(&mut storage, &10, &10).unwrap();
map.remove_value(&mut storage, &10, &11).unwrap();

assert!(map.contains(&storage, &10).unwrap());
assert!(map.contains_value(&storage, &10, &12).unwrap());

let values = map.iter(&storage).collect::<Vec<(u64, u64)>>();

assert!(!values.contains(&(10, 10)));
assert!(!values.contains(&(10, 11)));
assert!(values.contains(&(10, 12)));

let keys = map.iter_key(&storage, &10).collect::<Vec<(u64, u64)>>();
assert_eq!(keys, vec![(10, 12)]);
}
}
13 changes: 13 additions & 0 deletions agdb/tests/db_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@ use test_db::TestDb;
#[allow(unused_imports)]
#[test]
fn public_types() {
use agdb::AgdbSerialize;
use agdb::Comparison;
use agdb::CountComparison;
use agdb::Db;
use agdb::DbElement;
use agdb::DbError;
use agdb::DbF64;
use agdb::DbFile;
use agdb::DbFileTransaction;
use agdb::DbFileTransactionMut;
use agdb::DbId;
use agdb::DbImpl;
use agdb::DbKeyOrder;
use agdb::DbKeyValue;
use agdb::DbMemory;
use agdb::DbMemoryTransaction;
use agdb::DbMemoryTransactionMut;
use agdb::DbTransaction;
use agdb::DbTransactionMut;
use agdb::DbUserValue;
Expand All @@ -38,6 +44,7 @@ fn public_types() {
use agdb::FileStorageMemoryMapped;
use agdb::InsertAliasesQuery;
use agdb::InsertEdgesQuery;
use agdb::InsertIndexQuery;
use agdb::InsertNodesQuery;
use agdb::InsertValuesQuery;
use agdb::MemoryStorage;
Expand All @@ -57,13 +64,19 @@ fn public_types() {
use agdb::RemoveQuery;
use agdb::RemoveValuesQuery;
use agdb::SearchQuery;
use agdb::SearchQueryAlgorithm;
use agdb::SelectAliasesQuery;
use agdb::SelectAllAliasesQuery;
use agdb::SelectEdgeCountQuery;
use agdb::SelectIndexesQuery;
use agdb::SelectKeyCountQuery;
use agdb::SelectKeysQuery;
use agdb::SelectNodeCountQuery;
use agdb::SelectQuery;
use agdb::SelectValuesQuery;
use agdb::StableHash;
use agdb::StorageData;
use agdb::StorageSlice;
use agdb::Transaction;
use agdb::TransactionMut;
use agdb::UserValue;
Expand Down
2 changes: 2 additions & 0 deletions agdb_api/typescript/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ cd ../../agdb_server
rm -f agdb_server.yaml
rm -f .agdb_server.agdb
rm -f agdb_server.agdb
rm -rf agdb_server_data
cargo build --release
cargo run --release &
cd ../agdb_api/typescript
Expand All @@ -14,4 +15,5 @@ curl -H "Authorization: Bearer $token" -X POST http://localhost:3000/api/v1/admi
rm -f agdb_server.yaml
rm -f .agdb_server.agdb
rm -f agdb_server.agdb
rm -rf agdb_server_data
exit $error_code
51 changes: 31 additions & 20 deletions agdb_server/src/db_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,35 +472,46 @@ impl DbPool {
return Err(permission_denied("write rights required"));
}

let pool = self.get_pool().await;
let server_db = pool.get(&db_name).ok_or(db_not_found(&db_name))?;
let results = if required_role == DbUserRole::Read {
server_db.get().await.transaction(|t| {
let mut results = vec![];
self.get_pool()
.await
.get(&db_name)
.ok_or(db_not_found(&db_name))?
.get()
.await
.transaction(|t| {
let mut results = vec![];

for q in queries.0.iter_mut() {
let result = t_exec(t, q, &results)?;
results.push(result);
}
for q in queries.0.iter_mut() {
let result = t_exec(t, q, &results)?;
results.push(result);
}

Ok(results)
})
Ok(results)
})
} else {
let username = self.user_name(user).await?;
let mut audit = vec![];

let r = server_db.get_mut().await.transaction_mut(|t| {
let mut results = vec![];
let mut qs = vec![];
std::mem::swap(&mut queries.0, &mut qs);
let r = self
.get_pool()
.await
.get(&db_name)
.ok_or(db_not_found(&db_name))?
.get_mut()
.await
.transaction_mut(|t| {
let mut results = vec![];
let mut qs = vec![];
std::mem::swap(&mut queries.0, &mut qs);

for q in qs {
let result = t_exec_mut(t, q, &results, &mut audit, &username)?;
results.push(result);
}
for q in qs {
let result = t_exec_mut(t, q, &results, &mut audit, &username)?;
results.push(result);
}

Ok(results)
});
Ok(results)
});
if r.is_ok() && !audit.is_empty() {
let mut log = std::fs::OpenOptions::new()
.create(true)
Expand Down
Loading