Skip to content

Commit

Permalink
Merge pull request #126 from GraphStreamingProject/range_merge
Browse files Browse the repository at this point in the history
Supernode: range merge function
  • Loading branch information
etwest authored Apr 3, 2023
2 parents 26b65e0 + 9436ddd commit bd8cddd
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
15 changes: 13 additions & 2 deletions include/supernode.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class Supernode {

private:
size_t num_sketches;
size_t merged_sketches; // This variable tells us which sketches are good for queries post merge
size_t sketch_size;

/* collection of logn sketches to query from, since we can't query from one
Expand Down Expand Up @@ -112,10 +113,10 @@ class Supernode {
static int get_max_sketches() { return max_sketches; };

// get number of samples remaining in the Supernode
int samples_remaining() { return num_sketches - sample_idx; }
int samples_remaining() { return merged_sketches - sample_idx; }

inline bool out_of_queries() {
return sample_idx >= num_sketches;
return sample_idx >= merged_sketches;
}

inline int curr_idx() {
Expand Down Expand Up @@ -158,6 +159,16 @@ class Supernode {
*/
void merge(Supernode& other);

/**
* In-place range merge function. Updates the caller Supernode.
* The range merge only merges some of the Sketches
* This function should only be used if you know what you're doing
* @param other Supernode to merge into caller
* @param start_idx Index of first Sketch to merge
* @param num_merge How many sketches to merge
*/
void range_merge(Supernode& other, size_t start_idx, size_t num_merge);

/**
* Insert or delete an (encoded) edge into the supernode. Guaranteed to be
* processed BEFORE Boruvka starts.
Expand Down
27 changes: 19 additions & 8 deletions src/supernode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ size_t Supernode::bytes_size;
size_t Supernode::serialized_size;

Supernode::Supernode(uint64_t n, uint64_t seed): sample_idx(0),
n(n), seed(seed), num_sketches(max_sketches), sketch_size(Sketch::sketchSizeof()) {
n(n), seed(seed), num_sketches(max_sketches),
merged_sketches(max_sketches), sketch_size(Sketch::sketchSizeof()) {

size_t sketch_width = Sketch::column_gen(Sketch::get_failure_factor());
// generate num_sketches sketches for each supernode (read: node)
Expand Down Expand Up @@ -40,6 +41,7 @@ Supernode::Supernode(uint64_t n, uint64_t seed, std::istream &binary_in) :
}
// sample in range [beg, beg + num)
num_sketches = beg + num;
merged_sketches = num_sketches;
sample_idx = beg;

// create empty sketches, if any
Expand All @@ -59,8 +61,9 @@ Supernode::Supernode(uint64_t n, uint64_t seed, std::istream &binary_in) :
}
}

Supernode::Supernode(const Supernode& s) : sample_idx(s.sample_idx), n(s.n),
seed(s.seed), num_sketches(s.num_sketches), sketch_size(s.sketch_size) {
Supernode::Supernode(const Supernode& s) :
sample_idx(s.sample_idx), n(s.n), seed(s.seed), num_sketches(s.num_sketches),
merged_sketches(s.merged_sketches), sketch_size(s.sketch_size) {
for (size_t i = 0; i < num_sketches; ++i) {
Sketch::makeSketch(get_sketch(i), *s.get_sketch(i));
}
Expand All @@ -82,7 +85,7 @@ Supernode::~Supernode() {
}

std::pair<Edge, SampleSketchRet> Supernode::sample() {
if (sample_idx == num_sketches) throw OutOfQueriesException();
if (out_of_queries()) throw OutOfQueriesException();

std::pair<vec_t, SampleSketchRet> query_ret = get_sketch(sample_idx++)->query();
vec_t non_zero = query_ret.first;
Expand All @@ -91,7 +94,7 @@ std::pair<Edge, SampleSketchRet> Supernode::sample() {
}

std::pair<std::vector<Edge>, SampleSketchRet> Supernode::exhaustive_sample() {
if (sample_idx == num_sketches) throw OutOfQueriesException();
if (out_of_queries()) throw OutOfQueriesException();

std::pair<std::vector<vec_t>, SampleSketchRet> query_ret = get_sketch(sample_idx++)->exhaustive_query();
std::vector<Edge> edges(query_ret.first.size());
Expand All @@ -104,10 +107,18 @@ std::pair<std::vector<Edge>, SampleSketchRet> Supernode::exhaustive_sample() {

void Supernode::merge(Supernode &other) {
sample_idx = std::max(sample_idx, other.sample_idx);
num_sketches = std::min(num_sketches, other.num_sketches);
for (size_t i=sample_idx;i<num_sketches;++i) {
merged_sketches = std::min(merged_sketches, other.merged_sketches);
for (size_t i = sample_idx; i < merged_sketches; ++i)
(*get_sketch(i))+=(*other.get_sketch(i));
}

void Supernode::range_merge(Supernode& other, size_t start_idx, size_t num_merge) {
sample_idx = std::max(sample_idx, other.sample_idx);
// we trust the caller so whatever they tell us goes here
// hopefully if the caller is incorrect then this will be caught by out_of_queries()
merged_sketches = start_idx + num_merge;
for (size_t i = sample_idx; i < merged_sketches; i++)
(*get_sketch(i))+=(*other.get_sketch(i));
}
}

void Supernode::update(vec_t upd) {
Expand Down

0 comments on commit bd8cddd

Please sign in to comment.