Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RCORE-2233 Reduce locking for StringInterner lookup and compare methods #7954

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 48 additions & 28 deletions src/realm/string_interner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,20 @@ static std::vector<uint32_t> hash_to_id(Array& node, uint32_t hash, uint8_t hash

enum positions { Pos_Version, Pos_ColKey, Pos_Size, Pos_Compressor, Pos_Data, Pos_Map, Top_Size };
struct StringInterner::DataLeaf {
std::vector<CompressedStringView> m_compressed;
ref_type m_leaf_ref = 0;
bool m_is_loaded = false;
std::vector<CompressedStringView> m_compressed;
std::atomic<bool> m_is_loaded = false;
DataLeaf() {}
DataLeaf(ref_type ref)
: m_leaf_ref(ref)
{
}
DataLeaf(const DataLeaf&& other)
finnschiermer marked this conversation as resolved.
Show resolved Hide resolved
: m_leaf_ref(other.m_leaf_ref)
, m_compressed(other.m_compressed)
, m_is_loaded(other.m_is_loaded.load(std::memory_order_acquire))
{
}
};

StringInterner::StringInterner(Allocator& alloc, Array& parent, ColKey col_key, bool writable)
Expand Down Expand Up @@ -359,7 +365,6 @@ void StringInterner::update_from_parent(bool writable)

void StringInterner::rebuild_internal()
{
std::lock_guard lock(m_mutex);
// release old decompressed strings
for (size_t idx = 0; idx < m_in_memory_strings.size(); ++idx) {
StringID id = m_in_memory_strings[idx];
Expand All @@ -369,7 +374,9 @@ void StringInterner::rebuild_internal()
continue;
}
if (auto& w = m_decompressed_strings[id - 1].m_weight) {
w >>= 1;
auto val = w.load(std::memory_order_acquire);
val = val >> 1;
w.store(val, std::memory_order_release);
}
else {
m_decompressed_strings[id - 1].m_decompressed.reset();
Expand All @@ -389,7 +396,7 @@ void StringInterner::rebuild_internal()
for (size_t idx = 0; idx < m_compressed_leafs.size(); ++idx) {
auto ref = m_data.get_as_ref(idx);
auto& leaf_meta = m_compressed_leafs[idx];
leaf_meta.m_is_loaded = false;
leaf_meta.m_is_loaded.store(false, std::memory_order_release);
leaf_meta.m_compressed.clear();
leaf_meta.m_leaf_ref = ref;
}
Expand All @@ -400,7 +407,6 @@ StringInterner::~StringInterner() {}
StringID StringInterner::intern(StringData sd)
{
REALM_ASSERT(m_top.is_attached());
std::lock_guard lock(m_mutex);
// special case for null string
if (sd.data() == nullptr)
return 0;
Expand All @@ -414,15 +420,14 @@ StringID StringInterner::intern(StringData sd)
// it's a new string
bool learn = true;
auto c_str = m_compressor->compress(sd, learn);
m_decompressed_strings.push_back({64, std::make_unique<std::string>(sd)});
m_decompressed_strings.emplace_back(64, std::make_unique<std::string>(sd));
auto id = m_decompressed_strings.size();
m_in_memory_strings.push_back(id);
add_to_hash_map(m_hash_map, h, id, 32);
size_t index = (size_t)m_top.get_as_ref_or_tagged(Pos_Size).get_as_int();
REALM_ASSERT_DEBUG(index == id - 1);
bool need_long_string_node = c_str.size() >= 65536;

// TODO: update_internal must set up m_current_long_string_node if it is in use
if (need_long_string_node && !m_current_long_string_node.is_attached()) {

m_current_long_string_node.create(NodeHeader::type_HasRefs);
Expand Down Expand Up @@ -458,7 +463,7 @@ StringID StringInterner::intern(StringData sd)
}
m_current_string_leaf.destroy();
// force later reload of leaf
m_compressed_leafs.back().m_is_loaded = false;
m_compressed_leafs.back().m_is_loaded.store(false, std::memory_order_release);
}
}
if (m_current_long_string_node.is_attached()) {
Expand Down Expand Up @@ -527,7 +532,7 @@ StringID StringInterner::intern(StringData sd)

bool StringInterner::load_leaf_if_needed(DataLeaf& leaf)
{
if (!leaf.m_is_loaded) {
if (!leaf.m_is_loaded.load(std::memory_order_relaxed)) {
// start with an empty leaf:
leaf.m_compressed.clear();
leaf.m_compressed.reserve(256);
Expand Down Expand Up @@ -568,7 +573,7 @@ bool StringInterner::load_leaf_if_needed(DataLeaf& leaf)
leaf.m_compressed.push_back({c, str_array.size()});
}
}
leaf.m_is_loaded = true;
leaf.m_is_loaded.store(true, std::memory_order_release);
return true;
}
return false;
Expand All @@ -585,14 +590,25 @@ bool StringInterner::load_leaf_if_new_ref(DataLeaf& leaf, ref_type new_ref)
return load_leaf_if_needed(leaf);
}

CompressedStringView& StringInterner::get_compressed(StringID id)
CompressedStringView& StringInterner::get_compressed(StringID id, bool lock_if_mutating)
{
auto index = id - 1; // 0 represents null
auto hi = index >> 8;
auto lo = index & 0xFFUL;

// This is an instance of the "double checked locking" idiom, chosen to minimize
// locking in the common case of the leaf already being fully initialized.
DataLeaf& leaf = m_compressed_leafs[hi];
load_leaf_if_needed(leaf);
if (leaf.m_is_loaded.load(std::memory_order_acquire)) {
return leaf.m_compressed[lo];
}
if (lock_if_mutating) {
std::lock_guard lock(m_mutex);
load_leaf_if_needed(leaf);
}
else {
load_leaf_if_needed(leaf);
}
REALM_ASSERT_DEBUG(lo < leaf.m_compressed.size());
return leaf.m_compressed[lo];
}
Expand All @@ -603,13 +619,12 @@ std::optional<StringID> StringInterner::lookup(StringData sd)
// "dead" mode
return {};
}
std::lock_guard lock(m_mutex);
if (sd.data() == nullptr)
return 0;
uint32_t h = (uint32_t)sd.hash();
auto candidates = hash_to_id(m_hash_map, h, 32);
for (auto& candidate : candidates) {
auto candidate_cpr = get_compressed(candidate);
auto candidate_cpr = get_compressed(candidate, true);
if (m_compressor->compare(sd, candidate_cpr) == 0)
return candidate;
}
Expand All @@ -618,10 +633,6 @@ std::optional<StringID> StringInterner::lookup(StringData sd)

int StringInterner::compare(StringID A, StringID B)
{
std::lock_guard lock(m_mutex);
// 0 is null, the first index starts from 1.
REALM_ASSERT_DEBUG(A <= m_decompressed_strings.size());
REALM_ASSERT_DEBUG(B <= m_decompressed_strings.size());
// comparisons against null
if (A == B && A == 0)
return 0;
Expand All @@ -630,14 +641,15 @@ int StringInterner::compare(StringID A, StringID B)
if (B == 0)
return 1;
// ok, no nulls.
// StringID 0 is null, the first true index starts from 1.
REALM_ASSERT_DEBUG(A <= m_decompressed_strings.size());
REALM_ASSERT_DEBUG(B <= m_decompressed_strings.size());
REALM_ASSERT(m_compressor);
return m_compressor->compare(get_compressed(A), get_compressed(B));
return m_compressor->compare(get_compressed(A, true), get_compressed(B, true));
}

int StringInterner::compare(StringData s, StringID A)
{
std::lock_guard lock(m_mutex);
REALM_ASSERT_DEBUG(A <= m_decompressed_strings.size());
// comparisons against null
if (s.data() == nullptr && A == 0)
return 0;
Expand All @@ -646,27 +658,35 @@ int StringInterner::compare(StringData s, StringID A)
if (A == 0)
return 1;
// ok, no nulls
REALM_ASSERT_DEBUG(A <= m_decompressed_strings.size());
REALM_ASSERT(m_compressor);
return m_compressor->compare(s, get_compressed(A));
return m_compressor->compare(s, get_compressed(A, true));
}


StringData StringInterner::get(StringID id)
{
REALM_ASSERT(m_compressor);
std::lock_guard lock(m_mutex);
if (id == 0)
return StringData{nullptr};
REALM_ASSERT_DEBUG(id <= m_decompressed_strings.size());

// Avoid taking a lock in the (presumably) common case, where the string has already been
// decompressed and cached. Such cached strings have "m_weight" > 0.
CachedString& cs = m_decompressed_strings[id - 1];
if (cs.m_decompressed) {
if (cs.m_weight < 128)
cs.m_weight += 64;
if (auto weight = cs.m_weight.load(std::memory_order_acquire)) {
REALM_ASSERT_DEBUG(cs.m_decompressed);
if (weight < 128) {
// Ignore it if this fails - that happens if some other thread bumps the value
// concurrently. If so, we can live with losing our own "bump".
cs.m_weight.compare_exchange_strong(weight, weight + 64, std::memory_order_acq_rel);
}
return {cs.m_decompressed->c_str(), cs.m_decompressed->size()};
}
cs.m_weight = 64;
std::lock_guard lock(m_mutex);
cs.m_decompressed = std::make_unique<std::string>(m_compressor->decompress(get_compressed(id)));
m_in_memory_strings.push_back(id);
cs.m_weight.store(64, std::memory_order_release);
return {cs.m_decompressed->c_str(), cs.m_decompressed->size()};
}

Expand Down
32 changes: 27 additions & 5 deletions src/realm/string_interner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,40 @@ namespace realm {
class StringCompressor;

struct CachedString {
uint8_t m_weight = 0;
std::atomic<uint8_t> m_weight = 0;
std::unique_ptr<std::string> m_decompressed;
CachedString() {}
CachedString(CachedString&& other)
{
m_decompressed = std::move(other.m_decompressed);
m_weight.store(other.m_weight.load(std::memory_order_relaxed), std::memory_order_relaxed);
}
CachedString(uint8_t init_weight, std::unique_ptr<std::string>&& ptr)
{
m_decompressed = std::move(ptr);
m_weight.store(init_weight, std::memory_order_relaxed);
}
};

class StringInterner {
public:
// To be used exclusively from Table
// Use of the StringInterner must honour the restrictions on concurrency given
// below. Currently this is ensured by only using concurrent access on frozen
// objects.
//
// Limitations wrt concurrency:
//
// To be used exclusively from Table and in a non-concurrent setting:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to have a counter that is increased/decreased when calling the non-thread-safe operations? Then we could have assertions requiring that the counter is 1 inside those functions and 0 in the thread-safe functions below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For such a counter to be correctly shared among threads (and thus be useful for asserting), it would have to be an atomic, which carries some overhead. I'm not convinced it is worth it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could add it in debug mode only?

StringInterner(Allocator& alloc, Array& parent, ColKey col_key, bool writable);
void update_from_parent(bool writable);
~StringInterner();

// To be used from Obj and for searching
// To be used from Obj within a write transaction or during commit.
// To be used only in a non-concurrent setting:
StringID intern(StringData);

// The following four methods can be used in a concurrent setting with each other,
// but not concurrently with any of the above methods.
std::optional<StringID> lookup(StringData);
int compare(StringID A, StringID B);
int compare(StringData, StringID A);
Expand All @@ -74,7 +95,7 @@ class StringInterner {
// when ever we meet a string too large to be placed inline.
Array m_current_long_string_node;
void rebuild_internal();
CompressedStringView& get_compressed(StringID id);
CompressedStringView& get_compressed(StringID id, bool lock_if_mutating = false);
// return true if the leaf was reloaded
bool load_leaf_if_needed(DataLeaf& leaf);
// return 'true' if the new ref was different and forced a reload
Expand All @@ -87,7 +108,8 @@ class StringInterner {
std::vector<CachedString> m_decompressed_strings;
std::vector<StringID> m_in_memory_strings;
// Mutual exclusion is needed for frozen transactions only. Live objects are
// only used in single threaded contexts so don't need them. For now, just use always.
// only used in single-threaded contexts so don't need it. For now, we don't
// distinguish, assuming that the locking overhead is sufficiently low in both scenarios.
std::mutex m_mutex;
};
} // namespace realm
Expand Down
Loading