Skip to content

Commit

Permalink
Add new timestamp formats and simplify timestamp key handling in clp-s: (#262)
Browse files Browse the repository at this point in the history

- Add new timestamp formats for clp, clp-s, and glt
- Remove obsolete FLOATDATESTRING MST node
- Handle timestamp keys in JsonParser rather than column writers (fixes #261)
  • Loading branch information
gibber9809 authored Feb 8, 2024
1 parent 2e9f150 commit 7de16f9
Show file tree
Hide file tree
Showing 26 changed files with 364 additions and 361 deletions.
41 changes: 27 additions & 14 deletions components/core/src/clp/TimestampPattern.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,45 @@ void TimestampPattern::init() {
patterns.emplace_back(0, "%Y-%m-%dT%H:%M:%S.%3");
// E.g. 2015-01-31T15:50:45,392
patterns.emplace_back(0, "%Y-%m-%dT%H:%M:%S,%3");
// E.g. [2015-01-31T15:50:45
patterns.emplace_back(0, "[%Y-%m-%dT%H:%M:%S");
// E.g. [20170106-16:56:41]
patterns.emplace_back(0, "[%Y%m%d-%H:%M:%S]");
// E.g. 2015-01-31 15:50:45,392
patterns.emplace_back(0, "%Y-%m-%d %H:%M:%S,%3");
// E.g. 2015-01-31 15:50:45.392
patterns.emplace_back(0, "%Y-%m-%d %H:%M:%S.%3");
// E.g. 2015-01-31 15:50:45,392
patterns.emplace_back(0, "%Y-%m-%d %H:%M:%S,%3");
// E.g. 2015/01/31T15:50:45.123
patterns.emplace_back(0, "%Y/%m/%dT%H:%M:%S.%3");
// E.g. 2015/01/31T15:50:45,123
patterns.emplace_back(0, "%Y/%m/%dT%H:%M:%S,%3");
// E.g. 2015/01/31 15:50:45.123
patterns.emplace_back(0, "%Y/%m/%d %H:%M:%S.%3");
// E.g. 2015/01/31 15:50:45,123
patterns.emplace_back(0, "%Y/%m/%d %H:%M:%S,%3");
// E.g. [2015-01-31 15:50:45,085]
patterns.emplace_back(0, "[%Y-%m-%d %H:%M:%S,%3]");
// E.g. INFO [main] 2015-01-31 15:50:45,085
patterns.emplace_back(2, "%Y-%m-%d %H:%M:%S,%3");
// E.g. <<<2016-11-10 03:02:29:936
patterns.emplace_back(0, "<<<%Y-%m-%d %H:%M:%S:%3");
// E.g. 01 Jan 2016 15:50:17,085
patterns.emplace_back(0, "%d %b %Y %H:%M:%S,%3");

// E.g. 2015-01-31T15:50:45
patterns.emplace_back(0, "%Y-%m-%dT%H:%M:%S");
// E.g. 2015-01-31 15:50:45
patterns.emplace_back(0, "%Y-%m-%d %H:%M:%S");
// E.g. Start-Date: 2015-01-31 15:50:45
patterns.emplace_back(1, "%Y-%m-%d %H:%M:%S");
// E.g. 2015/01/31T15:50:45
patterns.emplace_back(0, "%Y/%m/%dT%H:%M:%S");
// E.g. 2015/01/31 15:50:45
patterns.emplace_back(0, "%Y/%m/%d %H:%M:%S");
// E.g. [2015-01-31T15:50:45
patterns.emplace_back(0, "[%Y-%m-%dT%H:%M:%S");
// E.g. [20170106-16:56:41]
patterns.emplace_back(0, "[%Y%m%d-%H:%M:%S]");
// E.g. Start-Date: 2015-01-31 15:50:45
patterns.emplace_back(1, "%Y-%m-%d %H:%M:%S");
// E.g. 15/01/31 15:50:45
patterns.emplace_back(0, "%y/%m/%d %H:%M:%S");
// E.g. 150131 9:50:45
patterns.emplace_back(0, "%y%m%d %k:%M:%S");
// E.g. 01 Jan 2016 15:50:17,085
patterns.emplace_back(0, "%d %b %Y %H:%M:%S,%3");
// E.g. Jan 01, 2016 3:50:17 PM
patterns.emplace_back(0, "%b %d, %Y %l:%M:%S %p");
// E.g. January 31, 2015 15:50
Expand All @@ -157,16 +174,12 @@ void TimestampPattern::init() {
patterns.emplace_back(3, "[%d/%b/%Y:%H:%M:%S");
// E.g. 192.168.4.5 - - [01/01/2016:15:50:17
patterns.emplace_back(3, "[%d/%m/%Y:%H:%M:%S");
// E.g. INFO [main] 2015-01-31 15:50:45,085
patterns.emplace_back(2, "%Y-%m-%d %H:%M:%S,%3");
// E.g. Started POST "/api/v3/internal/allowed" for 127.0.0.1 at 2017-06-18 00:20:44
patterns.emplace_back(6, "%Y-%m-%d %H:%M:%S");
// E.g. update-alternatives 2015-01-31 15:50:45
patterns.emplace_back(1, "%Y-%m-%d %H:%M:%S");
// E.g. ERROR: apport (pid 4557) Sun Jan 1 15:50:45 2015
patterns.emplace_back(4, "%a %b %e %H:%M:%S %Y");
// E.g. <<<2016-11-10 03:02:29:936
patterns.emplace_back(0, "<<<%Y-%m-%d %H:%M:%S:%3");
// E.g. Sun Jan 1 15:50:45 2015
patterns.emplace_back(0, "%a %b %e %H:%M:%S %Y");

Expand Down
24 changes: 7 additions & 17 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,37 +90,27 @@ size_t ArchiveWriter::get_data_size() {
void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const& schema) {
for (int32_t id : schema) {
auto node = m_schema_tree->get_node(id);
std::string key_name = node->get_key_name();
switch (node->get_type()) {
case NodeType::INTEGER:
writer->append_column(new Int64ColumnWriter(key_name, id));
writer->append_column(new Int64ColumnWriter(id));
break;
case NodeType::FLOAT:
writer->append_column(new FloatColumnWriter(key_name, id));
writer->append_column(new FloatColumnWriter(id));
break;
case NodeType::CLPSTRING:
writer->append_column(
new ClpStringColumnWriter(key_name, id, m_var_dict, m_log_dict)
);
writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_log_dict));
break;
case NodeType::VARSTRING:
writer->append_column(new VariableStringColumnWriter(key_name, id, m_var_dict));
writer->append_column(new VariableStringColumnWriter(id, m_var_dict));
break;
case NodeType::BOOLEAN:
writer->append_column(new BooleanColumnWriter(key_name, id));
writer->append_column(new BooleanColumnWriter(id));
break;
case NodeType::ARRAY:
writer->append_column(
new ClpStringColumnWriter(key_name, id, m_var_dict, m_array_dict)
);
writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_array_dict));
break;
case NodeType::DATESTRING:
writer->append_column(new DateStringColumnWriter(key_name, id, m_timestamp_dict));
break;
case NodeType::FLOATDATESTRING:
writer->append_column(
new FloatDateStringColumnWriter(key_name, id, m_timestamp_dict)
);
writer->append_column(new DateStringColumnWriter(id));
break;
case NodeType::OBJECT:
case NodeType::NULLVALUE:
Expand Down
18 changes: 0 additions & 18 deletions components/core/src/clp_s/ColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,22 +156,4 @@ std::variant<int64_t, double, std::string, uint8_t> DateStringColumnReader::extr
/**
 * Looks up the timestamp stored for a message.
 * @param cur_message Index into m_timestamps; no range check is performed here
 * @return The element of m_timestamps at that index
 */
epochtime_t DateStringColumnReader::get_encoded_time(uint64_t cur_message) {
    auto const* const timestamps = m_timestamps.get();
    return timestamps[cur_message];
}

/**
 * Allocates storage for num_messages doubles and reads that many bytes' worth of data from the
 * decompressor into it.
 * @param decompressor Source of the column's bytes
 * @param num_messages Number of double values to read
 */
void FloatDateStringColumnReader::load(ZstdDecompressor& decompressor, uint64_t num_messages) {
    m_timestamps = std::make_unique<double[]>(num_messages);

    auto* const destination = reinterpret_cast<char*>(m_timestamps.get());
    auto const num_bytes = num_messages * sizeof(double);
    // NOTE(review): whatever try_read_exact_length returns is discarded here -- confirm whether it
    // reports read failures that should be handled.
    decompressor.try_read_exact_length(destination, num_bytes);
}

std::variant<int64_t, double, std::string, uint8_t> FloatDateStringColumnReader::extract_value(
uint64_t cur_message
) {
return std::to_string(m_timestamps[cur_message]);
}

double FloatDateStringColumnReader::get_encoded_time(uint64_t cur_message) {
return m_timestamps[cur_message];
}
} // namespace clp_s
26 changes: 0 additions & 26 deletions components/core/src/clp_s/ColumnReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,32 +235,6 @@ class DateStringColumnReader : public BaseColumnReader {
std::unique_ptr<int64_t[]> m_timestamps;
std::unique_ptr<int64_t[]> m_timestamp_encodings;
};

/**
 * Column reader that reports NodeType::FLOATDATESTRING and keeps its timestamps as an array of
 * doubles.
 */
class FloatDateStringColumnReader : public BaseColumnReader {
public:
// Constructor: forwards the column name and id to BaseColumnReader
FloatDateStringColumnReader(std::string const& name, int32_t id) : BaseColumnReader(name, id) {}

// Destructor
~FloatDateStringColumnReader() override = default;

// Methods inherited from BaseColumnReader
void load(ZstdDecompressor& decompressor, uint64_t num_messages) override;

NodeType get_type() override { return NodeType::FLOATDATESTRING; }

std::variant<int64_t, double, std::string, uint8_t> extract_value(uint64_t cur_message
) override;

/**
* @param cur_message Index of the message whose timestamp is requested
* @return The encoded time in float epoch time
*/
double get_encoded_time(uint64_t cur_message);

private:
// Timestamp storage, indexed by message
std::unique_ptr<double[]> m_timestamps;
};
} // namespace clp_s

#endif // CLP_S_COLUMNREADER_HPP
60 changes: 9 additions & 51 deletions components/core/src/clp_s/ColumnWriter.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
#include "ColumnWriter.hpp"

namespace clp_s {
void Int64ColumnWriter::add_value(
std::variant<int64_t, double, std::string, bool>& value,
size_t& size
) {
void Int64ColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) {
size = sizeof(int64_t);
m_values.push_back(std::get<int64_t>(value));
}
Expand All @@ -16,10 +13,7 @@ void Int64ColumnWriter::store(ZstdCompressor& compressor) {
);
}

void FloatColumnWriter::add_value(
std::variant<int64_t, double, std::string, bool>& value,
size_t& size
) {
void FloatColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) {
size = sizeof(double);
m_values.push_back(std::get<double>(value));
}
Expand All @@ -31,10 +25,7 @@ void FloatColumnWriter::store(ZstdCompressor& compressor) {
);
}

