Skip to content

Commit

Permalink
Merge pull request #125 from GraphStreamingProject/better_serialize
Browse files Browse the repository at this point in the history
Better Serialize and 3 Columns
  • Loading branch information
etwest authored Mar 16, 2023
2 parents 28e23eb + a7cd494 commit 26b65e0
Show file tree
Hide file tree
Showing 13 changed files with 583 additions and 138 deletions.
2 changes: 1 addition & 1 deletion include/l0_sampling/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ inline col_hash_t Bucket_Boruvka::get_index_depth(const vec_t update_idx, const
const vec_hash_t max_depth) {
col_hash_t depth_hash = col_hash(&update_idx, sizeof(vec_t), seed_and_col);
depth_hash |= (1ull << max_depth); // assert not > max_depth by ORing
return __builtin_ctzl(depth_hash);
return __builtin_ctzll(depth_hash);
}

inline vec_hash_t Bucket_Boruvka::get_index_hash(const vec_t update_idx, const long sketch_seed) {
Expand Down
44 changes: 32 additions & 12 deletions include/l0_sampling/sketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
#include "../util.h"
#include "bucket.h"

// max number of non-zeroes in vector is n/2*n/2=n^2/4
#define guess_gen(x) double_to_ull(log2(x) - 2)
#define bucket_gen(d) double_to_ull((log2(d) + 1))

enum SampleSketchRet {
GOOD, // querying this sketch returned a single non-zero value
ZERO, // querying this sketch returned that there are no non-zero values
Expand All @@ -34,7 +30,7 @@ class Sketch {
static vec_t failure_factor; // Pr(failure) = 1 / factor. Determines number of columns in sketch.
static vec_t n; // Length of the vector this is sketching.
static size_t num_elems; // length of our actual arrays in number of elements
static size_t num_buckets; // Portion of array length, number of buckets
static size_t num_columns; // Portion of array length, number of columns
static size_t num_guesses; // Portion of array length, number of guesses

// Seed used for hashing operations in this sketch.
Expand All @@ -52,14 +48,14 @@ class Sketch {
FRIEND_TEST(EXPR_Parallelism, N10kU100k);

// Buckets of this sketch.
// Length is bucket_gen(failure_factor) * guess_gen(n).
// Length is column_gen(failure_factor) * guess_gen(n).
// For buckets[i * guess_gen(n) + j], the bucket has a 1/2^j probability
// of containing an index. The first two are pointers into the buckets array.
alignas(vec_t) char buckets[];

// private constructors -- use makeSketch
Sketch(uint64_t seed);
Sketch(uint64_t seed, std::istream& binary_in);
Sketch(uint64_t seed, std::istream& binary_in, bool sparse);
Sketch(const Sketch& s);

public:
Expand All @@ -73,7 +69,7 @@ class Sketch {
* @return A pointer to a newly constructed sketch
*/
static Sketch* makeSketch(void* loc, uint64_t seed);
static Sketch* makeSketch(void* loc, uint64_t seed, std::istream& binary_in);
static Sketch* makeSketch(void* loc, uint64_t seed, std::istream& binary_in, bool sparse=false);

/**
* Copy constructor to create a sketch from another
Expand All @@ -92,9 +88,9 @@ class Sketch {
inline static void configure(vec_t _n, vec_t _factor) {
n = _n;
failure_factor = _factor;
num_buckets = bucket_gen(failure_factor);
num_columns = column_gen(failure_factor);
num_guesses = guess_gen(n);
num_elems = num_buckets * num_guesses + 1; // +1 for zero bucket optimization
num_elems = num_columns * num_guesses + 1; // +1 for zero bucket optimization
}

inline static size_t sketchSizeof() {
Expand All @@ -110,6 +106,8 @@ class Sketch {

inline void reset_queried() { already_queried = false; }

inline static size_t get_columns() { return num_columns; }

/**
* Update a sketch based on information about one of its indices.
* @param update the point update.
Expand All @@ -124,11 +122,19 @@ class Sketch {

/**
* Function to query a sketch.
* @return A pair with the result index and a code indicating if the type of result.
* @return A pair with the result index and a code indicating the type of result.
*/
std::pair<vec_t, SampleSketchRet> query();

/**
* Function to query all columns within a sketch to return 1 or more non-zero indices
* @return A pair with the result indices and a code indicating the type of result.
*/
std::pair<std::vector<vec_t>, SampleSketchRet> exhaustive_query();

// Returns the seed this sketch uses for its hashing operations.
inline uint64_t get_seed() const { return seed; }
// Returns the seed for column `column_idx`: the sketch seed plus 5 per column index,
// so consecutive columns get seeds spaced 5 apart.
inline size_t column_seed(size_t column_idx) const { return seed + column_idx*5; }
// Returns the seed used for checksums; this is the sketch seed itself (same value as column_seed(0)).
inline size_t checksum_seed() const { return seed; }

/**
* Operator to add a sketch to another one in-place. Guaranteed to be
Expand All @@ -144,10 +150,24 @@ class Sketch {

/**
* Serialize the sketch to a binary output stream.
* @param out the stream to write to.
* @param binary_out the stream to write to.
*/
void write_binary(std::ostream& binary_out);
void write_binary(std::ostream& binary_out) const;

/**
* Serialize a sketch while optimizing for space
* This assumes that the sketch itself is sparse
* Otherwise, this serialization will use more space
* @param binary_out the stream to write to.
*/
void write_sparse_binary(std::ostream& binary_out);
void write_sparse_binary(std::ostream& binary_out) const;


// Number of guesses for a vector of length x: log2(x) - 2, converted via double_to_ull.
// max number of non-zeroes in vector is n/2*n/2=n^2/4
// (presumably the source of the -2, since log2(x/4) = log2(x) - 2).
static size_t guess_gen(size_t x) {
  const double log_len = log2(x);
  return double_to_ull(log_len - 2);
}
// Number of columns for failure factor d: log2(d) + 1, converted via double_to_ull.
static size_t column_gen(size_t d) {
  const double log_factor = log2(d);
  return double_to_ull((log_factor + 1));
}
};

class MultipleQueryException : public std::exception {
Expand Down
68 changes: 49 additions & 19 deletions include/supernode.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,28 @@

#include "l0_sampling/sketch.h"

// Serialization kinds. Enumerator values are implicit: FULL = 0, PARTIAL = 1, SPARSE = 2.
// NOTE(review): presumably selects how a Supernode is written to a stream — confirm against callers.
enum SerialType { FULL, PARTIAL, SPARSE };

/**
* This interface implements the "supernode" so Boruvka can use it as a black
* box without needing to worry about implementing l_0.
*/
class Supernode {
// the size of a super-node in bytes including the all sketches off the end
static size_t bytes_size;
static size_t max_sketches;
static size_t bytes_size; // the size of a super-node in bytes including the sketches
static size_t serialized_size; // the size of a supernode that has been serialized
int idx;
int num_sketches;
size_t sample_idx;
std::mutex node_mt;

FRIEND_TEST(SupernodeTestSuite, TestBatchUpdate);
FRIEND_TEST(SupernodeTestSuite, TestConcurrency);
FRIEND_TEST(SupernodeTestSuite, TestSerialization);
FRIEND_TEST(SupernodeTestSuite, TestPartialSparseSerialization);
FRIEND_TEST(SupernodeTestSuite, SketchesHaveUniqueSeeds);
FRIEND_TEST(GraphTestSuite, TestCorrectnessOfReheating);
FRIEND_TEST(GraphTest, TestSupernodeRestoreAfterCCFailure);
FRIEND_TEST(EXPR_Parallelism, N10kU100k);
Expand All @@ -29,6 +36,7 @@ class Supernode {
const uint64_t seed; // for creating a copy

private:
size_t num_sketches;
size_t sketch_size;

/* collection of logn sketches to query from, since we can't query from one
Expand Down Expand Up @@ -79,10 +87,11 @@ class Supernode {

~Supernode();

static inline void configure(uint64_t n, vec_t sketch_fail_factor=100) {
static inline void configure(uint64_t n, vec_t sketch_fail_factor=default_fail_factor) {
Sketch::configure(n*n, sketch_fail_factor);
bytes_size = sizeof(Supernode) + size_t(log2(n)/(log2(3)-1)) * Sketch::sketchSizeof();
serialized_size = size_t(log2(n)/(log2(3)-1)) * Sketch::serialized_size();
max_sketches = log2(n)/(log2(3)-1);
bytes_size = sizeof(Supernode) + max_sketches * Sketch::sketchSizeof();
serialized_size = max_sketches * Sketch::serialized_size();
}

static inline size_t get_size() {
Expand All @@ -98,28 +107,28 @@ class Supernode {
return sketch_size;
}

// return the number of sketches held in this supernode
int get_num_sktch() { return num_sketches; };
// return the maximum number of sketches held by a Supernode
// most Supernodes will hold this many sketches
static int get_max_sketches() { return max_sketches; };

// get number of samples remaining in the Supernode
int samples_remaining() { return num_sketches - sample_idx; }

inline bool out_of_queries() {
return idx == num_sketches;
return sample_idx >= num_sketches;
}

inline int curr_idx() {
return idx;
}

inline void incr_idx() {
++idx;
return sample_idx;
}

// reset the supernode query metadata
// we use this when resuming insertions after CC made copies in memory
inline void reset_query_state() {
for (int i = 0; i < idx; i++) {
for (size_t i = 0; i < sample_idx; i++) {
get_sketch(i)->reset_queried();
}
idx = 0;
sample_idx = 0;
}

// get the ith sketch in the sketch array as a const object
Expand All @@ -135,6 +144,15 @@ class Supernode {
*/
std::pair<Edge, SampleSketchRet> sample();

/**
* Function to sample 1 or more edges from the cut of a supernode.
* This function runs a query that samples from all columns in a single Sketch
* @return a list of edges in the cut, each represented as an Edge with LHS <= RHS,
* if one exists. Additionally, returns a code representing the sample
* result (good, zero, or fail)
*/
std::pair<std::vector<Edge>, SampleSketchRet> exhaustive_sample();

/**
* In-place merge function. Guaranteed to update the caller Supernode.
*/
Expand Down Expand Up @@ -166,9 +184,21 @@ class Supernode {

/**
* Serialize the supernode to a binary output stream.
* @param out the stream to write to.
* @param binary_out the stream to write to.
*/
void write_binary(std::ostream &binary_out, bool sparse = false);

/**
* Serialize a portion of the supernode to a binary output stream.
* @param binary_out the stream to write to.
* @param beg the index of the first sketch to serialize
* @param num the number of sketches to serialize
*/
void write_binary(std::ostream &binary_out);
void write_binary_range(std::ostream&binary_out, uint32_t beg, uint32_t num, bool sparse = false);

// void write_sparse_binary_range(std::ostream&binary_out, uint32_t beg, uint32_t end);

static constexpr size_t default_fail_factor = 4;
};


Expand Down
4 changes: 2 additions & 2 deletions include/test/sketch_constructors.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ SketchUniquePtr makeSketch(long seed) {
};
}

SketchUniquePtr makeSketch(long seed, std::fstream &binary_in) {
SketchUniquePtr makeSketch(long seed, std::fstream &binary_in, bool sparse=false) {
void* loc = malloc(Sketch::sketchSizeof());
return {
Sketch::makeSketch(loc, seed, binary_in),
Sketch::makeSketch(loc, seed, binary_in, sparse),
[](Sketch* s){ free(s); }
};
}
6 changes: 6 additions & 0 deletions include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ enum UpdateType {
// An edge identified by its two endpoint node ids; both default to 0.
struct Edge {
  node_id_t src = 0;
  node_id_t dst = 0;

  // Lexicographic ordering: compare by src first, and break ties on dst.
  bool operator< (const Edge&oth) const {
    return (src != oth.src) ? (src < oth.src) : (dst < oth.dst);
  }
};

struct GraphUpdate {
Expand Down
Loading

0 comments on commit 26b65e0

Please sign in to comment.