Skip to content

Commit

Permalink
Implement Database64 flush mechanism (fake)
Browse files Browse the repository at this point in the history
  • Loading branch information
fractasy committed Oct 17, 2023
1 parent 51a79f7 commit 707bc14
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 117 deletions.
130 changes: 18 additions & 112 deletions src/hashdb64/database_64.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
// Helper functions
string removeBSXIfExists64(string s) {return ((s.at(0) == '\\') && (s.at(1) == 'x')) ? s.substr(2) : s;}

Database64::Database64 (Goldilocks &fr, const Config &config) : headerPageNumber(0)
Database64::Database64 (Goldilocks &fr, const Config &config) : headerPageNumber(0), currentFlushId(0)
{
// Init mutex
pthread_mutex_init(&mutex, NULL);

zkresult zkr;
headerPageNumber = 0;
zkr = HeaderPage::InitEmptyPage(headerPageNumber);
Expand All @@ -36,9 +39,6 @@ Database64::Database64 (Goldilocks &fr, const Config &config) : headerPageNumber
zklog.error("Database64::Database64() failed calling HeaderPage::InitEmptyPage() result=" + zkresult2string(zkr));
exitProcess();
}
// Initialize semaphores
//sem_init(&senderSem, 0, 0);
//sem_init(&getFlushDataSem, 0, 0);
};

Database64::~Database64()
Expand Down Expand Up @@ -118,20 +118,6 @@ zkresult Database64::readKV(const Goldilocks::Element (&root)[4], const Goldiloc

return zkr;
}
/*
zkresult Database64::readKV(const Goldilocks::Element (&root)[4], vector<KeyValueLevel> &KVLs, DatabaseMap *dbReadLog){
zkresult zkr;
for (uint64_t i=0; i<KVLs.size(); i++)
{
zkr = readKV(root, KVLs[i].key, KVLs[i].value, KVLs[i].level, dbReadLog);
if (zkr != ZKR_SUCCESS)
{
zklog.error("Database64::readKV(KBs) failed calling read() result=" + zkresult2string(zkr) + " key=" + fea2string(fr, KVLs[i].key) );
return zkr;
}
}
return ZKR_SUCCESS;
}*/