void BooleanColumnWriter::add_value(
std::variant<int64_t, double, std::string, bool>& value,
size_t& size
) {
void BooleanColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) {
size = sizeof(uint8_t);
m_values.push_back(std::get<bool>(value) ? 1 : 0);
}
Expand All @@ -46,10 +37,7 @@ void BooleanColumnWriter::store(ZstdCompressor& compressor) {
);
}

void ClpStringColumnWriter::add_value(
std::variant<int64_t, double, std::string, bool>& value,
size_t& size
) {
void ClpStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) {
size = sizeof(int64_t);
std::string string_var = std::get<std::string>(value);
uint64_t id;
Expand Down Expand Up @@ -78,10 +66,7 @@ void ClpStringColumnWriter::store(ZstdCompressor& compressor) {
);
}

void VariableStringColumnWriter::add_value(
std::variant<int64_t, double, std::string, bool>& value,
size_t& size
) {
void VariableStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) {
size = sizeof(int64_t);
std::string string_var = std::get<std::string>(value);
uint64_t id;
Expand All @@ -96,19 +81,11 @@ void VariableStringColumnWriter::store(ZstdCompressor& compressor) {
);
}

void DateStringColumnWriter::add_value(
std::variant<int64_t, double, std::string, bool>& value,
size_t& size
) {
void DateStringColumnWriter::add_value(ParsedMessage::variable_t& value, size_t& size) {
size = 2 * sizeof(int64_t);
std::string string_timestamp = std::get<std::string>(value);

uint64_t encoding_id;
epochtime_t timestamp
= m_timestamp_dict->ingest_entry(m_name, m_id, string_timestamp, encoding_id);

m_timestamps.push_back(timestamp);
m_timestamp_encodings.push_back(encoding_id);
auto encoded_timestamp = std::get<std::pair<uint64_t, epochtime_t>>(value);
m_timestamps.push_back(encoded_timestamp.second);
m_timestamp_encodings.push_back(encoded_timestamp.first);
}

void DateStringColumnWriter::store(ZstdCompressor& compressor) {
Expand All @@ -121,23 +98,4 @@ void DateStringColumnWriter::store(ZstdCompressor& compressor) {
m_timestamp_encodings.size() * sizeof(int64_t)
);
}

void FloatDateStringColumnWriter::add_value(
std::variant<int64_t, double, std::string, bool>& value,
size_t& size
) {
size = sizeof(double);
double timestamp = std::get<double>(value);

m_timestamp_dict->ingest_entry(m_name, m_id, timestamp);

m_timestamps.push_back(timestamp);
}

/**
 * Writes the raw bytes of every buffered timestamp to the compressor.
 * @param compressor Destination for the column's bytes
 */
void FloatDateStringColumnWriter::store(ZstdCompressor& compressor) {
    auto const* const raw_timestamps = reinterpret_cast<char const*>(m_timestamps.data());
    auto const num_bytes = m_timestamps.size() * sizeof(double);
    compressor.write(raw_timestamps, num_bytes);
}
} // namespace clp_s
Loading

0 comments on commit 7de16f9

Please sign in to comment.