Skip to content

Commit

Permalink
Performant but incorrect bottom-hashing attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
Gillgamesh committed Sep 6, 2024
1 parent 65da85a commit 16f89d9
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 66 deletions.
42 changes: 20 additions & 22 deletions include/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <vector>
#include <xxhash.h>
#include <iostream>
#include <bitset>
#include "types.h"

#pragma pack(push,1)
Expand Down Expand Up @@ -46,26 +47,28 @@ namespace Bucket_Boruvka {
const long num_columns,
const vec_hash_t max_depth
) {
if (num_columns == 0) return;
XXH128_hash_t *hashes = (XXH128_hash_t*) depths_buffer;
#pragma omp simd
for (int col = 0; col < num_columns -4; col+=4) {
for (int col = 0; col <= num_columns -4; col+=4) {
auto hash = XXH3_128bits_withSeed(&update_idx, sizeof(vec_t), seed + 5 * (col / 4) );
hashes[col / 4] = hash;
}
for (int col = 0; col< num_columns - 4; col+=4) {
for (int col = 0; col < num_columns - 4; col+=4) {
auto hash = hashes[col / 4];
// auto hash = XXH3_128bits_withSeed(&update_idx, sizeof(vec_t), seed + 5 * (col / 4) );
depths_buffer[col] = (uint32_t) (hash.low64 >> 32);
depths_buffer[col+1] = (uint32_t) (hash.low64 & 0xFFFFFFFF);
depths_buffer[col+2] = (uint32_t) (hash.high64 >> 32);
depths_buffer[col+3] = (uint32_t) (hash.high64 & 0xFFFFFFFF);
}
for (int col = num_columns - (num_columns % 4); col < num_columns; col++) {
depths_buffer[col] = get_index_depth(update_idx, seed, col, max_depth);
int col=0;
for (; col < num_columns -4; col++) {
depths_buffer[col] |= (uint32_t) (1ull << max_depth); // assert not > max_depth by ORing
depths_buffer[col] = (uint32_t) (__builtin_ctzll(depths_buffer[col]));
}
for (int col = 0; col < num_columns; col++) {
depths_buffer[col] |= (1ull << max_depth); // assert not > max_depth by ORing
depths_buffer[col] = __builtin_ctzll(depths_buffer[col]);
col-= 1;
for (; col< num_columns; col++) {
depths_buffer[col] = (uint32_t) get_index_depth(update_idx, seed, col, max_depth);
}
}

Expand Down Expand Up @@ -107,22 +110,17 @@ inline col_hash_t Bucket_Boruvka::get_index_depth(const vec_t update_idx, const
// auto hash = XXH3_128bits_withSeed(&update_idx, sizeof(vec_t), seed + 5 * (col) );
col_hash_t depth_hash = 0;
int offset = col % 4;
switch (offset) {
case 0:
depth_hash = (uint32_t) (hash.low64 >> 32);
break;
case 1:
depth_hash = (uint32_t) (hash.low64 & 0xFFFFFFFF);
break;
case 2:
depth_hash = (uint32_t) (hash.high64 >> 32);
break;
case 3:
depth_hash = (uint32_t) (hash.high64 & 0xFFFFFFFF);
break;
if (offset == 0) {
depth_hash = (uint32_t) (hash.low64 >> 32);
} else if (offset == 1) {
depth_hash = (uint32_t) (hash.low64 & 0xFFFFFFFF);
} else if (offset == 2) {
depth_hash = (uint32_t) (hash.high64 >> 32);
} else if (offset == 3) {
depth_hash = (uint32_t) (hash.high64 & 0xFFFFFFFF);
}
// std::cout << "hash " << hash.low64 << " " << hash.high64 << " " << depth_hash << std::endl;
// col_hash_t depth_hash = hash.low64;
// std::cout << "depth_hash: " << std::bitset<32>(depth_hash) << std::endl;
depth_hash |= (1ull << max_depth); // assert not > max_depth by ORing
return __builtin_ctzll(depth_hash);
}
Expand Down
37 changes: 20 additions & 17 deletions include/sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ class Sketch {
public:
size_t num_columns; // Total number of columns. (product of above 2)
size_t bkt_per_col; // number of buckets per column
std::function<std::vector<vec_t>()> get_evicted_fn = [this](){
std::vector<vec_t> ret;
for (auto it = bucket_map.begin(); it != bucket_map.end(); it++) {
if (it->second % 2 == 1) {
ret.push_back(it->first);
}
}
return ret;
};
// PER BUCKET
std::function<void(vec_t)> evict_fn = [this](vec_t update){
  // interface: update is the index that's being pushed,
  // operator[] value-initializes a missing count to 0, so one lookup
  // both inserts and increments; the previous emplace-then-[] pattern
  // performed two hash lookups for the same effect.
  ++bucket_map[update];
};
private:

// TODO - decringe this
Expand Down Expand Up @@ -70,22 +86,7 @@ class Sketch {
// // std::equal_to<vec_t>(), // equal function for keys
// // std::allocator<std::pair<const vec_t, size_t>>(), // allocator for the map
// );
std::unordered_map<vec_t, bool> bucket_map;
// PER BUCKET
std::function<void(vec_t)> evict_fn = [this](vec_t update){
// interface: update is the index that's being pushed,
bucket_map.emplace(update, 0);
bucket_map[update] ^= 1;
};
std::function<std::vector<vec_t>()> get_evicted_fn = [this](){
std::vector<vec_t> ret;
for (auto it = bucket_map.begin(); it != bucket_map.end(); it++) {
if (it->second == 1) {
ret.push_back(it->first);
}
}
return ret;
};
std::unordered_map<vec_t, int> bucket_map;

// flags

Expand Down Expand Up @@ -113,7 +114,9 @@ class Sketch {
* @return The length of the vector to sketch
*/
// NOTE(review): the canonical length n*(n-1)/2 (number of possible
// edges) is commented out below; this experimental override returns
// num_vertices / 2 instead — presumably to force updates past the
// sketch depth and exercise the eviction path. TODO confirm intent
// before relying on this value.
static vec_t calc_vector_length(node_id_t num_vertices) {
// return ceil(double(num_vertices) * (num_vertices - 1) / 2);
// return num_vertices * 2;
return num_vertices / 2;
}

/**
Expand Down
27 changes: 26 additions & 1 deletion src/cc_sketch_alg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,32 @@ inline bool CCSketchAlg::run_round_zero() {
for (node_id_t i = 0; i < num_vertices; i++) {
try {
// num_query += 1;
if (sample_supernode(*sketches[i]) && !modified) modified = true;
// first, sample the merge pool.
auto skt = *sketches[i];
std::vector<vec_t> evicted = skt.get_evicted_fn();
if (evicted.size() != 0) {
std::cout << "THI IS AN EVICTION NOTICE " << evicted.size() << std::endl;
for (auto ev : evicted) {
Edge e = inv_concat_pairing_fn(ev);
auto src = std::min(e.src, e.dst);
auto dst = std::max(e.src, e.dst);
DSUMergeRet<node_id_t> m_ret = dsu.merge(src, dst);
if (m_ret.merged) {
std::cout <<"GANG";
#ifdef VERIFY_SAMPLES_F
verifier->verify_edge(e);
#endif
modified = true;
// Update spanning forest
{
std::lock_guard<std::mutex> lk(spanning_forest_mtx[src]);
spanning_forest[src].insert(dst);
}
}
}
}
if (sample_supernode(*sketches[i]) && !modified)
modified = true;
} catch (...) {
except = true;
#pragma omp critical
Expand Down
73 changes: 47 additions & 26 deletions src/sketch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ Sketch::Sketch(const Sketch &s) : seed(s.seed) {
num_columns = s.num_columns;
bkt_per_col = s.bkt_per_col;
num_buckets = s.num_buckets;
bucket_map = s.bucket_map;
buckets = (Bucket*) new char[bucket_array_bytes()];
// buckets = new Bucket[num_buckets];

Expand Down Expand Up @@ -166,27 +167,26 @@ void Sketch::update(const vec_t update_idx) {
void Sketch::update(const vec_t update_idx) {
vec_hash_t checksum = Bucket_Boruvka::get_index_hash(update_idx, checksum_seed());

// Update depth 0 bucket
Bucket_Boruvka::update(get_deterministic_bucket(), update_idx, checksum);

// calculate all depths:
Bucket_Boruvka::get_all_index_depths(
update_idx, depth_buffer, get_seed(), num_columns, bkt_per_col + 1
);
uint32_t max_depth = 0;
for (size_t i = 0; i < num_columns; i++) {
for (size_t i = 0; i < num_columns; ++i) {
max_depth = std::max(max_depth, depth_buffer[i]);
// std::cout << "depth " << i << ": " << depth_buffer[i] << std::endl;
}
unlikely_if (max_depth >= bkt_per_col) {
// likely_if (1) {
// std::cout << "evicting " << update_idx << " with depth " << max_depth << "/ " << bkt_per_col << std::endl;
// evict_fn(update_idx);
// return;
evict_fn(update_idx);
return;
}
// Update depth 0 bucket
Bucket_Boruvka::update(get_deterministic_bucket(), update_idx, checksum);
for (unsigned i = 0; i < num_columns; ++i) {
col_hash_t depth = depth_buffer[i];
// col_hash_t depth = Bucket_Boruvka::get_index_depth(
// update_idx, get_seed(), i, bkt_per_col
// );
Bucket &bucket = get_bucket(i, depth);
likely_if(depth < bkt_per_col) {
Bucket_Boruvka::update(bucket, update_idx, checksum);
Expand All @@ -208,17 +208,10 @@ void Sketch::zero_contents() {
buckets[i].gamma = 0;
}
reset_sample_state();
bucket_map.clear();
}

SketchSample Sketch::sample() {
// first, try to sample from the table:

// std::vector<vec_t> full_samples = get_evicted_fn();
// if (full_samples.size() > 0) {
// std::cout << "Found " << full_samples.size() << " samples" << std::endl;
// evict_fn(full_samples[0]);
// return {full_samples[0], GOOD};
// }

if (sample_idx >= num_samples) {
throw OutOfSamplesException(seed, num_samples, sample_idx);
Expand All @@ -227,8 +220,20 @@ SketchSample Sketch::sample() {
size_t idx = sample_idx++;
size_t first_column = idx * cols_per_sample;

if (Bucket_Boruvka::is_empty(get_deterministic_bucket()))
if (Bucket_Boruvka::is_empty(get_deterministic_bucket())) {
// ONLY if we're out of our own samples, do we try the evicted
// std::vector<vec_t> full_samples = get_evicted_fn();
// if (full_samples.size() > 0)
// {
// std::cout << "Found " << full_samples.size() << " samples during round " << sample_idx << std::endl;
// evict_fn(full_samples[0]);
// SketchSample sample = {full_samples[0], GOOD};
// return sample;
// // full_samples = get_evicted_fn();
// // std::cout << "Now has" << full_samples.size() << " samples" << std::endl;
// }
return {0, ZERO}; // the "first" bucket is deterministic so if all zero then no edges to return
}

if (Bucket_Boruvka::is_good(get_deterministic_bucket(), checksum_seed()))
return {get_deterministic_bucket().alpha, GOOD};
Expand All @@ -245,6 +250,17 @@ SketchSample Sketch::sample() {
return {bucket.alpha, GOOD};
}
}
// no success? nows a good time to also try getting an eviction sample
// std::vector<vec_t> full_samples = get_evicted_fn();
// if (full_samples.size() > 0)
// {
// std::cout << "Found " << full_samples.size() << " samples during round " << sample_idx << std::endl;
// evict_fn(full_samples[0]);
// SketchSample sample = {full_samples[0], GOOD};
// return sample;
// // full_samples = get_evicted_fn();
// // std::cout << "Now has" << full_samples.size() << " samples" << std::endl;
// }
return {0, FAIL};
}

Expand Down Expand Up @@ -308,8 +324,11 @@ void Sketch::merge(const Sketch &other) {
deterministic_bucket.gamma ^= other.get_deterministic_bucket().gamma;

for (auto it = other.bucket_map.begin(); it != other.bucket_map.end(); it++) {
bucket_map.emplace(it->first, 0);
bucket_map[it->first] ^= it->second;
evict_fn(it->first);
// std::cout << bucket_map.size() << std::endl;
// for (auto &it : bucket_map) {
// // std::cout << it.first << " " << it.second << std::endl;
// }
}
}

Expand Down Expand Up @@ -353,13 +372,15 @@ void Sketch::recalculate_flags(size_t col_idx, size_t start_idx, size_t end_idx)


void Sketch::range_merge(const Sketch &other, size_t start_sample, size_t n_samples) {
for (auto it = other.bucket_map.begin(); it != other.bucket_map.end(); it++) {
// if (it->second) {
// evict_fn(it->first);
// }
bucket_map.emplace(it->first, 0);
bucket_map[it->first] ^= it->second;
}
// WE CANNOT RANGE MERGE THESE! ! ! ! ! ! ! ! ! !
// the eviction pool should be immutable
// for (auto it = other.bucket_map.begin(); it != other.bucket_map.end(); it++) {
// // if (it->second) {
// // evict_fn(it->first);
// // }
// bucket_map.emplace(it->first, 0);
// bucket_map[it->first] ^= it->second;
// }
if (start_sample + n_samples > num_samples) {
assert(false);
sample_idx = num_samples; // sketch is in a fail state!
Expand Down

0 comments on commit 16f89d9

Please sign in to comment.