Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Row delta supports addition, delete, non-contiguous updates #582

Merged
merged 5 commits into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions cpp/perspective/src/cpp/context_one.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,29 +369,24 @@ t_ctx1::get_step_delta(t_index bidx, t_index eidx) {
/**
* @brief Returns the row indices that have been updated with new data.
*
* @param bidx
* @param eidx
* @return t_rowdelta
*/
t_rowdelta
t_ctx1::get_row_delta(t_index bidx, t_index eidx) {
t_ctx1::get_row_delta() {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
bidx = std::min(bidx, t_index(m_traversal->size()));
eidx = std::min(eidx, t_index(m_traversal->size()));
std::vector<std::int32_t> rows;
t_index eidx = t_index(m_traversal->size());
tsl::hopscotch_set<t_index> rows;

const auto& deltas = m_tree->get_deltas();
for (t_index idx = bidx; idx < eidx; ++idx) {
for (t_index idx = 0; idx < eidx; ++idx) {
t_index ptidx = m_traversal->get_tree_index(idx);
// Retrieve delta from storage
// Retrieve delta from storage and check if the row has been changed
auto iterators = deltas->get<by_tc_nidx_aggidx>().equal_range(ptidx);
bool unique_ridx = std::find(rows.begin(), rows.end(), idx) == rows.end();
if ((iterators.first != iterators.second) && unique_ridx)
rows.push_back(idx);
if ((iterators.first != iterators.second))
rows.insert(idx);
}

std::sort(rows.begin(), rows.end());
t_rowdelta rval(m_rows_changed, rows);
m_tree->clear_deltas();
return rval;
Expand Down
27 changes: 8 additions & 19 deletions cpp/perspective/src/cpp/context_two.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,28 +671,19 @@ t_ctx2::get_step_delta(t_index bidx, t_index eidx) {
/**
* @brief Returns the row indices that have been updated with new data.
*
* @param bidx
* @param eidx
* @return t_rowdelta
*/
t_rowdelta
t_ctx2::get_row_delta(t_index bidx, t_index eidx) {
t_uindex start_row = bidx;
t_uindex end_row = eidx;
t_uindex start_col = 1;
t_uindex end_col = get_num_view_columns();
std::vector<std::int32_t> rows;

t_uindex ctx_nrows = get_row_count();
t_uindex ctx_ncols = get_column_count();
auto ext = sanitize_get_data_extents(
ctx_nrows, ctx_ncols, start_row, end_row, start_col, end_col);
t_ctx2::get_row_delta() {
t_index nrows = get_row_count();
t_index ncols = get_num_view_columns();
tsl::hopscotch_set<t_index> rows;

std::vector<std::pair<t_uindex, t_uindex>> cells;

// get cells and imbue with additional information
for (t_index ridx = ext.m_srow; ridx < ext.m_erow; ++ridx) {
for (t_uindex cidx = 1; cidx < end_col; ++cidx) {
for (t_index ridx = 0; ridx < nrows; ++ridx) {
for (t_index cidx = 1; cidx < ncols; ++cidx) {
cells.push_back(std::pair<t_index, t_index>(ridx, cidx));
}
}
Expand All @@ -705,12 +696,10 @@ t_ctx2::get_row_delta(t_index bidx, t_index eidx) {
const auto& deltas = m_trees[c.m_treenum]->get_deltas();
auto iterators = deltas->get<by_tc_nidx_aggidx>().equal_range(c.m_idx);
auto ridx = c.m_ridx;
bool unique_ridx = std::find(rows.begin(), rows.end(), ridx) == rows.end();
if ((iterators.first != iterators.second) && unique_ridx)
rows.push_back(ridx);
if ((iterators.first != iterators.second))
rows.insert(ridx);
}

std::sort(rows.begin(), rows.end());
t_rowdelta rval(true, rows);
clear_deltas();
return rval;
Expand Down
191 changes: 94 additions & 97 deletions cpp/perspective/src/cpp/context_zero.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ t_ctx0::step_begin() {
return;

m_deltas = std::make_shared<t_zcdeltas>();
m_delta_pkeys.clear();
m_rows_changed = false;
m_columns_changed = false;
m_traversal->step_begin();
Expand Down Expand Up @@ -301,64 +302,23 @@ t_ctx0::get_step_delta(t_index bidx, t_index eidx) {
* @return t_rowdelta
*/
t_rowdelta
t_ctx0::get_row_delta(t_index bidx, t_index eidx) {
bidx = std::min(bidx, m_traversal->size());
eidx = std::min(eidx, m_traversal->size());
t_ctx0::get_row_delta() {
bool rows_changed = m_rows_changed || !m_traversal->empty_sort_by();
std::vector<std::int32_t> rows;

tsl::hopscotch_set<t_tscalar> pkeys;
t_tscalar prev_pkey;
prev_pkey.set(t_none());

if (m_traversal->empty_sort_by()) {
std::vector<t_tscalar> pkey_vec = m_traversal->get_pkeys(bidx, eidx);
for (t_index idx = 0, loop_end = pkey_vec.size(); idx < loop_end; ++idx) {
const t_tscalar& pkey = pkey_vec[idx];
t_index row = bidx + idx;
// Retrieve a pair of iterators from delta storage - start of cell, end of cell
std::pair<t_zcdeltas::index<by_zc_pkey_colidx>::type::iterator,
t_zcdeltas::index<by_zc_pkey_colidx>::type::iterator>
iters = m_deltas->get<by_zc_pkey_colidx>().equal_range(pkey);
for (t_zcdeltas::index<by_zc_pkey_colidx>::type::iterator iter = iters.first;
iter != iters.second; ++iter) {
if (std::find(rows.begin(), rows.end(), row) == rows.end())
rows.push_back(row);
}
}
} else {
for (t_zcdeltas::index<by_zc_pkey_colidx>::type::iterator iter
= m_deltas->get<by_zc_pkey_colidx>().begin();
iter != m_deltas->get<by_zc_pkey_colidx>().end(); ++iter) {
if (prev_pkey != iter->m_pkey) {
pkeys.insert(iter->m_pkey);
prev_pkey = iter->m_pkey;
}
}

// get row indices and assign into r_indices
tsl::hopscotch_map<t_tscalar, t_index> r_indices;
m_traversal->get_row_indices(pkeys, r_indices);

for (t_zcdeltas::index<by_zc_pkey_colidx>::type::iterator iter
= m_deltas->get<by_zc_pkey_colidx>().begin();
iter != m_deltas->get<by_zc_pkey_colidx>().end(); ++iter) {
t_index row = r_indices[iter->m_pkey];
bool valid_ridx = bidx <= row && row <= eidx;
bool unique_ridx = std::find(rows.begin(), rows.end(), row) == rows.end();
if (valid_ridx && unique_ridx) {
rows.push_back(row);
}
}
}
// Given a set of primary keys, transform them into row indices
tsl::hopscotch_set<t_tscalar> changed_pkeys = get_delta_pkeys();
tsl::hopscotch_set<t_index> rows = m_traversal->get_row_indices(changed_pkeys);

std::sort(rows.begin(), rows.end());
t_rowdelta rval(rows_changed, rows);
m_deltas->clear(); // what is the difference between this line and clear_deltas()?
clear_deltas();
return rval;
}

const tsl::hopscotch_set<t_tscalar>&
t_ctx0::get_delta_pkeys() const {
return m_delta_pkeys;
}

std::vector<std::string>
t_ctx0::get_column_names() const {
return m_config.get_column_names();
Expand All @@ -382,6 +342,16 @@ t_ctx0::sidedness() const {
return 0;
}

/**
* @brief Handle additions and new data, calculating deltas along the way.
*
* @param flattened
* @param delta
* @param prev
* @param curr
* @param transitions
* @param existed
*/
void
t_ctx0::notify(const t_table& flattened, const t_table& delta, const t_table& prev,
const t_table& curr, const t_table& transitions, const t_table& existed) {
Expand Down Expand Up @@ -430,10 +400,16 @@ t_ctx0::notify(const t_table& flattened, const t_table& delta, const t_table& pr
} break;
default: { PSP_COMPLAIN_AND_ABORT("Unexpected OP"); } break;
}

// add the pkey for updated rows
add_delta_pkey(pkey);
}
psp_log_time(repr() + " notify.has_filter_path.updated_traversal");

// calculate deltas
calc_step_delta(flattened, prev, curr, transitions);
m_has_delta = m_deltas->size() > 0 || delete_encountered;
m_has_delta = m_deltas->size() > 0 || m_delta_pkeys.size() > 0 || delete_encountered;

psp_log_time(repr() + " notify.has_filter_path.exit");

return;
Expand Down Expand Up @@ -461,14 +437,70 @@ t_ctx0::notify(const t_table& flattened, const t_table& delta, const t_table& pr
PSP_COMPLAIN_AND_ABORT("Unexpected OP");
} break;
}

// add the pkey for updated rows
add_delta_pkey(pkey);
}

psp_log_time(repr() + " notify.no_filter_path.updated_traversal");

// calculate deltas
calc_step_delta(flattened, prev, curr, transitions);
m_has_delta = m_deltas->size() > 0 || delete_encountered;
m_has_delta = m_deltas->size() > 0 || m_delta_pkeys.size() > 0 || delete_encountered;

psp_log_time(repr() + " notify.no_filter_path.exit");
}

/**
* @brief Handle the addition of new data.
*
* @param flattened
*/
void
t_ctx0::notify(const t_table& flattened) {
t_uindex nrecs = flattened.size();
std::shared_ptr<const t_column> pkey_sptr = flattened.get_const_column("psp_pkey");
std::shared_ptr<const t_column> op_sptr = flattened.get_const_column("psp_op");
const t_column* pkey_col = pkey_sptr.get();
const t_column* op_col = op_sptr.get();

m_has_delta = true;

if (m_config.has_filters()) {
t_mask msk = filter_table_for_config(flattened, m_config);

for (t_uindex idx = 0; idx < nrecs; ++idx) {
t_tscalar pkey = m_symtable.get_interned_tscalar(pkey_col->get_scalar(idx));
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);

switch (op) {
case OP_INSERT: {
if (msk.get(idx)) {
m_traversal->add_row(m_state, m_config, pkey);
}
} break;
default: {
// pass
} break;
}
}
return;
}

for (t_uindex idx = 0; idx < nrecs; ++idx) {
t_tscalar pkey = m_symtable.get_interned_tscalar(pkey_col->get_scalar(idx));
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);

switch (op) {
case OP_INSERT: {
m_traversal->add_row(m_state, m_config, pkey);
} break;
default: { } break; }
}
}

void
t_ctx0::calc_step_delta(const t_table& flattened, const t_table& prev, const t_table& curr,
const t_table& transitions) {
Expand Down Expand Up @@ -511,6 +543,16 @@ t_ctx0::calc_step_delta(const t_table& flattened, const t_table& prev, const t_t
}
}

/**
* @brief Mark a primary key as updated by adding it to the tracking set.
*
* @param pkey
*/
void
t_ctx0::add_delta_pkey(t_tscalar pkey) {
m_delta_pkeys.insert(pkey);
}

std::vector<t_minmax>
t_ctx0::get_min_max() const {
return m_minmax;
Expand Down Expand Up @@ -551,51 +593,6 @@ t_ctx0::has_deltas() const {
return m_has_delta;
}

void
t_ctx0::notify(const t_table& flattened) {
t_uindex nrecs = flattened.size();
std::shared_ptr<const t_column> pkey_sptr = flattened.get_const_column("psp_pkey");
std::shared_ptr<const t_column> op_sptr = flattened.get_const_column("psp_op");
const t_column* pkey_col = pkey_sptr.get();
const t_column* op_col = op_sptr.get();

m_has_delta = true;

if (m_config.has_filters()) {
t_mask msk = filter_table_for_config(flattened, m_config);

for (t_uindex idx = 0; idx < nrecs; ++idx) {
t_tscalar pkey = m_symtable.get_interned_tscalar(pkey_col->get_scalar(idx));
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);

switch (op) {
case OP_INSERT: {
if (msk.get(idx)) {
m_traversal->add_row(m_state, m_config, pkey);
}
} break;
default: {
// pass
} break;
}
}
return;
}

for (t_uindex idx = 0; idx < nrecs; ++idx) {
t_tscalar pkey = m_symtable.get_interned_tscalar(pkey_col->get_scalar(idx));
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);

switch (op) {
case OP_INSERT: {
m_traversal->add_row(m_state, m_config, pkey);
} break;
default: { } break; }
}
}

void
t_ctx0::pprint() const {}

Expand Down
18 changes: 18 additions & 0 deletions cpp/perspective/src/cpp/flat_traversal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,24 @@ t_ftrav::get_row_indices(t_index bidx, t_index eidx, const tsl::hopscotch_set<t_
}
}

/**
* @brief Given a set of primary keys, return the corresponding row indices.
*
* @param pkeys
* @return tsl::hopscotch_set<t_index>
*/
tsl::hopscotch_set<t_index>
t_ftrav::get_row_indices(const tsl::hopscotch_set<t_tscalar>& pkeys) const {
tsl::hopscotch_set<t_index> rows;
for (t_index idx = 0, loop_end = size(); idx < loop_end; ++idx) {
const t_tscalar& pkey = (*m_index)[idx].m_pkey;
if (pkeys.find(pkey) != pkeys.end()) {
rows.insert(idx);
}
}
return rows;
}

void
t_ftrav::reset() {
if (m_index.get())
Expand Down
8 changes: 7 additions & 1 deletion cpp/perspective/src/cpp/step_delta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ t_stepdelta::t_stepdelta(
// t_rowdelta contains a vector of row indices that have been changed
t_rowdelta::t_rowdelta() {}

t_rowdelta::t_rowdelta(bool rows_changed, const std::vector<std::int32_t>& rows)
t_rowdelta::t_rowdelta(bool rows_changed, const tsl::hopscotch_set<t_index>& rows)
: rows_changed(rows_changed)
, rows(rows) {}

t_rowdelta::t_rowdelta(bool rows_changed, const tsl::hopscotch_set<t_index>& rows,
const std::vector<t_tscalar>& data)
: rows_changed(rows_changed)
, rows(rows)
, data(data) {}
} // end namespace perspective

namespace std {
Expand Down
Loading