Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only search archives within the search time range; Add begin and end timestamp to archive metadata. #138

Merged
merged 9 commits into from
Jul 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def main(argv):
CREATE TABLE IF NOT EXISTS `{table_prefix}archives` (
`pagination_id` BIGINT unsigned NOT NULL AUTO_INCREMENT,
`id` VARCHAR(64) NOT NULL,
`begin_timestamp` BIGINT NOT NULL,
`end_timestamp` BIGINT NOT NULL,
`uncompressed_size` BIGINT NOT NULL,
`size` BIGINT NOT NULL,
`creator_id` VARCHAR(64) NOT NULL,
Expand Down
8 changes: 8 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ set(SOURCE_FILES_clp
src/SQLitePreparedStatement.hpp
src/Stopwatch.cpp
src/Stopwatch.hpp
src/streaming_archive/ArchiveMetadata.cpp
src/streaming_archive/ArchiveMetadata.hpp
src/streaming_archive/Constants.hpp
src/streaming_archive/MetadataDB.cpp
src/streaming_archive/MetadataDB.hpp
Expand Down Expand Up @@ -430,6 +432,8 @@ set(SOURCE_FILES_clg
src/SQLitePreparedStatement.hpp
src/Stopwatch.cpp
src/Stopwatch.hpp
src/streaming_archive/ArchiveMetadata.cpp
src/streaming_archive/ArchiveMetadata.hpp
src/streaming_archive/Constants.hpp
src/streaming_archive/MetadataDB.cpp
src/streaming_archive/MetadataDB.hpp
Expand Down Expand Up @@ -573,6 +577,8 @@ set(SOURCE_FILES_clo
src/SQLitePreparedStatement.hpp
src/Stopwatch.cpp
src/Stopwatch.hpp
src/streaming_archive/ArchiveMetadata.cpp
src/streaming_archive/ArchiveMetadata.hpp
src/streaming_archive/Constants.hpp
src/streaming_archive/MetadataDB.cpp
src/streaming_archive/MetadataDB.hpp
Expand Down Expand Up @@ -774,6 +780,8 @@ set(SOURCE_FILES_unitTest
src/SQLitePreparedStatement.hpp
src/Stopwatch.cpp
src/Stopwatch.hpp
src/streaming_archive/ArchiveMetadata.cpp
src/streaming_archive/ArchiveMetadata.hpp
src/streaming_archive/Constants.hpp
src/streaming_archive/MetadataDB.cpp
src/streaming_archive/MetadataDB.hpp
Expand Down
22 changes: 12 additions & 10 deletions components/core/src/GlobalMetadataDB.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <vector>

// Project headers
#include "streaming_archive/ArchiveMetadata.hpp"
#include "streaming_archive/writer/File.hpp"

/**
Expand Down Expand Up @@ -44,21 +45,15 @@ class GlobalMetadataDB {
/**
* Adds an archive to the global metadata database
* @param id
* @param uncompressed_size
* @param size
* @param creator_id
* @param creation_num
* @param metadata
*/
virtual void add_archive (const std::string& id, uint64_t uncompressed_size, uint64_t size,
const std::string& creator_id, uint64_t creation_num) = 0;
virtual void add_archive (const std::string& id, const streaming_archive::ArchiveMetadata& metadata) = 0;
/**
* Updates the size of the archive identified by the given ID in the global metadata database
* @param archive_id
* @param uncompressed_size
* @param size
* @param metadata
*/
virtual void update_archive_size (const std::string& archive_id, uint64_t uncompressed_size,
uint64_t size) = 0;
virtual void update_archive_metadata (const std::string& archive_id, const streaming_archive::ArchiveMetadata& metadata) = 0;
/**
* Updates the metadata of the given files in the global metadata database
* @param archive_id
Expand All @@ -71,6 +66,13 @@ class GlobalMetadataDB {
* @return The archive iterator
*/
virtual ArchiveIterator* get_archive_iterator () = 0;
/**
* Gets an iterator to iterate over every archive that falls in the given time window in the global metadata database
* @param begin_ts
* @param end_ts
* @return The archive iterator
*/
virtual ArchiveIterator* get_archive_iterator_for_time_window (epochtime_t begin_ts, epochtime_t end_ts) = 0;
/**
* Gets an iterator to iterate over every archive that contains a given file path in the global metadata database
* @return The archive iterator
Expand Down
55 changes: 46 additions & 9 deletions components/core/src/GlobalMySQLMetadataDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@ using std::vector;
// Types
enum class ArchivesTableFieldIndexes : uint16_t {
Id = 0,
BeginTimestamp,
EndTimestamp,
UncompressedSize,
Size,
CreatorId,
CreationIx,
Length,
};
enum class UpdateArchiveSizeStmtFieldIndexes : uint16_t {
UncompressedSize = 0,
BeginTimestamp = 0,
EndTimestamp,
UncompressedSize,
Size,
Length,
};
Expand Down Expand Up @@ -52,6 +56,8 @@ void GlobalMySQLMetadataDB::open () {

vector<string> archive_field_names(enum_to_underlying_type(ArchivesTableFieldIndexes::Length));
archive_field_names[enum_to_underlying_type(ArchivesTableFieldIndexes::Id)] = streaming_archive::cMetadataDB::Archive::Id;
archive_field_names[enum_to_underlying_type(ArchivesTableFieldIndexes::BeginTimestamp)] = streaming_archive::cMetadataDB::Archive::BeginTimestamp;
archive_field_names[enum_to_underlying_type(ArchivesTableFieldIndexes::EndTimestamp)] = streaming_archive::cMetadataDB::Archive::EndTimestamp;
archive_field_names[enum_to_underlying_type(ArchivesTableFieldIndexes::UncompressedSize)] = streaming_archive::cMetadataDB::Archive::UncompressedSize;
archive_field_names[enum_to_underlying_type(ArchivesTableFieldIndexes::Size)] = streaming_archive::cMetadataDB::Archive::Size;
archive_field_names[enum_to_underlying_type(ArchivesTableFieldIndexes::CreatorId)] = streaming_archive::cMetadataDB::Archive::CreatorId;
Expand All @@ -67,6 +73,10 @@ void GlobalMySQLMetadataDB::open () {
statement_buffer.clear();

vector<string> update_archive_size_stmt_field_names(enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::Length));
update_archive_size_stmt_field_names[enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::BeginTimestamp)] =
streaming_archive::cMetadataDB::Archive::BeginTimestamp;
update_archive_size_stmt_field_names[enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::EndTimestamp)] =
streaming_archive::cMetadataDB::Archive::EndTimestamp;
update_archive_size_stmt_field_names[enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::UncompressedSize)] =
streaming_archive::cMetadataDB::Archive::UncompressedSize;
update_archive_size_stmt_field_names[enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::Size)] =
Expand Down Expand Up @@ -107,35 +117,46 @@ void GlobalMySQLMetadataDB::close () {
m_is_open = false;
}

void GlobalMySQLMetadataDB::add_archive (const string& id, uint64_t uncompressed_size,
uint64_t size, const string& creator_id,
uint64_t creation_num)
{
void GlobalMySQLMetadataDB::add_archive (const string& id, const streaming_archive::ArchiveMetadata& metadata) {
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

auto& statement_bindings = m_insert_archive_statement->get_statement_bindings();
statement_bindings.bind_varchar(enum_to_underlying_type(ArchivesTableFieldIndexes::Id), id.c_str(), id.length());
auto begin_timestamp = metadata.get_begin_timestamp();
statement_bindings.bind_int64(enum_to_underlying_type(ArchivesTableFieldIndexes::BeginTimestamp), begin_timestamp);
auto end_timestamp = metadata.get_end_timestamp();
statement_bindings.bind_int64(enum_to_underlying_type(ArchivesTableFieldIndexes::EndTimestamp), end_timestamp);
auto uncompressed_size = metadata.get_uncompressed_size_bytes();
statement_bindings.bind_uint64(enum_to_underlying_type(ArchivesTableFieldIndexes::UncompressedSize), uncompressed_size);
statement_bindings.bind_uint64(enum_to_underlying_type(ArchivesTableFieldIndexes::Size), size);
auto compressed_size = metadata.get_compressed_size_bytes();
statement_bindings.bind_uint64(enum_to_underlying_type(ArchivesTableFieldIndexes::Size), compressed_size);
const auto& creator_id = metadata.get_creator_id();
statement_bindings.bind_varchar(enum_to_underlying_type(ArchivesTableFieldIndexes::CreatorId), creator_id.c_str(), creator_id.length());
auto creation_num = metadata.get_creation_idx();
statement_bindings.bind_uint64(enum_to_underlying_type(ArchivesTableFieldIndexes::CreationIx), creation_num);
if (false == m_insert_archive_statement->execute()) {
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
}

void GlobalMySQLMetadataDB::update_archive_size (const std::string& archive_id,
uint64_t uncompressed_size, uint64_t size)
void GlobalMySQLMetadataDB::update_archive_metadata (const std::string& archive_id,
const streaming_archive::ArchiveMetadata& metadata)
{
if (false == m_is_open) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

auto& statement_bindings = m_update_archive_size_statement->get_statement_bindings();
auto begin_timestamp = metadata.get_begin_timestamp();
statement_bindings.bind_int64(enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::BeginTimestamp), begin_timestamp);
auto end_timestamp = metadata.get_end_timestamp();
statement_bindings.bind_int64(enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::EndTimestamp), end_timestamp);
auto uncompressed_size = metadata.get_uncompressed_size_bytes();
statement_bindings.bind_uint64(enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::UncompressedSize), uncompressed_size);
statement_bindings.bind_uint64(enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::Size), size);
auto compressed_size = metadata.get_compressed_size_bytes();
statement_bindings.bind_uint64(enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::Size), compressed_size);
statement_bindings.bind_varchar(enum_to_underlying_type(UpdateArchiveSizeStmtFieldIndexes::Length), archive_id.c_str(), archive_id.length());
if (false == m_update_archive_size_statement->execute()) {
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
Expand Down Expand Up @@ -210,6 +231,22 @@ GlobalMetadataDB::ArchiveIterator* GlobalMySQLMetadataDB::get_archive_iterator (
return new ArchiveIterator(m_db.get_iterator());
}

GlobalMetadataDB::ArchiveIterator* GlobalMySQLMetadataDB::get_archive_iterator_for_time_window (epochtime_t begin_ts, epochtime_t end_ts) {
auto statement_string = fmt::format("SELECT DISTINCT {} FROM {}{} WHERE {} <= {} AND {} >= {} ORDER BY {} ASC, {} ASC",
streaming_archive::cMetadataDB::Archive::Id,
m_table_prefix, streaming_archive::cMetadataDB::ArchivesTableName,
streaming_archive::cMetadataDB::File::BeginTimestamp, end_ts,
streaming_archive::cMetadataDB::File::EndTimestamp, begin_ts,
streaming_archive::cMetadataDB::Archive::CreatorId, streaming_archive::cMetadataDB::Archive::CreationIx);
SPDLOG_DEBUG("{}", statement_string);

if (false == m_db.execute_query(statement_string)) {
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}

return new ArchiveIterator(m_db.get_iterator());
}

GlobalMetadataDB::ArchiveIterator* GlobalMySQLMetadataDB::get_archive_iterator_for_file_path (const string& file_path) {
auto statement_string = fmt::format("SELECT DISTINCT {}{}.{} FROM {}{} JOIN {}{} ON {}{}.{} = {}{}.{} WHERE {}{}.{} = '{}' ORDER BY {} ASC, {} ASC",
m_table_prefix, streaming_archive::cMetadataDB::ArchivesTableName, streaming_archive::cMetadataDB::Archive::Id,
Expand Down
7 changes: 3 additions & 4 deletions components/core/src/GlobalMySQLMetadataDB.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,12 @@ class GlobalMySQLMetadataDB : public GlobalMetadataDB {
void open () override;
void close () override;

void add_archive (const std::string& id, uint64_t uncompressed_size, uint64_t size,
const std::string& creator_id, uint64_t creation_num) override;
void update_archive_size (const std::string& archive_id, uint64_t uncompressed_size,
uint64_t size) override;
void add_archive (const std::string& id, const streaming_archive::ArchiveMetadata& metadata) override;
void update_archive_metadata (const std::string& archive_id, const streaming_archive::ArchiveMetadata& metadata) override;
void update_metadata_for_files (const std::string& archive_id, const std::vector<streaming_archive::writer::File*>& files) override;

GlobalMetadataDB::ArchiveIterator* get_archive_iterator () override;
GlobalMetadataDB::ArchiveIterator* get_archive_iterator_for_time_window (epochtime_t begin_ts, epochtime_t end_ts) override;
GlobalMetadataDB::ArchiveIterator* get_archive_iterator_for_file_path (const std::string& file_path) override;

private:
Expand Down
Loading