Skip to content

Commit

Permalink
fixed bugs with buffer compaction tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
Gillgamesh committed Nov 7, 2024
1 parent 798b453 commit 2aae309
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
2 changes: 2 additions & 0 deletions include/bucket_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class BucketBuffer {
* If the buffer is still full after compaction, it will return false.
*/
bool insert(int col_idx, int row_idx, Bucket value) {
_compacted = false;
entries.emplace_back(BufferEntry({value, col_idx, row_idx}));
if (over_capacity()) {
sort_and_compact();
Expand Down Expand Up @@ -175,6 +176,7 @@ class BucketBuffer {
// std::cout << "Merging buffers of size " << size() << " and " << other.size() << std::endl;
assert(size() + other.size() <= _capacity);
entries.insert(entries.end(), other.entries.begin(), other.entries.end());
_compacted = false;
// TODO - use a more efficient sorting procedure
if (over_capacity()) {
sort_and_compact();
Expand Down
4 changes: 2 additions & 2 deletions include/sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ class Sketch {
* @return The length of the vector to sketch
*/
static vec_t calc_vector_length(node_id_t num_vertices) {
// return ceil(double(num_vertices) * (num_vertices - 1) / 2);
return ceil(double(num_vertices) * (num_vertices - 1) / 2);
// return num_vertices * 4;
return 50; // round to something thats approx 2^6
// return 50; // round to something thats approx 2^6
// return 1 + num_vertices / 16 ;
}

Expand Down
23 changes: 17 additions & 6 deletions src/sketch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ Sketch::~Sketch() {
}

void Sketch::reallocate(size_t new_num_rows) {
assert(new_num_rows > bkt_per_col);
uint8_t max_depth = 0;
for (size_t i = 0; i < num_columns; ++i) {
max_depth = std::max(max_depth, effective_size(i));
}
assert(new_num_rows >= max_depth);
size_t old_bucket_array_bytes = bucket_array_bytes();
size_t old_buckets_main = num_buckets - 1;
size_t old_num_rows = bkt_per_col;
Expand All @@ -157,12 +161,14 @@ Sketch::~Sketch() {
Bucket *new_buckets = (Bucket*) new char[bucket_array_bytes()];
std::memset(new_buckets, 0, bucket_array_bytes());
#ifdef ROW_MAJOR_SKETCHES
// TODO - implement to allow shrinkage
std::memcpy(new_buckets, buckets, old_bucket_array_bytes);
assert(false)
#else
for (size_t i = 0; i < num_columns; ++i) {
Bucket *old_column = buckets + (i * old_num_rows);
Bucket *new_column = new_buckets + (i * new_num_rows);
std::memcpy(new_column, old_column, old_num_rows * sizeof(Bucket));
std::memcpy(new_column, old_column, std::min(new_num_rows, old_num_rows) * sizeof(Bucket));
}
new_buckets[num_buckets - 1] = buckets[old_buckets_main];
#endif
Expand Down Expand Up @@ -255,7 +261,8 @@ void Sketch::update(const vec_t update_idx) {
while (!sufficient_space) {
// TODO - magical number
// std::cout << "Buffer full, reallocating" << std::endl;
reallocate((bkt_per_col * 8) / 5);
// reallocate((bkt_per_col * 8) / 5);
reallocate(bkt_per_col + 3);
// std::cout << "and now injecting" << std::endl;
inject_buffer_buckets();
// bucket_buffer.insert(i, depth, {update_idx, checksum});
Expand All @@ -272,6 +279,8 @@ void Sketch::zero_contents() {
buckets[i].gamma = 0;
}
reset_sample_state();
// TODO - dont do this. Or figure out a way to make it configurable
reallocate(5);
bucket_buffer.clear();
}

Expand Down Expand Up @@ -383,8 +392,9 @@ void Sketch::merge(const Sketch &other) {
// ie we would want to deal with some depths seperately.
bool sufficient_space = bucket_buffer.merge(other.bucket_buffer);
while (!sufficient_space) {
std::cout << "Merge: Buffer full, reallocating" << std::endl;
reallocate((bkt_per_col * 8) / 5);
// std::cout << "Merge: Buffer full, reallocating" << std::endl;
// reallocate((bkt_per_col * 8) / 5);
reallocate(bkt_per_col + 3);
inject_buffer_buckets();
sufficient_space = !bucket_buffer.over_capacity();
}
Expand Down Expand Up @@ -476,7 +486,8 @@ void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samp
bool sufficient_space = bucket_buffer.merge(other.bucket_buffer);
while (!sufficient_space) {
// std::cout << "Merge: Buffer full, reallocating" << std::endl;
reallocate((bkt_per_col * 8) / 5);
// reallocate((bkt_per_col * 8) / 5);
reallocate(bkt_per_col + 3);
inject_buffer_buckets();
sufficient_space = !bucket_buffer.over_capacity();
}
Expand Down

0 comments on commit 2aae309

Please sign in to comment.