From 4b5f4cb948d002bb5c72732e40219b89119d76cc Mon Sep 17 00:00:00 2001 From: fractasy Date: Tue, 17 Oct 2023 08:16:35 +0000 Subject: [PATCH 1/3] Fix hashdb64 workflow test --- src/hashdb64/database_64.cpp | 57 ++++++++++++++++---- src/hashdb64/page/header_page.cpp | 7 +-- src/hashdb64/page/header_page.hpp | 2 +- src/hashdb64/page/key_value_history_page.cpp | 22 ++------ 4 files changed, 54 insertions(+), 34 deletions(-) diff --git a/src/hashdb64/database_64.cpp b/src/hashdb64/database_64.cpp index 6ee591d81..6ce2bd2de 100644 --- a/src/hashdb64/database_64.cpp +++ b/src/hashdb64/database_64.cpp @@ -83,7 +83,10 @@ void Database64::init(void) zkresult Database64::readKV(const Goldilocks::Element (&root)[4], const Goldilocks::Element (&key)[4], mpz_class &value, uint64_t &level ,DatabaseMap *dbReadLog) { - level = 128; + zkresult zkr; + + level = 128; // TODO: Return the right level + // Check that it has been initialized before if (!bInitialized) { @@ -91,24 +94,51 @@ zkresult Database64::readKV(const Goldilocks::Element (&root)[4], const Goldiloc exitProcess(); } - struct timeval t; - if (dbReadLog != NULL) gettimeofday(&t, NULL); + // Convert root to a byte array + string rootString = fea2string(fr, root); + string rootBa = string2ba(rootString); + + // Get the version associated to this root + uint64_t version; + zkr = HeaderPage::ReadRootVersion(headerPageNumber, rootBa, version); + if (zkr != ZKR_SUCCESS) + { + zklog.error("Database64::readKV() faile calling HeaderPage::ReadRootVersion() result=" + zkresult2string(zkr) + " root=" + rootString + " key=" + fea2string(fr, key)); + return zkr; + } + + // Get the version data + VersionDataStruct versionData; + zkr = HeaderPage::ReadVersionData(headerPageNumber, version, versionData); + if (zkr != ZKR_SUCCESS) + { + zklog.error("Database64::readKV() faile calling HeaderPage::ReadVersionData() result=" + zkresult2string(zkr) + " root=" + rootString + " key=" + fea2string(fr, key)); + return zkr; + } - zkresult rout = ZKR_UNSPECIFIED; + // Get the value + string keyString = fea2string(fr, key); + string keyBa = string2ba(keyString); + zkr = HeaderPage::KeyValueHistoryRead(versionData.keyValueHistoryPage, keyBa, version, value); + if (zkr != ZKR_SUCCESS) + { + zklog.error("Database64::readKV() faile calling HeaderPage::KeyValueHistoryRead() result=" + zkresult2string(zkr) + " root=" + rootString + " key=" + fea2string(fr, key)); + return zkr; + } #ifdef LOG_DB_READ { string s = "Database64::readKV()"; - if (rout != ZKR_SUCCESS) - s += " ERROR=" + zkresult2string(rout); + if (zkr != ZKR_SUCCESS) + s += " ERROR=" + zkresult2string(zkr); s += " key=" + keyStr; s += " value="; s += value.get_str(16) + ";"; zklog.info(s); } #endif - return rout; + return zkr; } /* zkresult Database64::readKV(const Goldilocks::Element (&root)[4], vector &KVLs, DatabaseMap *dbReadLog){ @@ -638,7 +668,7 @@ zkresult Database64::ReadTree (const Goldilocks::Element (&root)[4], vector 0) - { - pthread_create(&cacheSynchPthread, NULL, dbCacheSynchThread64, this); - - }*/ - - useRemoteDB = true; - } - else - { - useRemoteDB = false; - } - // Mark the database as initialized bInitialized = true; } @@ -341,127 +319,6 @@ zkresult Database64::getFlushData(uint64_t flushId, uint64_t &storedFlushId, uno return ZKR_SUCCESS; } -void Database64::clearCache (void) -{ -} - -#if 0 -void *dbSenderThread64 (void *arg) -{ - Database64 *pDatabase = (Database64 *)arg; - zklog.info("dbSenderThread64() started"); - MultiWrite64 &multiWrite = pDatabase->multiWrite; - - while (true) - { - // Wait for the sending semaphore to be released, if there is no more data to send - struct timespec currentTime; - int iResult = clock_gettime(CLOCK_REALTIME, ¤tTime); - if (iResult == -1) - { - zklog.error("dbSenderThread64() failed calling clock_gettime()"); - exitProcess(); - } - - currentTime.tv_sec += 5; - sem_timedwait(&pDatabase->senderSem, ¤tTime); - - multiWrite.Lock(); - - bool bDataEmpty = false; - - // If sending data is not empty (it failed before) then try to send it again - if (!multiWrite.data[multiWrite.storingDataIndex].multiQuery.isEmpty()) - { - zklog.warning("dbSenderThread64() found sending data index not empty, probably because of a previous error; resuming..."); - } - // If processing data is empty, then simply pretend to have sent data - else if (multiWrite.data[multiWrite.pendingToFlushDataIndex].IsEmpty()) - { - //zklog.warning("dbSenderThread() found pending to flush data empty"); - - // Mark as if we sent all batches - multiWrite.storedFlushId = multiWrite.lastFlushId; -#ifdef LOG_DB_SENDER_THREAD - zklog.info("dbSenderThread64() found multi write processing data empty, so ignoring"); -#endif - multiWrite.Unlock(); - continue; - } - // Else, switch data indexes - else - { - // Accept all intray data - multiWrite.data[multiWrite.pendingToFlushDataIndex].acceptIntray(true); - - // Advance processing and sending indexes - multiWrite.storingDataIndex = (multiWrite.storingDataIndex + 1) % 3; - multiWrite.pendingToFlushDataIndex = (multiWrite.pendingToFlushDataIndex + 1) % 3; - multiWrite.data[multiWrite.pendingToFlushDataIndex].Reset(); -#ifdef LOG_DB_SENDER_THREAD - zklog.info("dbSenderThread64() updated: multiWrite=[" + multiWrite.print() + "]"); -#endif - - // Record the last processed batch included in this data set - multiWrite.storingFlushId = multiWrite.lastFlushId; - - // If there is no data to send, just pretend to have sent it - if (multiWrite.data[multiWrite.storingDataIndex].IsEmpty()) - { - // Update stored flush ID - multiWrite.storedFlushId = multiWrite.storingFlushId; - - // Advance synchronizing index - multiWrite.synchronizingDataIndex = (multiWrite.synchronizingDataIndex + 1) % 3; -#ifdef LOG_DB_SENDER_THREAD - zklog.info("dbSenderThread64() no data to send: multiWrite=[" + multiWrite.print() + "]"); -#endif - bDataEmpty = true; - } - - } - - // Unlock to let more processing batch data in - multiWrite.Unlock(); - - if (!bDataEmpty) - { -#ifdef LOG_DB_SENDER_THREAD - zklog.info("dbSenderThread64() starting to send data, multiWrite=[" + multiWrite.print() + "]"); -#endif - zkresult zkr; - zkr = pDatabase->sendData(); - if (zkr == ZKR_SUCCESS) - { - multiWrite.Lock(); - multiWrite.storedFlushId = multiWrite.storingFlushId; -#ifdef LOG_DB_SENDER_THREAD - zklog.info("dbSenderThread64() successfully sent data, multiWrite=[]" + multiWrite.print() + "]"); -#endif - // Advance synchronizing index - multiWrite.synchronizingDataIndex = (multiWrite.synchronizingDataIndex + 1) % 3; -#ifdef LOG_DB_SENDER_THREAD - zklog.info("dbSenderThread64() updated: multiWrite=[" + multiWrite.print() + "]"); -#endif - sem_post(&pDatabase->getFlushDataSem); -#ifdef LOG_DB_SENDER_THREAD - zklog.info("dbSenderThread64() successfully called sem_post(&pDatabase->getFlushDataSem)"); -#endif - multiWrite.Unlock(); - } - else - { - zklog.error("dbSenderThread64() failed calling sendData() error=" + zkresult2string(zkr)); - usleep(1000000); - } - } - } - - zklog.info("dbSenderThread64() done"); - return NULL; -} -#endif - zkresult Database64::consolidateBlock (uint64_t blockNumber) { return ZKR_UNSPECIFIED; @@ -617,53 +474,6 @@ zkresult Database64::WriteTree (const Goldilocks::Element (&oldRoot)[4], const v return ZKR_SUCCESS; } -zkresult Database64::CalculateHash (Child &result, vector &chunks, vector &dbQueries, int chunkId, int level, vector *hashValues) -{ - zkresult zkr; - vector results(64); - - // Convert all TREE_CHUNK children into something else, typically INTERMEDIATE children, - // but they could also be LEAF (only one child below this level) or ZERO (no children below this level) - for (uint64_t i=0; i<64; i++) - { - if (chunks[chunkId]->getChild(i).type == TREE_CHUNK) - { - CalculateHash(result, chunks, dbQueries, chunks[chunkId]->getChild(i).treeChunkId, level + 6, hashValues); - chunks[chunkId]->setChild(i, result); - } - } - - // Calculate the hash of this chunk - zkr = chunks[chunkId]->calculateHash(hashValues); - if (zkr != ZKR_SUCCESS) - { - zklog.error("Database64::CalculateHash() failed calling chunks[chunkId]->calculateHash() result=" + zkresult2string(zkr)); - return zkr; - } - - // Copy the result child - result = chunks[chunkId]->getChild1(); - - // Add to the database queries - if (result.type != ZERO) - { - // Encode the 64 children into database format - zkr = chunks[chunkId]->children2data(); - if (zkr != ZKR_SUCCESS) - { - zklog.error("Database64::CalculateHash() failed calling chunks[chunkId]->children2data() result=" + zkresult2string(zkr)); - return zkr; - } - - Goldilocks::Element hash[4]; - chunks[chunkId]->getHash(hash); - DB64Query dbQuery(fea2string(fr, hash), hash, chunks[chunkId]->data); - dbQueries.emplace_back(dbQuery); - } - - return ZKR_SUCCESS; -} - zkresult Database64::ReadTree (const Goldilocks::Element (&root)[4], vector &keyValues, vector *hashValues) { zkresult zkr; diff --git a/src/hashdb64/database_64.hpp b/src/hashdb64/database_64.hpp index 0b5756581..5845198a0 100644 --- a/src/hashdb64/database_64.hpp +++ b/src/hashdb64/database_64.hpp @@ -23,7 +23,7 @@ using namespace std; /* -A Tree (state) is made of a set of TreeChunks: +A Tree (state) is made of a set of TreeChunks, each of them stored in one 4kB page: /\ /__\ @@ -35,12 +35,12 @@ A Tree (state) is made of a set of TreeChunks: /__\ When we want to read [key, value] for a given root: - - we call db.read(treeChunk.hash, treeChunk.data) starting from the root until we reach the [key, value] leaf node + - we search for the right page starting from the root until we reach the [key, value] leaf node in the final page When we want to write a new leaf node [key, newValue] on a given root and get the resulting newStateRoot - - we calculate the new position of [key, newValue], creating new chunks if needed + - we calculate the new position of [key, newValue], creating new pages if needed - we recalculate the hashes of all the modified and new chunks - - we call db.write(treeChunk.hash, treeChunk.data) of all the modified and new chunks + - we write every resulting hash in the proper position of the proper page Every time we write a [key, newValue] we are potentially creating a new Tree = SUM(TreeChunks) if newValue != oldValue Every new Tree represents a newer version of the state @@ -60,43 +60,12 @@ The Forest takes note of the latest Tree hash to keep track of the current state */ -class DB64Query -{ -public: - string key; - Goldilocks::Element keyFea[4]; - string &value; // value can be an input in multiWrite(), or an output in multiRead() - DB64Query(const string &_key, const Goldilocks::Element (&_keyFea)[4], string &_value) : key(_key), value(_value) - { - keyFea[0] = _keyFea[0]; - keyFea[1] = _keyFea[1]; - keyFea[2] = _keyFea[2]; - keyFea[3] = _keyFea[3]; - } -}; - class Database64 { -public: - Goldilocks &fr; - const Config &config; - PoseidonGoldilocks poseidon; - - // Basic flags - bool bInitialized = false; - bool useRemoteDB = false; - - uint64_t headerPageNumber; - -public: - //sem_t senderSem; // Semaphore to wakeup database sender thread when flush() is called - //sem_t getFlushDataSem; // Semaphore to unblock getFlushData() callers when new data is available private: - //pthread_t senderPthread; // Database sender thread - //pthread_t cacheSynchPthread; // Cache synchronization thread - // Tree64 - zkresult CalculateHash (Child &result, std::vector &chunks, vector &dbQueries, int idChunk, int level, vector *hashValues); + bool bInitialized = false; + uint64_t headerPageNumber; public: @@ -104,7 +73,6 @@ class Database64 Database64(Goldilocks &fr, const Config &config); ~Database64(); -public: // Basic methods void init(void); @@ -124,24 +92,12 @@ class Database64 zkresult consolidateBlock (uint64_t blockNumber); // TODO: Who reports this block number? zkresult revertBlock (uint64_t blockNumber); -public: // Flush data pending to be stored permamently zkresult flush(uint64_t &flushId, uint64_t &lastSentFlushId); zkresult getFlushStatus(uint64_t &storedFlushId, uint64_t &storingFlushId, uint64_t &lastFlushId, uint64_t &pendingToFlushNodes, uint64_t &pendingToFlushProgram, uint64_t &storingNodes, uint64_t &storingProgram); // Get flush data, written to database by dbSenderThread; it blocks zkresult getFlushData(uint64_t flushId, uint64_t &lastSentFlushId, unordered_map (&nodes), unordered_map (&program), string &nodesStateRoot); - - // Clear cache - void clearCache(void); - - }; -// Thread to send data to database -//void *dbSenderThread64(void *arg); - -// Thread to synchronize cache from master hash DB server -//void *dbCacheSynchThread64(void *arg); - #endif \ No newline at end of file diff --git a/src/hashdb64/state_manager_64.cpp b/src/hashdb64/state_manager_64.cpp index a7ec61dc5..ffa0e8154 100644 --- a/src/hashdb64/state_manager_64.cpp +++ b/src/hashdb64/state_manager_64.cpp @@ -5,6 +5,7 @@ #include "timer.hpp" #include "persistence.hpp" #include "definitions.hpp" +#include "zkglobals.hpp" Goldilocks frSM64; PoseidonGoldilocks poseidonSM64; @@ -638,7 +639,7 @@ zkresult StateManager64::purge (const string &batchUUID, const string &_newState continue; } - zkr = purgeTxPersistence(txState.persistence[persistence], db.config); + zkr = purgeTxPersistence(txState.persistence[persistence], config); if (zkr != ZKR_SUCCESS) { zklog.error("StateManager64::purge() failed calling purgeTxPersistence() zkr=" + zkresult2string(zkr) + @@ -1019,7 +1020,7 @@ zkresult StateManager64::set (const string &batchUUID, uint64_t tx, Database64 & zkresult zkr; - bool bUseStateManager = db.config.stateManager && (batchUUID.size() > 0); + bool bUseStateManager = config.stateManager && (batchUUID.size() > 0); if (bUseStateManager) { @@ -1081,7 +1082,7 @@ zkresult StateManager64::get (const string &batchUUID, Database64 &db, const Gol zklog.info("StateManager64::get() called with root=" + fea2string(fr,root) + " and key=" + fea2string(fr,key)); #endif - bool bUseStateManager = db.config.stateManager && (batchUUID.size() > 0); + bool bUseStateManager = config.stateManager && (batchUUID.size() > 0); string keyString = fea2string(fr, key); mpz_class value; diff --git a/src/service/hashdb/hashdb.cpp b/src/service/hashdb/hashdb.cpp index 882d91d4b..468365b08 100644 --- a/src/service/hashdb/hashdb.cpp +++ b/src/service/hashdb/hashdb.cpp @@ -413,7 +413,7 @@ void HashDB::clearCache(void) { if (config.hashDB64) { - db64.clearCache(); + // We don't use cache in HashDB64 } else { diff --git a/test/service/executor/executor_client.cpp b/test/service/executor/executor_client.cpp index 37316f0a1..56ab1ff21 100644 --- a/test/service/executor/executor_client.cpp +++ b/test/service/executor/executor_client.cpp @@ -196,7 +196,6 @@ bool ExecutorClient::ProcessBatch (void) if (config.hashDB64) { Database64 &db = hashDB.db64; - db.clearCache(); CheckTreeCounters64 checkTreeCounters; diff --git a/test/utils/check_tree_64.cpp b/test/utils/check_tree_64.cpp index 30debb57c..a022fef14 100644 --- a/test/utils/check_tree_64.cpp +++ b/test/utils/check_tree_64.cpp @@ -2,6 +2,7 @@ #include "zkmax.hpp" #include "scalar.hpp" #include "tree_chunk.hpp" +#include "zkglobals.hpp" zkresult CheckTree64 (Database64 &db, const string &key, uint64_t level, CheckTreeCounters64 &checkTreeCounters) { @@ -11,7 +12,7 @@ zkresult CheckTree64 (Database64 &db, const string &key, uint64_t level, CheckTr TreeChunk treeChunk(poseidon); Goldilocks::Element keyFea[4]; - string2fea(db.fr, key, keyFea); + string2fea(fr, key, keyFea); zkresult result = ZKR_UNSPECIFIED; // = db.read(key, keyFea, treeChunk.data, NULL, false); if (result != ZKR_SUCCESS) { @@ -38,7 +39,7 @@ zkresult CheckTree64 (Database64 &db, const string &key, uint64_t level, CheckTr case INTERMEDIATE: { checkTreeCounters.intermediateNodes++; - result = CheckTree64(db, fea2string(db.fr, treeChunk.getChild(i).intermediate.hash), level+1, checkTreeCounters); + result = CheckTree64(db, fea2string(fr, treeChunk.getChild(i).intermediate.hash), level+1, checkTreeCounters); if (zkr != ZKR_SUCCESS) { return zkr; From 707bc145e89faebf16a670f1a50a0fd14c017629 Mon Sep 17 00:00:00 2001 From: fractasy Date: Tue, 17 Oct 2023 11:13:46 +0000 Subject: [PATCH 3/3] Implement Database64 flush mechanism (fake) --- src/hashdb64/database_64.cpp | 130 +++--------------- src/hashdb64/database_64.hpp | 7 +- src/service/hashdb/hashdb.cpp | 3 +- .../service/hashdb/hashdb64_workflow_test.cpp | 4 +- 4 files changed, 27 insertions(+), 117 deletions(-) diff --git a/src/hashdb64/database_64.cpp b/src/hashdb64/database_64.cpp index e43c28ba8..e8959d6be 100644 --- a/src/hashdb64/database_64.cpp +++ b/src/hashdb64/database_64.cpp @@ -26,8 +26,11 @@ // Helper functions string removeBSXIfExists64(string s) {return ((s.at(0) == '\\') && (s.at(1) == 'x')) ? s.substr(2) : s;} -Database64::Database64 (Goldilocks &fr, const Config &config) : headerPageNumber(0) +Database64::Database64 (Goldilocks &fr, const Config &config) : headerPageNumber(0), currentFlushId(0) { + // Init mutex + pthread_mutex_init(&mutex, NULL); + zkresult zkr; headerPageNumber = 0; zkr = HeaderPage::InitEmptyPage(headerPageNumber); @@ -36,9 +39,6 @@ Database64::Database64 (Goldilocks &fr, const Config &config) : headerPageNumber zklog.error("Database64::Database64() failed calling HeaderPage::InitEmptyPage() result=" + zkresult2string(zkr)); exitProcess(); } - // Initialize semaphores - //sem_init(&senderSem, 0, 0); - //sem_init(&getFlushDataSem, 0, 0); }; Database64::~Database64() @@ -118,20 +118,6 @@ zkresult Database64::readKV(const Goldilocks::Element (&root)[4], const Goldiloc return zkr; } -/* -zkresult Database64::readKV(const Goldilocks::Element (&root)[4], vector &KVLs, DatabaseMap *dbReadLog){ - zkresult zkr; - for (uint64_t i=0; i (&nodes), unordered_map (&program), string &nodesStateRoot) -{ -#if 0 - //zklog.info("--> getFlushData()"); - - // Set the deadline to now + 60 seconds - struct timespec deadline; - clock_gettime(CLOCK_REALTIME, &deadline); - deadline.tv_sec += 60; - - // Try to get the semaphore - int iResult; - iResult = sem_timedwait(&getFlushDataSem, &deadline); - if (iResult != 0) - { - zklog.info("Database64::getFlushData() timed out"); - return ZKR_SUCCESS; - } - - multiWrite.Lock(); - MultiWriteData64 &data = multiWrite.data[multiWrite.synchronizingDataIndex]; - - zklog.info("Database64::getFlushData woke up: pendingToFlushDataIndex=" + to_string(multiWrite.pendingToFlushDataIndex) + - " storingDataIndex=" + to_string(multiWrite.storingDataIndex) + - " synchronizingDataIndex=" + to_string(multiWrite.synchronizingDataIndex) + - " nodes=" + to_string(data.nodes.size()) + - " program=" + to_string(data.program.size()) + - " nodesStateRoot=" + data.nodesStateRoot); - - if (data.nodes.size() > 0) - { - nodes = data.nodes; - } - - if (data.program.size() > 0) - { - program = data.program; - } - - if (data.nodesStateRoot.size() > 0) - { - nodesStateRoot = data.nodesStateRoot; - } - - multiWrite.Unlock(); - - //zklog.info("<-- getFlushData()"); -#endif + Lock(); + storedFlushId = currentFlushId; + storingFlushId = currentFlushId; + lastFlushId = currentFlushId; + pendingToFlushNodes = 0; + pendingToFlushProgram = 0; + storingNodes = 0; + storingProgram = 0; + Unlock(); return ZKR_SUCCESS; } diff --git a/src/hashdb64/database_64.hpp b/src/hashdb64/database_64.hpp index 5845198a0..d018859c0 100644 --- a/src/hashdb64/database_64.hpp +++ b/src/hashdb64/database_64.hpp @@ -66,6 +66,8 @@ class Database64 bool bInitialized = false; uint64_t headerPageNumber; + pthread_mutex_t mutex; + uint64_t currentFlushId; public: @@ -96,8 +98,9 @@ class Database64 zkresult flush(uint64_t &flushId, uint64_t &lastSentFlushId); zkresult getFlushStatus(uint64_t &storedFlushId, uint64_t &storingFlushId, uint64_t &lastFlushId, uint64_t &pendingToFlushNodes, uint64_t &pendingToFlushProgram, uint64_t &storingNodes, uint64_t &storingProgram); - // Get flush data, written to database by dbSenderThread; it blocks - zkresult getFlushData(uint64_t flushId, uint64_t &lastSentFlushId, unordered_map (&nodes), unordered_map (&program), string &nodesStateRoot); + // Lock/Unlock + void Lock(void) { pthread_mutex_lock(&mutex); }; + void Unlock(void) { pthread_mutex_unlock(&mutex); }; }; #endif \ No newline at end of file diff --git a/src/service/hashdb/hashdb.cpp b/src/service/hashdb/hashdb.cpp index 468365b08..2f51866f0 100644 --- a/src/service/hashdb/hashdb.cpp +++ b/src/service/hashdb/hashdb.cpp @@ -399,7 +399,8 @@ zkresult HashDB::getFlushData(uint64_t flushId, uint64_t &lastSentFlushId, unord if (config.hashDB64) { - zkr = db64.getFlushData(flushId, lastSentFlushId, nodes, program, nodesStateRoot); + zklog.error("HashDB::getFlushData() called with config.hashDB64=true"); + return ZKR_DB_ERROR; } else { diff --git a/test/service/hashdb/hashdb64_workflow_test.cpp b/test/service/hashdb/hashdb64_workflow_test.cpp index b21ab803c..137d2eb23 100644 --- a/test/service/hashdb/hashdb64_workflow_test.cpp +++ b/test/service/hashdb/hashdb64_workflow_test.cpp @@ -128,7 +128,7 @@ uint64_t HashDB64WorkflowTest (const Config& config) for (uint64_t i=0; i<4; i++) root[i] = consolidatedStateRoot[i]; // Wait for data to be sent - /*while (true) + while (true) { uint64_t storedFlushId, storingFlushId, lastFlushId, pendingToFlushNodes, pendingToFlushProgram, storingNodes, storingProgram; string proverId; @@ -141,7 +141,7 @@ uint64_t HashDB64WorkflowTest (const Config& config) } sleep(1); } - zklog.info("FLUSHED");*/ + zklog.info("FLUSHED"); // Call ReadTree with the old state root to get the hashes of the initial values of all read or written keys /*vector oldHashValues;