Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate row offset in C++, refactor Table, remove implicit primary key mode #692

Merged
merged 3 commits into from
Aug 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions cpp/perspective/src/cpp/emscripten.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ namespace binding {

void
_fill_data(t_data_table& tbl, t_data_accessor accessor, std::vector<std::string> col_names,
std::vector<t_dtype> data_types, std::uint32_t offset, bool is_arrow, bool is_update) {
std::vector<t_dtype> data_types, bool is_arrow, bool is_update) {

for (auto cidx = 0; cidx < col_names.size(); ++cidx) {
auto name = col_names[cidx];
Expand Down Expand Up @@ -1204,14 +1204,13 @@ namespace binding {

template <>
std::shared_ptr<Table>
make_table(t_val table, t_data_accessor accessor, t_val computed, std::uint32_t offset,
std::uint32_t limit, std::string index, t_op op, bool is_arrow) {
bool is_update = op == OP_UPDATE;
bool is_delete = op == OP_DELETE;
make_table(t_val table, t_data_accessor accessor, t_val computed,
std::uint32_t limit, std::string index, t_op op, bool is_update, bool is_arrow) {
std::vector<std::string> column_names;
std::vector<t_dtype> data_types;

// Determine metadata
bool is_delete = op == OP_DELETE;
if (is_arrow || (is_update || is_delete)) {
t_val names = accessor["names"];
t_val types = accessor["types"];
Expand All @@ -1235,10 +1234,14 @@ namespace binding {
bool table_initialized = has_value(table);
std::shared_ptr<Table> tbl;

// If the Table has already been created, use it
if (table_initialized) {
// Get a reference to the Table, and update its metadata
tbl = table.as<std::shared_ptr<Table>>();
tbl->set_column_names(column_names);
tbl->set_data_types(data_types);

auto current_gnode = tbl->get_gnode();
tbl->update(column_names, data_types, offset, limit, index, op, is_arrow);

// use gnode metadata to help decide if we need to update
is_update = (is_update || current_gnode->mapping_size() > 0);
Expand All @@ -1259,21 +1262,23 @@ namespace binding {
} else {
std::shared_ptr<t_pool> pool = std::make_shared<t_pool>();
tbl = std::make_shared<Table>(
pool, column_names, data_types, offset, limit, index, op, is_arrow);
pool, column_names, data_types, limit, index);
}

std::uint32_t row_count = accessor["row_count"].as<std::int32_t>();
t_data_table data_table(t_schema(column_names, data_types));
data_table.init();
data_table.extend(row_count);

_fill_data(data_table, accessor, column_names, data_types, offset, is_arrow, is_update);
_fill_data(data_table, accessor, column_names, data_types, is_arrow, is_update);

if (!computed.isUndefined()) {
// re-add computed columns after update, delete, etc.
table_add_computed_column(data_table, computed);
}

tbl->init(data_table);
// calculate offset, limit, primary key index, and set the gnode
tbl->init(data_table, row_count, op);
return tbl;
}

Expand Down Expand Up @@ -1623,7 +1628,7 @@ EMSCRIPTEN_BINDINGS(perspective) {
*/
class_<Table>("Table")
.constructor<std::shared_ptr<t_pool>, std::vector<std::string>, std::vector<t_dtype>,
std::uint32_t, std::uint32_t, std::string, t_op, bool>()
std::uint32_t, std::string>()
.smart_ptr<std::shared_ptr<Table>>("shared_ptr<Table>")
.function("size", &Table::size)
.function("get_schema", &Table::get_schema)
Expand Down Expand Up @@ -1902,8 +1907,7 @@ EMSCRIPTEN_BINDINGS(perspective) {
enum_<t_op>("t_op")
.value("OP_INSERT", OP_INSERT)
.value("OP_DELETE", OP_DELETE)
.value("OP_CLEAR", OP_CLEAR)
.value("OP_UPDATE", OP_UPDATE);
.value("OP_CLEAR", OP_CLEAR);

/******************************************************************************
*
Expand Down
29 changes: 3 additions & 26 deletions cpp/perspective/src/cpp/gnode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,9 @@ t_gnode::t_gnode(const t_gnode_options& options)
}

t_schema port_schema(options.m_port_schema);
if (m_gnode_type == GNODE_TYPE_IMPLICIT_PKEYED) {

// Make sure that gnode type is consistent with input schema
if (port_schema.is_pkey()) {
PSP_COMPLAIN_AND_ABORT("gnode type specified as implicit pkey, however input "
"schema has psp_pkey column.");
}
port_schema
= t_schema{{"psp_op", "psp_pkey"}, {DTYPE_UINT8, DTYPE_INT64}} + port_schema;
} else {
if (!(port_schema.is_pkey())) {
PSP_COMPLAIN_AND_ABORT("gnode type specified as explicit pkey, however input "
"schema is missing required columns.");
}
if (!(port_schema.is_pkey())) {
PSP_COMPLAIN_AND_ABORT("gnode type specified as explicit pkey, however input "
"schema is missing required columns.");
}

t_schema trans_schema(m_tblschema.columns(), trans_types);
Expand Down Expand Up @@ -351,18 +340,6 @@ t_gnode::_process_table() {

m_was_updated = true;

if (m_gnode_type == GNODE_TYPE_IMPLICIT_PKEYED) {
auto tbl = iport->get_table();
auto op_col = tbl->add_column("psp_op", DTYPE_UINT8, false);
op_col->raw_fill<std::uint8_t>(OP_INSERT);

auto key_col = tbl->add_column("psp_pkey", DTYPE_INT64, true);
std::int64_t start = get_table()->size();
for (t_uindex ridx = 0; ridx < tbl->size(); ++ridx) {
key_col->set_nth<std::int64_t>(ridx, start + ridx);
}
}

std::shared_ptr<t_data_table> flattened(iport->get_table()->flatten());
PSP_GNODE_VERIFY_TABLE(flattened);
PSP_GNODE_VERIFY_TABLE(get_table());
Expand Down
56 changes: 31 additions & 25 deletions cpp/perspective/src/cpp/table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,33 @@

#include <perspective/table.h>

static perspective::t_uindex GLOBAL_ID = 0;
// Give each Table a unique ID so that operations on it map back correctly
static perspective::t_uindex GLOBAL_TABLE_ID = 0;

namespace perspective {
Table::Table(std::shared_ptr<t_pool> pool, std::vector<std::string> column_names,
std::vector<t_dtype> data_types, std::uint32_t offset, std::uint32_t limit,
std::string index, t_op op, bool is_arrow)
std::vector<t_dtype> data_types, std::uint32_t limit,
std::string index)
: m_init(false)
, m_id(GLOBAL_ID++)
, m_id(GLOBAL_TABLE_ID++)
, m_pool(pool)
, m_column_names(column_names)
, m_data_types(data_types)
, m_offset(offset)
, m_offset(0)
, m_limit(limit)
, m_index(index)
, m_op(op)
, m_is_arrow(is_arrow)
, m_gnode_set(false) {}

void
Table::init(t_data_table& data_table) {
// ensure the data table is indexed and has the operation column
process_op_column(data_table);
Table::init(t_data_table& data_table, std::uint32_t row_count, const t_op op) {
/**
* For the Table to be initialized correctly, the operation and index columns must be
* processed before the new offset is calculated. Calculating the offset before
* `process_op_column` and `process_index_column` have run causes primary keys to be misaligned.
*/
process_op_column(data_table, op);
process_index_column(data_table);
calculate_offset(row_count);

if (!m_gnode_set) {
// create a new gnode, send it to the table
Expand All @@ -47,19 +51,6 @@ Table::init(t_data_table& data_table) {
m_init = true;
}

/**
 * @brief Overwrite the metadata stored on an already-initialized Table.
 *
 * Asserts that the Table has been initialized (`m_init`), then copies every
 * argument into the corresponding member. No data is loaded or processed here.
 *
 * @param column_names stored into `m_column_names`
 * @param data_types stored into `m_data_types`
 * @param offset stored into `m_offset`
 * @param limit stored into `m_limit`
 * @param index stored into `m_index`
 * @param op stored into `m_op`
 * @param is_arrow stored into `m_is_arrow`
 */
void
Table::update(std::vector<std::string> column_names, std::vector<t_dtype> data_types,
std::uint32_t offset, std::uint32_t limit, std::string index, t_op op, bool is_arrow) {
// Refuse to touch a Table that has not been through `init` yet.
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
m_column_names = column_names;
m_data_types = data_types;
m_offset = offset;
m_limit = limit;
m_index = index;
m_op = op;
m_is_arrow = is_arrow;
}

t_uindex
Table::size() const {
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
Expand Down Expand Up @@ -126,6 +117,11 @@ Table::reset_gnode(t_uindex id) {
m_pool->get_gnode(id)->reset();
}

/**
 * @brief Advance the stored row offset by `row_count`, wrapping around at the row limit.
 *
 * The new offset is `(m_offset + row_count) % m_limit`.
 *
 * NOTE(review): `m_offset` and `m_limit` are declared in the Table header, which is not
 * visible here; the constructor takes the limit as `std::uint32_t`, so they are presumably
 * 32-bit. The widening below is harmless either way.
 *
 * @param row_count number of rows to advance the offset by
 */
void
Table::calculate_offset(std::uint32_t row_count) {
    // A zero limit would make the modulo below undefined behaviour (an integer
    // remainder by zero). With no capacity to wrap within, pin the offset to 0.
    if (m_limit == 0) {
        m_offset = 0;
        return;
    }

    // Widen before adding so the sum cannot wrap around in 32-bit arithmetic
    // ahead of the modulo, which would yield a wrong offset for large limits.
    const std::uint64_t advanced
        = static_cast<std::uint64_t>(m_offset) + static_cast<std::uint64_t>(row_count);

    // The remainder is strictly less than m_limit, so narrowing back is lossless.
    m_offset = static_cast<decltype(m_offset)>(advanced % static_cast<std::uint64_t>(m_limit));
}

t_uindex
Table::get_id() const {
return m_id;
Expand Down Expand Up @@ -161,10 +157,20 @@ Table::get_index() const {
return m_index;
}

void
Table::set_column_names(const std::vector<std::string>& column_names) {
m_column_names = column_names;
}

void
Table::set_data_types(const std::vector<t_dtype>& data_types) {
m_data_types = data_types;
}

void
Table::process_op_column(t_data_table& data_table) {
Table::process_op_column(t_data_table& data_table, const t_op op) {
auto op_col = data_table.add_column("psp_op", DTYPE_UINT8, false);
switch (m_op) {
switch (op) {
case OP_DELETE: {
op_col->raw_fill<std::uint8_t>(OP_DELETE);
} break;
Expand Down
3 changes: 1 addition & 2 deletions cpp/perspective/src/include/perspective/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ enum t_ctx_type {
GROUPED_COLUMNS_CONTEXT
};

enum t_op { OP_INSERT, OP_DELETE, OP_CLEAR, OP_UPDATE };
enum t_op { OP_INSERT, OP_DELETE, OP_CLEAR };

enum t_value_transition {
VALUE_TRANSITION_EQ_FF,
Expand All @@ -278,7 +278,6 @@ enum t_value_transition {

enum t_gnode_type {
GNODE_TYPE_PKEYED, // Explicit user set pkey
GNODE_TYPE_IMPLICIT_PKEYED // pkey is row based
};

enum t_gnode_port {
Expand Down
9 changes: 3 additions & 6 deletions cpp/perspective/src/include/perspective/binding.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,12 @@ namespace binding {
* @param accessor
* @param col_names
* @param data_types
* @param offset
* @param is_arrow
* @param is_update
*/
template <typename T>
void _fill_data(t_data_table& tbl, T accessor, std::vector<std::string> col_names,
std::vector<t_dtype> data_types, std::uint32_t offset, bool is_arrow, bool is_update);
std::vector<t_dtype> data_types, bool is_arrow, bool is_update);

/**
* @brief Create and populate a table.
Expand All @@ -200,17 +199,15 @@ namespace binding {
* @param gnode
* @param accessor
* @param computed
* @param offset
* @param limit
* @param index
* @param is_update
* @param op
* @param is_arrow
* @return std::shared_ptr<Table>
*/
template <typename T>
std::shared_ptr<Table> make_table(T table, T accessor, T computed, std::uint32_t offset,
std::uint32_t limit, std::string index, t_op op, bool is_arrow);
std::shared_ptr<Table> make_table(T table, T accessor, T computed,
std::uint32_t limit, std::string index, t_op op, bool is_update, bool is_arrow);

/**
* @brief Given an array-like container with new computed columns, add them to the
Expand Down
Loading