diff --git a/Cargo.lock b/Cargo.lock index c237681..7c4e08a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -245,6 +245,7 @@ version = "0.2.0" dependencies = [ "criterion", "left-right", + "lockfree", "once_cell", "radix_trie", "serde", @@ -297,6 +298,15 @@ version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "lockfree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74ee94b5ad113c7cb98c5a040f783d0952ee4fe100993881d1673c2cb002dd23" +dependencies = [ + "owned-alloc", +] + [[package]] name = "log" version = "0.4.22" @@ -383,6 +393,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owned-alloc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30fceb411f9a12ff9222c5f824026be368ff15dc2f13468d850c7d3f502205d6" + [[package]] name = "pin-project-lite" version = "0.2.14" diff --git a/Cargo.toml b/Cargo.toml index ac978b9..67d8eea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ keywords = ["string", "interner", "caching"] [dependencies] left-right = "0.11.5" +lockfree = "0.5.1" once_cell = "1.19.0" radix_trie = "0.2.1" serde = { version = "1.0", optional = true } diff --git a/README.md b/README.md index 475c638..d41558c 100644 --- a/README.md +++ b/README.md @@ -70,9 +70,6 @@ scaling for writes. ## Planned Improvements -- Make `IString::from` (in the already interned case), `IString::clone`, and `IString::drop` - lock free. - - Replace or rewrite the radix tree to make it reuse the string storage, instead of storing a clone of the each interned string. 
Currently the crate uses 2x the interned string storage space because of this (1x in storage, diff --git a/src/lib.rs b/src/lib.rs index d3c221c..503d844 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,15 +5,17 @@ mod storage; /// An immutable and interned string. /// -/// Reading an `IString`'s contents is very fast, lock free and wait free (thanks to `left_right`). -/// Can be shared and read from any number of threads. -/// Scales linearly with the number of reading threads. +/// Reading an `IString`'s contents is very fast, lock-free and wait-free. +/// It can be shared and read from any number of threads. +/// It scales linearly with the number of reading threads. +/// +/// `IString` provides `Hash` and `Eq` implementations that run in O(1), +/// perfect for a high-performance `HashMap`. /// /// The tradeoff is that creating a new `IString` is comparatively slower : -/// - Creating a new `IString` with a string that is already interned is generally fast. -/// It acquires a global lock. -/// - Creating a new `IString` with a string that isn't already interned is much slower. -/// It acquired a global lock and waits for all readers to finish reading. +/// - Creating a new `IString` with a string that is already interned is fast and lock-free. +/// - Creating a new `IString` with a string that isn't already interned is slower. +/// It acquires a global lock and waits for all readers to finish reading. #[derive(Eq, PartialEq, Ord, Hash)] pub struct IString { pub(crate) key: IStringKey @@ -22,18 +24,46 @@ pub struct IString { // Indispensable traits impl : From, Drop, Deref impl From for IString { + /// Intern the given `String` by consuming it. Its allocation is reused. + /// + /// This operation runs in O(N) where N is the `string.len()`. + /// If the string was already interned, this operation is lock-free. + /// Otherwise, a global lock is acquired. 
+ /// + /// # Example + /// + /// ``` + /// use interned_string::IString; + /// + /// let my_istring = IString::from("hello".to_string()); + /// ``` #[inline] fn from(string: String) -> Self { Self { + // could block key: SHARED_STORAGE.insert_or_retain(string) } } } impl From<&str> for IString { + /// Intern the given `&str` by cloning its contents. + /// + /// This operation runs in O(N) where N is the `string.len()`. + /// If the string was already interned, this operation is lock-free. + /// Otherwise, a global lock is acquired. + /// + /// # Example + /// + /// ``` + /// use interned_string::IString; + /// + /// let my_istring = IString::from("hello"); + /// ``` #[inline] fn from(string: &str) -> Self { Self { + // could block key: SHARED_STORAGE.insert_or_retain(String::from(string)) } } @@ -42,13 +72,31 @@ impl From<&str> for IString { impl Drop for IString { #[inline] fn drop(&mut self) { - SHARED_STORAGE.release(self) + THREAD_LOCAL_READER.with(|tl_reader| { + tl_reader.release(self); + }); } } impl Deref for IString { type Target = str; + /// Returns a reference to the string's contents. + /// + /// This operation runs in O(1) and is lock-free. + /// + /// # Example + /// ``` + /// use interned_string::Intern; + /// + /// fn foo(string: &str) { + /// println!("{string}") + /// } + /// + /// let my_istring = "hello".intern(); + /// // implicit call to Deref::deref + /// foo(&my_istring); + /// ``` #[inline] fn deref(&self) -> &Self::Target { THREAD_LOCAL_READER.with(|reader: &ThreadLocalReader| { @@ -58,10 +106,21 @@ impl Deref for IString { } impl AsRef for IString { + /// Returns a reference to the string's contents. + /// + /// This operation runs in O(1) and is lock-free. 
+ /// + /// # Example + /// ``` + /// use interned_string::Intern; + /// + /// let my_istring = "Hello, World!".intern(); + /// let (hello, world) = my_istring.as_ref().split_at(5); + /// ``` #[inline] fn as_ref(&self) -> &str { - THREAD_LOCAL_READER.with(|reader: &ThreadLocalReader| { - reader.read(self) + THREAD_LOCAL_READER.with(|tl_reader: &ThreadLocalReader| { + tl_reader.read(self) }) } } @@ -69,30 +128,41 @@ impl AsRef for IString { // Common traits impl that can't be derived : Clone, PartialOrd, Debug, Display, Default impl Clone for IString { + /// Returns a copy of the `IString`. + /// + /// This operation runs in O(1) and is lock-free. + #[inline] fn clone(&self) -> Self { - SHARED_STORAGE.retain(self.key); + THREAD_LOCAL_READER.with(|reader: &ThreadLocalReader| { + reader.retain(self.key) + }); Self { key: self.key } } } impl PartialOrd for IString { + #[inline] fn lt(&self, other: &Self) -> bool { self.deref().lt(other.deref()) } + #[inline] fn le(&self, other: &Self) -> bool { self.deref().le(other.deref()) } + #[inline] fn gt(&self, other: &Self) -> bool { self.deref().gt(other.deref()) } + #[inline] fn ge(&self, other: &Self) -> bool { self.deref().ge(other.deref()) } + #[inline] fn partial_cmp(&self, other: &Self) -> Option { self.deref().partial_cmp(other.deref()) } @@ -114,6 +184,8 @@ impl std::fmt::Display for IString { } impl Default for IString { + /// Creates an empty `IString`. + #[inline] fn default() -> Self { Self::from(String::default()) } @@ -126,6 +198,19 @@ pub trait Intern { } impl Intern for String { + /// Intern the given `String` by consuming it. Its allocation is reused. + /// + /// This operation runs in O(N) where N is the `string.len()`. + /// If the string was already interned, this operation is lock-free. + /// Otherwise, a global lock is acquired. 
+ /// + /// # Example + /// + /// ``` + /// use interned_string::Intern; + /// + /// let my_istring = "hello".to_string().intern(); + /// ``` #[inline] fn intern(self) -> IString { IString::from(self) @@ -133,12 +218,41 @@ impl Intern for String { } impl Intern for &str { + /// Intern the given `&str` by cloning its contents. + /// + /// This operation runs in O(N) where N is the `string.len()`. + /// If the string was already interned, this operation is lock-free. + /// Otherwise, a global lock is acquired. + /// + /// # Example + /// + /// ``` + /// use interned_string::Intern; + /// + /// let my_istring = "hello".intern(); + /// ``` #[inline] fn intern(self) -> IString { IString::from(self) } } +// Garbage collection + +impl IString { + /// Immediately frees all the interned strings that are no longer used. + /// + /// Call this function when you wish to immediately reduce memory usage, + /// at the cost of some CPU time. + /// This will acquire a global lock and wait for all readers to finish reading. + /// It's recommended to only call this function when your program has nothing else to do. + /// + /// Using this function is optional. Memory is always eventually freed. 
+ pub fn collect_garbage_now() { + SHARED_STORAGE.writer.lock().unwrap().collect_garbage(); + } +} + #[cfg(feature = "serde")] mod feature_serde { use serde::{de::Visitor, Deserialize, Serialize}; @@ -370,6 +484,7 @@ mod tests { // reset the writer for the next test let mut writer = SHARED_STORAGE.writer.lock().unwrap(); + writer.drain_channel_ops(); writer.write_handle.append(storage::StringStorageOp::DropUnusedStrings); writer.write_handle.publish(); drop(writer); diff --git a/src/storage.rs b/src/storage.rs index 654ee5a..2429597 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -3,57 +3,131 @@ use std::{ collections::HashMap, mem::MaybeUninit, ops::Deref, - sync::Mutex + sync::Mutex, }; use left_right::{Absorb, ReadHandle, WriteHandle}; use once_cell::sync::Lazy; use radix_trie::{Trie, TrieKey}; +use lockfree::channel::{mpsc, RecvErr}; use crate::IString; pub(crate) type IStringKey = u32; pub(crate) enum StringStorageOp { + /// Insert the string in storage with the given key. Insert { key: IStringKey, string: BoxedStr }, + /// Increment the `strong_count` of the stored string with the given key. Retain { key: IStringKey }, - // Note: releasing a string does not immediately free the storage, you have to run DropUnusedStrings as well. + /// Decrement the `strong_count` of the stored string with the given key. Release { key: IStringKey }, + /// Drop (and eventually free) all stored strings that are no longer used. DropUnusedStrings, } +#[derive(Debug)] +enum ChannelOp { + /// Eventually increment the `strong_count` of the stored string with the given key. + Retain { key: IStringKey }, + /// Eventually decrement the `strong_count` of the stored string with the given key. 
+ Release { key: IStringKey }, +} + pub(crate) struct UniqueWriter { pub(crate) write_handle: WriteHandle, next_key: IStringKey, + ops_channel_receiver: mpsc::Receiver, +} + +impl UniqueWriter { + fn do_pending_ops_and_insert(&mut self, string: BoxedStr) -> IStringKey { + // add pending operations + self.drain_channel_ops(); + + // insert + let key = self.next_key; + // TODO: scan the storage for reusable keys when it overflows, instead of panic'ing + self.next_key = self.next_key.checked_add(1).unwrap(); + self.write_handle.append(StringStorageOp::Insert { key, string }); + + // drop what is unused + self.write_handle.append(StringStorageOp::DropUnusedStrings); + + // block until readers are done + self.write_handle.publish(); + return key; + } + + pub(crate) fn drain_channel_ops(&mut self) { + loop { + match self.ops_channel_receiver.recv() { + Ok(operation) => { + match operation { + ChannelOp::Retain { key } => { + self.write_handle.append(StringStorageOp::Retain { key }) + }, + ChannelOp::Release { key } => { + self.write_handle.append(StringStorageOp::Release { key }) + }, + }; + } + Err(RecvErr::NoMessage) => { + // the channel is empty + return; + }, + Err(RecvErr::NoSender) => { + // the sending threads went away + return; + } + } + } + } + + pub(crate) fn collect_garbage(&mut self) { + // add pending operations + self.drain_channel_ops(); + // drop what is unused + self.write_handle.append(StringStorageOp::DropUnusedStrings); + // block until readers are done + self.write_handle.publish(); + } } // Needs to be Sync, so we need to use Mutex pub(crate) struct ConcurrentStringStorage { pub(crate) writer: Mutex, pub(crate) read_handle: Mutex>, + ops_channel_sender: mpsc::Sender } impl ConcurrentStringStorage { fn new() -> Self { let (write_handle, read_handle) = left_right::new::(); + let (sender, receiver) = mpsc::create(); Self { writer: Mutex::new(UniqueWriter { - write_handle: write_handle, + write_handle, next_key: 0, + ops_channel_receiver: receiver, 
}), read_handle: Mutex::new(read_handle), + ops_channel_sender: sender, } } pub(crate) fn insert_or_retain(&self, string: String) -> IStringKey { let boxed: BoxedStr = string.into(); - let found_key: Option = THREAD_LOCAL_READER.with(|reader: &ThreadLocalReader| { - let storage = reader.read_handle.enter().expect("reader is available"); - return storage.trie.get(&boxed).copied(); + let found_key: Option = THREAD_LOCAL_READER.with(|tl_reader: &ThreadLocalReader| { + let storage = tl_reader.read_handle.enter().expect("reader is available"); + if let Some(found_key) = storage.trie.get(&boxed).copied() { + tl_reader.retain(found_key); + return Some(found_key); + } + return None; }); if let Some(key) = found_key { // string is already in storage - self.retain(key); return key; } else { // string is not in storage yet @@ -63,46 +137,36 @@ impl ConcurrentStringStorage { fn insert(&self, string: BoxedStr) -> IStringKey { let mut writer = self.writer.lock().unwrap(); - let key = writer.next_key; - // TODO: scan the storage for reusable keys when it overflows, instead of panic'ing - writer.next_key = writer.next_key.checked_add(1).unwrap(); - writer.write_handle.append(StringStorageOp::Insert { key, string }); - writer.write_handle.append(StringStorageOp::DropUnusedStrings); - writer.write_handle.publish(); - return key; - } - - pub(crate) fn retain(&self, key: IStringKey) { - let mut writer = self.writer.lock().unwrap(); - writer.write_handle.append(StringStorageOp::Retain { key }); - // optimisation: do not publish here - } - - // Not sure if inlining this one is a clear win: - // it's used by IString::drop and will potentially be called from a truckload of places - // in client code and may cause code bloat without giving much of a perf win because - // - it's acquiring a lock, which is expensive anyway - // - WriteHandle::append inlines to a bunch of code so Self::release can be quite big - pub(crate) fn release(&self, istring: &mut IString) { - let mut writer = 
self.writer.lock().unwrap(); - writer.write_handle.append(StringStorageOp::Release { key: istring.key }); - // optimisation: do not publish here + return writer.do_pending_ops_and_insert(string); } } // does not need to be Sync nor Send :-) pub(crate) struct ThreadLocalReader { - read_handle: ReadHandle + read_handle: ReadHandle, + ops_channel_sender: mpsc::Sender, } impl ThreadLocalReader { fn from(css: &ConcurrentStringStorage) -> Self { Self { read_handle: css.read_handle.lock().unwrap().clone(), + ops_channel_sender: css.ops_channel_sender.clone(), } } - #[inline] + pub(crate) fn retain(&self, key: IStringKey) { + self.ops_channel_sender + .send(ChannelOp::Retain { key }) + .expect("the receiver is available"); + } + + pub(crate) fn release(&self, istring: &mut IString) { + self.ops_channel_sender + .send(ChannelOp::Release { key: istring.key }) + .expect("the receiver is available"); + } + pub(crate) fn read<'a>(&self, istring: &'a IString) -> &'a str { let iss = self.read_handle.enter().expect("reader is available"); let stored_string = iss.map.get(&istring.key).expect("a valid IString implies that the storage has it's string contents"); @@ -116,7 +180,10 @@ impl ThreadLocalReader { #[derive(Clone)] pub(crate) struct StoredString { pub(crate) inner: BoxedStr, - strong_count: usize + // Note: can be negative because StringStorageOp::Retain and StringStorageOp::Release + // are not guaranteed to be appended in order. + // When performing StringStorageOp::DropUnusedStrings, it should be >= 0 though. + strong_count: isize } impl StoredString { @@ -287,6 +354,7 @@ impl Absorb for InnerStringStorage { // they are still being aliased by the read map's and the write map's `StoredString`s for string_key in self.strings_to_possibly_free.drain(..) 
{ let stored = self.map.remove(&string_key).unwrap(); + debug_assert!(stored.strong_count >= 0, "after all Retain/Release operations are absorbed, it should not be negative"); // make sure that the string is actually unused if stored.is_droppable() { // remove it from the trie as well @@ -330,6 +398,7 @@ impl Absorb for InnerStringStorage { StringStorageOp::DropUnusedStrings => { for string_key in self.strings_to_possibly_free.drain(..) { let stored = self.map.remove(&string_key).unwrap(); + debug_assert!(stored.strong_count >= 0, "after all Retain/Release operations are absorbed, it should not be negative"); // make sure that the string is actually unused if stored.is_droppable() { // remove it from the trie as well