
Commit

Cleanup Database64
fractasy committed Oct 17, 2023
1 parent 4b5f4cb commit 51a79f7
Showing 6 changed files with 16 additions and 249 deletions.
194 changes: 2 additions & 192 deletions src/hashdb64/database_64.cpp
@@ -21,14 +21,12 @@
#include "tree_chunk.hpp"
#include "key_value_page.hpp"
#include "raw_data_page.hpp"
#include "zkglobals.hpp"

// Helper functions
string removeBSXIfExists64(string s) {return ((s.at(0) == '\\') && (s.at(1) == 'x')) ? s.substr(2) : s;}

Database64::Database64 (Goldilocks &fr, const Config &config) :
fr(fr),
config(config),
headerPageNumber(0)
Database64::Database64 (Goldilocks &fr, const Config &config) : headerPageNumber(0)
{
zkresult zkr;
headerPageNumber = 0;
@@ -57,26 +55,6 @@ void Database64::init(void)
exitProcess();
}

// Configure the server, if configuration is provided
if (config.databaseURL != "local")
{
// Sender thread creation
//pthread_create(&senderPthread, NULL, dbSenderThread64, this);

// Cache synchronization thread creation
/*if (config.dbCacheSynchURL.size() > 0)
{
pthread_create(&cacheSynchPthread, NULL, dbCacheSynchThread64, this);
}*/

useRemoteDB = true;
}
else
{
useRemoteDB = false;
}

// Mark the database as initialized
bInitialized = true;
}
@@ -341,127 +319,6 @@ zkresult Database64::getFlushData(uint64_t flushId, uint64_t &storedFlushId, uno
return ZKR_SUCCESS;
}

void Database64::clearCache (void)
{
}

#if 0
void *dbSenderThread64 (void *arg)
{
Database64 *pDatabase = (Database64 *)arg;
zklog.info("dbSenderThread64() started");
MultiWrite64 &multiWrite = pDatabase->multiWrite;

while (true)
{
// Wait for the sending semaphore to be released, if there is no more data to send
struct timespec currentTime;
int iResult = clock_gettime(CLOCK_REALTIME, &currentTime);
if (iResult == -1)
{
zklog.error("dbSenderThread64() failed calling clock_gettime()");
exitProcess();
}

currentTime.tv_sec += 5;
sem_timedwait(&pDatabase->senderSem, &currentTime);

multiWrite.Lock();

bool bDataEmpty = false;

// If sending data is not empty (it failed before) then try to send it again
if (!multiWrite.data[multiWrite.storingDataIndex].multiQuery.isEmpty())
{
zklog.warning("dbSenderThread64() found sending data index not empty, probably because of a previous error; resuming...");
}
// If processing data is empty, then simply pretend to have sent data
else if (multiWrite.data[multiWrite.pendingToFlushDataIndex].IsEmpty())
{
//zklog.warning("dbSenderThread() found pending to flush data empty");

// Mark as if we sent all batches
multiWrite.storedFlushId = multiWrite.lastFlushId;
#ifdef LOG_DB_SENDER_THREAD
zklog.info("dbSenderThread64() found multi write processing data empty, so ignoring");
#endif
multiWrite.Unlock();
continue;
}
// Else, switch data indexes
else
{
// Accept all intray data
multiWrite.data[multiWrite.pendingToFlushDataIndex].acceptIntray(true);

// Advance processing and sending indexes
multiWrite.storingDataIndex = (multiWrite.storingDataIndex + 1) % 3;
multiWrite.pendingToFlushDataIndex = (multiWrite.pendingToFlushDataIndex + 1) % 3;
multiWrite.data[multiWrite.pendingToFlushDataIndex].Reset();
#ifdef LOG_DB_SENDER_THREAD
zklog.info("dbSenderThread64() updated: multiWrite=[" + multiWrite.print() + "]");
#endif

// Record the last processed batch included in this data set
multiWrite.storingFlushId = multiWrite.lastFlushId;

// If there is no data to send, just pretend to have sent it
if (multiWrite.data[multiWrite.storingDataIndex].IsEmpty())
{
// Update stored flush ID
multiWrite.storedFlushId = multiWrite.storingFlushId;

// Advance synchronizing index
multiWrite.synchronizingDataIndex = (multiWrite.synchronizingDataIndex + 1) % 3;
#ifdef LOG_DB_SENDER_THREAD
zklog.info("dbSenderThread64() no data to send: multiWrite=[" + multiWrite.print() + "]");
#endif
bDataEmpty = true;
}

}

// Unlock to let more processing batch data in
multiWrite.Unlock();

if (!bDataEmpty)
{
#ifdef LOG_DB_SENDER_THREAD
zklog.info("dbSenderThread64() starting to send data, multiWrite=[" + multiWrite.print() + "]");
#endif
zkresult zkr;
zkr = pDatabase->sendData();
if (zkr == ZKR_SUCCESS)
{
multiWrite.Lock();
multiWrite.storedFlushId = multiWrite.storingFlushId;
#ifdef LOG_DB_SENDER_THREAD
zklog.info("dbSenderThread64() successfully sent data, multiWrite=[]" + multiWrite.print() + "]");
#endif
// Advance synchronizing index
multiWrite.synchronizingDataIndex = (multiWrite.synchronizingDataIndex + 1) % 3;
#ifdef LOG_DB_SENDER_THREAD
zklog.info("dbSenderThread64() updated: multiWrite=[" + multiWrite.print() + "]");
#endif
sem_post(&pDatabase->getFlushDataSem);
#ifdef LOG_DB_SENDER_THREAD
zklog.info("dbSenderThread64() successfully called sem_post(&pDatabase->getFlushDataSem)");
#endif
multiWrite.Unlock();
}
else
{
zklog.error("dbSenderThread64() failed calling sendData() error=" + zkresult2string(zkr));
usleep(1000000);
}
}
}

zklog.info("dbSenderThread64() done");
return NULL;
}
#endif

zkresult Database64::consolidateBlock (uint64_t blockNumber)
{
return ZKR_UNSPECIFIED;
@@ -617,53 +474,6 @@ zkresult Database64::WriteTree (const Goldilocks::Element (&oldRoot)[4], const v
return ZKR_SUCCESS;
}

zkresult Database64::CalculateHash (Child &result, vector<TreeChunk *> &chunks, vector<DB64Query> &dbQueries, int chunkId, int level, vector<HashValueGL> *hashValues)
{
zkresult zkr;
vector<Child> results(64);

// Convert all TREE_CHUNK children into something else, typically INTERMEDIATE children,
// but they could also be LEAF (only one child below this level) or ZERO (no children below this level)
for (uint64_t i=0; i<64; i++)
{
if (chunks[chunkId]->getChild(i).type == TREE_CHUNK)
{
CalculateHash(result, chunks, dbQueries, chunks[chunkId]->getChild(i).treeChunkId, level + 6, hashValues);
chunks[chunkId]->setChild(i, result);
}
}

// Calculate the hash of this chunk
zkr = chunks[chunkId]->calculateHash(hashValues);
if (zkr != ZKR_SUCCESS)
{
zklog.error("Database64::CalculateHash() failed calling chunks[chunkId]->calculateHash() result=" + zkresult2string(zkr));
return zkr;
}

// Copy the result child
result = chunks[chunkId]->getChild1();

// Add to the database queries
if (result.type != ZERO)
{
// Encode the 64 children into database format
zkr = chunks[chunkId]->children2data();
if (zkr != ZKR_SUCCESS)
{
zklog.error("Database64::CalculateHash() failed calling chunks[chunkId]->children2data() result=" + zkresult2string(zkr));
return zkr;
}

Goldilocks::Element hash[4];
chunks[chunkId]->getHash(hash);
DB64Query dbQuery(fea2string(fr, hash), hash, chunks[chunkId]->data);
dbQueries.emplace_back(dbQuery);
}

return ZKR_SUCCESS;
}

zkresult Database64::ReadTree (const Goldilocks::Element (&root)[4], vector<KeyValue> &keyValues, vector<HashValueGL> *hashValues)
{
zkresult zkr;
56 changes: 6 additions & 50 deletions src/hashdb64/database_64.hpp
@@ -23,7 +23,7 @@ using namespace std;

/*
A Tree (state) is made of a set of TreeChunks:
A Tree (state) is made of a set of TreeChunks, each of them stored in one 4kB page:
/\
/__\
@@ -35,12 +35,12 @@ A Tree (state) is made of a set of TreeChunks:
/__\
When we want to read [key, value] for a given root:
- we call db.read(treeChunk.hash, treeChunk.data) starting from the root until we reach the [key, value] leaf node
- we search for the right page starting from the root until we reach the [key, value] leaf node in the final page
When we want to write a new leaf node [key, newValue] on a given root and get the resulting newStateRoot
- we calculate the new position of [key, newValue], creating new chunks if needed
- we calculate the new position of [key, newValue], creating new pages if needed
- we recalculate the hashes of all the modified and new chunks
- we call db.write(treeChunk.hash, treeChunk.data) of all the modified and new chunks
- we write every resulting hash in the proper position of the proper page
Every time we write a [key, newValue] we are potentially creating a new Tree = SUM(TreeChunks) if newValue != oldValue
Every new Tree represents a newer version of the state
@@ -60,51 +60,19 @@ The Forest takes note of the latest Tree hash to keep track of the current state
*/
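
To make the read walk described in the comment above concrete, here is a minimal illustrative sketch in C++. It is not part of this commit and does not use the real TreeChunk/pager API; the names Page, PageStore and readKey are hypothetical, and the write side (recomputing hashes of modified chunks) is left out.

#include <array>
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <string>

enum class ChildType { ZERO, LEAF, INTERMEDIATE };

struct Child
{
    ChildType type = ChildType::ZERO;
    std::string leafValue;        // set when type == LEAF
    uint64_t childPageNumber = 0; // set when type == INTERMEDIATE
};

// One 4kB page holding a TreeChunk: 64 children, indexed by 6 key bits
struct Page
{
    std::array<Child, 64> children;
};

// Hypothetical page store: page number -> page (stands in for the pager)
using PageStore = std::map<uint64_t, Page>;

// Read the value stored under a key (given here as a bit string),
// starting from the page that holds the root chunk and consuming
// 6 key bits per chunk until a leaf or a zero child is reached
std::optional<std::string> readKey (const PageStore &pages, uint64_t rootPage, const std::string &keyBits)
{
    uint64_t pageNumber = rootPage;
    for (std::size_t level = 0; level + 6 <= keyBits.size(); level += 6)
    {
        // Next 6 bits of the key select one of the 64 children of this chunk
        uint64_t index = std::stoull(keyBits.substr(level, 6), nullptr, 2);
        const Child &child = pages.at(pageNumber).children[index];
        switch (child.type)
        {
            case ChildType::ZERO:         return std::nullopt;    // key not present under this root
            case ChildType::LEAF:         return child.leafValue; // [key, value] leaf found
            case ChildType::INTERMEDIATE: pageNumber = child.childPageNumber; break;
        }
    }
    return std::nullopt;
}

The 6-bit stride mirrors the level + 6 recursion in the removed CalculateHash() above: one TreeChunk spans six tree levels, i.e. 64 children.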

class DB64Query
{
public:
string key;
Goldilocks::Element keyFea[4];
string &value; // value can be an input in multiWrite(), or an output in multiRead()
DB64Query(const string &_key, const Goldilocks::Element (&_keyFea)[4], string &_value) : key(_key), value(_value)
{
keyFea[0] = _keyFea[0];
keyFea[1] = _keyFea[1];
keyFea[2] = _keyFea[2];
keyFea[3] = _keyFea[3];
}
};

class Database64
{
public:
Goldilocks &fr;
const Config &config;
PoseidonGoldilocks poseidon;

// Basic flags
bool bInitialized = false;
bool useRemoteDB = false;

uint64_t headerPageNumber;

public:
//sem_t senderSem; // Semaphore to wakeup database sender thread when flush() is called
//sem_t getFlushDataSem; // Semaphore to unblock getFlushData() callers when new data is available
private:
//pthread_t senderPthread; // Database sender thread
//pthread_t cacheSynchPthread; // Cache synchronization thread

// Tree64
zkresult CalculateHash (Child &result, std::vector<TreeChunk *> &chunks, vector<DB64Query> &dbQueries, int idChunk, int level, vector<HashValueGL> *hashValues);
bool bInitialized = false;
uint64_t headerPageNumber;

public:

// Constructor and destructor
Database64(Goldilocks &fr, const Config &config);
~Database64();

public:
// Basic methods
void init(void);

@@ -124,24 +92,12 @@ class Database64
zkresult consolidateBlock (uint64_t blockNumber); // TODO: Who reports this block number?
zkresult revertBlock (uint64_t blockNumber);

public:
// Flush data pending to be stored permanently
zkresult flush(uint64_t &flushId, uint64_t &lastSentFlushId);
zkresult getFlushStatus(uint64_t &storedFlushId, uint64_t &storingFlushId, uint64_t &lastFlushId, uint64_t &pendingToFlushNodes, uint64_t &pendingToFlushProgram, uint64_t &storingNodes, uint64_t &storingProgram);

// Get flush data, written to database by dbSenderThread; it blocks
zkresult getFlushData(uint64_t flushId, uint64_t &lastSentFlushId, unordered_map<string, string> (&nodes), unordered_map<string, string> (&program), string &nodesStateRoot);

// Clear cache
void clearCache(void);


};

// Thread to send data to database
//void *dbSenderThread64(void *arg);

// Thread to synchronize cache from master hash DB server
//void *dbCacheSynchThread64(void *arg);

#endif
7 changes: 4 additions & 3 deletions src/hashdb64/state_manager_64.cpp
@@ -5,6 +5,7 @@
#include "timer.hpp"
#include "persistence.hpp"
#include "definitions.hpp"
#include "zkglobals.hpp"

Goldilocks frSM64;
PoseidonGoldilocks poseidonSM64;
@@ -638,7 +639,7 @@ zkresult StateManager64::purge (const string &batchUUID, const string &_newState
continue;
}

zkr = purgeTxPersistence(txState.persistence[persistence], db.config);
zkr = purgeTxPersistence(txState.persistence[persistence], config);
if (zkr != ZKR_SUCCESS)
{
zklog.error("StateManager64::purge() failed calling purgeTxPersistence() zkr=" + zkresult2string(zkr) +
@@ -1019,7 +1020,7 @@ zkresult StateManager64::set (const string &batchUUID, uint64_t tx, Database64 &

zkresult zkr;

bool bUseStateManager = db.config.stateManager && (batchUUID.size() > 0);
bool bUseStateManager = config.stateManager && (batchUUID.size() > 0);

if (bUseStateManager)
{
@@ -1081,7 +1082,7 @@ zkresult StateManager64::get (const string &batchUUID, Database64 &db, const Gol
zklog.info("StateManager64::get() called with root=" + fea2string(fr,root) + " and key=" + fea2string(fr,key));
#endif

bool bUseStateManager = db.config.stateManager && (batchUUID.size() > 0);
bool bUseStateManager = config.stateManager && (batchUUID.size() > 0);

string keyString = fea2string(fr, key);
mpz_class value;
2 changes: 1 addition & 1 deletion src/service/hashdb/hashdb.cpp
@@ -413,7 +413,7 @@ void HashDB::clearCache(void)
{
if (config.hashDB64)
{
db64.clearCache();
// We don't use cache in HashDB64
}
else
{
1 change: 0 additions & 1 deletion test/service/executor/executor_client.cpp
@@ -196,7 +196,6 @@ bool ExecutorClient::ProcessBatch (void)
if (config.hashDB64)
{
Database64 &db = hashDB.db64;
db.clearCache();

CheckTreeCounters64 checkTreeCounters;