zkresult Database64::setProgram (const string &key, const vector<uint8_t> &data, const bool persistent)
{
Expand Down Expand Up @@ -210,111 +196,31 @@ zkresult Database64::getProgram(const string &key, vector<uint8_t> &data, Databa

zkresult Database64::flush(uint64_t &thisBatch, uint64_t &lastSentBatch)
{
#if 0
if (!config.dbMultiWrite)
{
return ZKR_SUCCESS;
}

// If we are connected to a read-only database, just free memory and pretend to have sent all the data
if (config.dbReadOnly)
{
multiWrite.Lock();
multiWrite.data[multiWrite.pendingToFlushDataIndex].Reset();
multiWrite.Unlock();

return ZKR_SUCCESS;
}

//TimerStart(DATABASE_FLUSH);

multiWrite.Lock();

// Accept all intray data
multiWrite.data[multiWrite.pendingToFlushDataIndex].acceptIntray();

// Increase the last processed batch id and return the last sent batch id
multiWrite.lastFlushId++;
thisBatch = multiWrite.lastFlushId;
lastSentBatch = multiWrite.storedFlushId;
Lock();
currentFlushId++;
thisBatch = currentFlushId;
lastSentBatch = currentFlushId;

#ifdef LOG_DB_FLUSH
zklog.info("Database64::flush() thisBatch=" + to_string(thisBatch) + " lastSentBatch=" + to_string(lastSentBatch) + " multiWrite=[" + multiWrite.print() + "]");
#endif

// Notify the thread
sem_post(&senderSem);
Unlock();

multiWrite.Unlock();
#endif
return ZKR_SUCCESS;
}

zkresult Database64::getFlushStatus(uint64_t &storedFlushId, uint64_t &storingFlushId, uint64_t &lastFlushId, uint64_t &pendingToFlushNodes, uint64_t &pendingToFlushProgram, uint64_t &storingNodes, uint64_t &storingProgram)
{
/*multiWrite.Lock();
storedFlushId = multiWrite.storedFlushId;
storingFlushId = multiWrite.storingFlushId;
lastFlushId = multiWrite.lastFlushId;
pendingToFlushNodes = multiWrite.data[multiWrite.pendingToFlushDataIndex].nodes.size();
pendingToFlushProgram = multiWrite.data[multiWrite.pendingToFlushDataIndex].program.size();
storingNodes = multiWrite.data[multiWrite.storingDataIndex].nodes.size();
storingProgram = multiWrite.data[multiWrite.storingDataIndex].program.size();
multiWrite.Unlock();*/

return ZKR_SUCCESS;
}


// Get flush data, written to database by dbSenderThread; it blocks
zkresult Database64::getFlushData(uint64_t flushId, uint64_t &storedFlushId, unordered_map<string, string> (&nodes), unordered_map<string, string> (&program), string &nodesStateRoot)
{
#if 0
//zklog.info("--> getFlushData()");

// Set the deadline to now + 60 seconds
struct timespec deadline;
clock_gettime(CLOCK_REALTIME, &deadline);
deadline.tv_sec += 60;

// Try to get the semaphore
int iResult;
iResult = sem_timedwait(&getFlushDataSem, &deadline);
if (iResult != 0)
{
zklog.info("Database64::getFlushData() timed out");
return ZKR_SUCCESS;
}

multiWrite.Lock();
MultiWriteData64 &data = multiWrite.data[multiWrite.synchronizingDataIndex];

zklog.info("Database64::getFlushData woke up: pendingToFlushDataIndex=" + to_string(multiWrite.pendingToFlushDataIndex) +
" storingDataIndex=" + to_string(multiWrite.storingDataIndex) +
" synchronizingDataIndex=" + to_string(multiWrite.synchronizingDataIndex) +
" nodes=" + to_string(data.nodes.size()) +
" program=" + to_string(data.program.size()) +
" nodesStateRoot=" + data.nodesStateRoot);

if (data.nodes.size() > 0)
{
nodes = data.nodes;
}

if (data.program.size() > 0)
{
program = data.program;
}

if (data.nodesStateRoot.size() > 0)
{
nodesStateRoot = data.nodesStateRoot;
}

multiWrite.Unlock();

//zklog.info("<-- getFlushData()");
#endif
Lock();
storedFlushId = currentFlushId;
storingFlushId = currentFlushId;
lastFlushId = currentFlushId;
pendingToFlushNodes = 0;
pendingToFlushProgram = 0;
storingNodes = 0;
storingProgram = 0;
Unlock();

return ZKR_SUCCESS;
}
Expand Down
7 changes: 5 additions & 2 deletions src/hashdb64/database_64.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class Database64

bool bInitialized = false;
uint64_t headerPageNumber;
pthread_mutex_t mutex;
uint64_t currentFlushId;

public:

Expand Down Expand Up @@ -96,8 +98,9 @@ class Database64
zkresult flush(uint64_t &flushId, uint64_t &lastSentFlushId);
zkresult getFlushStatus(uint64_t &storedFlushId, uint64_t &storingFlushId, uint64_t &lastFlushId, uint64_t &pendingToFlushNodes, uint64_t &pendingToFlushProgram, uint64_t &storingNodes, uint64_t &storingProgram);

// Get flush data, written to database by dbSenderThread; it blocks
zkresult getFlushData(uint64_t flushId, uint64_t &lastSentFlushId, unordered_map<string, string> (&nodes), unordered_map<string, string> (&program), string &nodesStateRoot);
// Lock/Unlock
void Lock(void) { pthread_mutex_lock(&mutex); };
void Unlock(void) { pthread_mutex_unlock(&mutex); };
};

#endif
3 changes: 2 additions & 1 deletion src/service/hashdb/hashdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ zkresult HashDB::getFlushData(uint64_t flushId, uint64_t &lastSentFlushId, unord

if (config.hashDB64)
{
zkr = db64.getFlushData(flushId, lastSentFlushId, nodes, program, nodesStateRoot);
zklog.error("HashDB::getFlushData() called with config.hashDB64=true");
return ZKR_DB_ERROR;
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions test/service/hashdb/hashdb64_workflow_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ uint64_t HashDB64WorkflowTest (const Config& config)
for (uint64_t i=0; i<4; i++) root[i] = consolidatedStateRoot[i];

// Wait for data to be sent
/*while (true)
while (true)
{
uint64_t storedFlushId, storingFlushId, lastFlushId, pendingToFlushNodes, pendingToFlushProgram, storingNodes, storingProgram;
string proverId;
Expand All @@ -141,7 +141,7 @@ uint64_t HashDB64WorkflowTest (const Config& config)
}
sleep(1);
}
zklog.info("FLUSHED");*/
zklog.info("FLUSHED");

// Call ReadTree with the old state root to get the hashes of the initial values of all read or written keys
/*vector<HashValueGL> oldHashValues;
Expand Down

0 comments on commit 707bc14

Please sign in to comment.