Skip to content

Commit

Permalink
Merge pull request #582 from finos/row-delta
Browse files Browse the repository at this point in the history
Row delta supports addition, delete, non-contiguous updates
  • Loading branch information
texodus authored May 16, 2019
2 parents 0ba5267 + 38babcd commit bd2c60f
Show file tree
Hide file tree
Showing 18 changed files with 369 additions and 170 deletions.
19 changes: 7 additions & 12 deletions cpp/perspective/src/cpp/context_one.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,29 +369,24 @@ t_ctx1::get_step_delta(t_index bidx, t_index eidx) {
/**
* @brief Returns the row indices that have been updated with new data.
*
* @param bidx
* @param eidx
* @return t_rowdelta
*/
t_rowdelta
t_ctx1::get_row_delta(t_index bidx, t_index eidx) {
t_ctx1::get_row_delta() {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
bidx = std::min(bidx, t_index(m_traversal->size()));
eidx = std::min(eidx, t_index(m_traversal->size()));
std::vector<std::int32_t> rows;
t_index eidx = t_index(m_traversal->size());
tsl::hopscotch_set<t_index> rows;

const auto& deltas = m_tree->get_deltas();
for (t_index idx = bidx; idx < eidx; ++idx) {
for (t_index idx = 0; idx < eidx; ++idx) {
t_index ptidx = m_traversal->get_tree_index(idx);
// Retrieve delta from storage
// Retrieve delta from storage and check if the row has been changed
auto iterators = deltas->get<by_tc_nidx_aggidx>().equal_range(ptidx);
bool unique_ridx = std::find(rows.begin(), rows.end(), idx) == rows.end();
if ((iterators.first != iterators.second) && unique_ridx)
rows.push_back(idx);
if ((iterators.first != iterators.second))
rows.insert(idx);
}

std::sort(rows.begin(), rows.end());
t_rowdelta rval(m_rows_changed, rows);
m_tree->clear_deltas();
return rval;
Expand Down
27 changes: 8 additions & 19 deletions cpp/perspective/src/cpp/context_two.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,28 +671,19 @@ t_ctx2::get_step_delta(t_index bidx, t_index eidx) {
/**
* @brief Returns the row indices that have been updated with new data.
*
* @param bidx
* @param eidx
* @return t_rowdelta
*/
t_rowdelta
t_ctx2::get_row_delta(t_index bidx, t_index eidx) {
t_uindex start_row = bidx;
t_uindex end_row = eidx;
t_uindex start_col = 1;
t_uindex end_col = get_num_view_columns();
std::vector<std::int32_t> rows;

t_uindex ctx_nrows = get_row_count();
t_uindex ctx_ncols = get_column_count();
auto ext = sanitize_get_data_extents(
ctx_nrows, ctx_ncols, start_row, end_row, start_col, end_col);
t_ctx2::get_row_delta() {
t_index nrows = get_row_count();
t_index ncols = get_num_view_columns();
tsl::hopscotch_set<t_index> rows;

std::vector<std::pair<t_uindex, t_uindex>> cells;

// get cells and imbue with additional information
for (t_index ridx = ext.m_srow; ridx < ext.m_erow; ++ridx) {
for (t_uindex cidx = 1; cidx < end_col; ++cidx) {
for (t_index ridx = 0; ridx < nrows; ++ridx) {
for (t_index cidx = 1; cidx < ncols; ++cidx) {
cells.push_back(std::pair<t_index, t_index>(ridx, cidx));
}
}
Expand All @@ -705,12 +696,10 @@ t_ctx2::get_row_delta(t_index bidx, t_index eidx) {
const auto& deltas = m_trees[c.m_treenum]->get_deltas();
auto iterators = deltas->get<by_tc_nidx_aggidx>().equal_range(c.m_idx);
auto ridx = c.m_ridx;
bool unique_ridx = std::find(rows.begin(), rows.end(), ridx) == rows.end();
if ((iterators.first != iterators.second) && unique_ridx)
rows.push_back(ridx);
if ((iterators.first != iterators.second))
rows.insert(ridx);
}

std::sort(rows.begin(), rows.end());
t_rowdelta rval(true, rows);
clear_deltas();
return rval;
Expand Down
191 changes: 94 additions & 97 deletions cpp/perspective/src/cpp/context_zero.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ t_ctx0::step_begin() {
return;

m_deltas = std::make_shared<t_zcdeltas>();
m_delta_pkeys.clear();
m_rows_changed = false;
m_columns_changed = false;
m_traversal->step_begin();
Expand Down Expand Up @@ -301,64 +302,23 @@ t_ctx0::get_step_delta(t_index bidx, t_index eidx) {
* @return t_rowdelta
*/
t_rowdelta
t_ctx0::get_row_delta(t_index bidx, t_index eidx) {
bidx = std::min(bidx, m_traversal->size());
eidx = std::min(eidx, m_traversal->size());
t_ctx0::get_row_delta() {
bool rows_changed = m_rows_changed || !m_traversal->empty_sort_by();
std::vector<std::int32_t> rows;

tsl::hopscotch_set<t_tscalar> pkeys;
t_tscalar prev_pkey;
prev_pkey.set(t_none());

if (m_traversal->empty_sort_by()) {
std::vector<t_tscalar> pkey_vec = m_traversal->get_pkeys(bidx, eidx);
for (t_index idx = 0, loop_end = pkey_vec.size(); idx < loop_end; ++idx) {
const t_tscalar& pkey = pkey_vec[idx];
t_index row = bidx + idx;
// Retrieve a pair of iterators from delta storage - start of cell, end of cell
std::pair<t_zcdeltas::index<by_zc_pkey_colidx>::type::iterator,
t_zcdeltas::index<by_zc_pkey_colidx>::type::iterator>
iters = m_deltas->get<by_zc_pkey_colidx>().equal_range(pkey);
for (t_zcdeltas::index<by_zc_pkey_colidx>::type::iterator iter = iters.first;
iter != iters.second; ++iter) {
if (std::find(rows.begin(), rows.end(), row) == rows.end())
rows.push_back(row);
}
}
} else {
for (t_zcdeltas::index<by_zc_pkey_colidx>::type::iterator iter
= m_deltas->get<by_zc_pkey_colidx>().begin();
iter != m_deltas->get<by_zc_pkey_colidx>().end(); ++iter) {
if (prev_pkey != iter->m_pkey) {
pkeys.insert(iter->m_pkey);
prev_pkey = iter->m_pkey;
}
}

// get row indices and assign into r_indices
tsl::hopscotch_map<t_tscalar, t_index> r_indices;
m_traversal->get_row_indices(pkeys, r_indices);

for (t_zcdeltas::index<by_zc_pkey_colidx>::type::iterator iter
= m_deltas->get<by_zc_pkey_colidx>().begin();
iter != m_deltas->get<by_zc_pkey_colidx>().end(); ++iter) {
t_index row = r_indices[iter->m_pkey];
bool valid_ridx = bidx <= row && row <= eidx;
bool unique_ridx = std::find(rows.begin(), rows.end(), row) == rows.end();
if (valid_ridx && unique_ridx) {
rows.push_back(row);
}
}
}
// Given a set of primary keys, transform them into row indices
tsl::hopscotch_set<t_tscalar> changed_pkeys = get_delta_pkeys();
tsl::hopscotch_set<t_index> rows = m_traversal->get_row_indices(changed_pkeys);

std::sort(rows.begin(), rows.end());
t_rowdelta rval(rows_changed, rows);
m_deltas->clear(); // what is the difference between this line and clear_deltas()?
clear_deltas();
return rval;
}

const tsl::hopscotch_set<t_tscalar>&
t_ctx0::get_delta_pkeys() const {
return m_delta_pkeys;
}

std::vector<std::string>
t_ctx0::get_column_names() const {
return m_config.get_column_names();
Expand All @@ -382,6 +342,16 @@ t_ctx0::sidedness() const {
return 0;
}

/**
* @brief Handle additions and new data, calculating deltas along the way.
*
* @param flattened
* @param delta
* @param prev
* @param curr
* @param transitions
* @param existed
*/
void
t_ctx0::notify(const t_table& flattened, const t_table& delta, const t_table& prev,
const t_table& curr, const t_table& transitions, const t_table& existed) {
Expand Down Expand Up @@ -430,10 +400,16 @@ t_ctx0::notify(const t_table& flattened, const t_table& delta, const t_table& pr
} break;
default: { PSP_COMPLAIN_AND_ABORT("Unexpected OP"); } break;
}

// add the pkey for updated rows
add_delta_pkey(pkey);
}
psp_log_time(repr() + " notify.has_filter_path.updated_traversal");

// calculate deltas
calc_step_delta(flattened, prev, curr, transitions);
m_has_delta = m_deltas->size() > 0 || delete_encountered;
m_has_delta = m_deltas->size() > 0 || m_delta_pkeys.size() > 0 || delete_encountered;

psp_log_time(repr() + " notify.has_filter_path.exit");

return;
Expand Down Expand Up @@ -461,14 +437,70 @@ t_ctx0::notify(const t_table& flattened, const t_table& delta, const t_table& pr
PSP_COMPLAIN_AND_ABORT("Unexpected OP");
} break;
}

// add the pkey for updated rows
add_delta_pkey(pkey);
}

psp_log_time(repr() + " notify.no_filter_path.updated_traversal");

// calculate deltas
calc_step_delta(flattened, prev, curr, transitions);
m_has_delta = m_deltas->size() > 0 || delete_encountered;
m_has_delta = m_deltas->size() > 0 || m_delta_pkeys.size() > 0 || delete_encountered;

psp_log_time(repr() + " notify.no_filter_path.exit");
}

/**
* @brief Handle the addition of new data.
*
* @param flattened
*/
void
t_ctx0::notify(const t_table& flattened) {
t_uindex nrecs = flattened.size();
std::shared_ptr<const t_column> pkey_sptr = flattened.get_const_column("psp_pkey");
std::shared_ptr<const t_column> op_sptr = flattened.get_const_column("psp_op");
const t_column* pkey_col = pkey_sptr.get();
const t_column* op_col = op_sptr.get();

m_has_delta = true;

if (m_config.has_filters()) {
t_mask msk = filter_table_for_config(flattened, m_config);

for (t_uindex idx = 0; idx < nrecs; ++idx) {
t_tscalar pkey = m_symtable.get_interned_tscalar(pkey_col->get_scalar(idx));
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);

switch (op) {
case OP_INSERT: {
if (msk.get(idx)) {
m_traversal->add_row(m_state, m_config, pkey);
}
} break;
default: {
// pass
} break;
}
}
return;
}

for (t_uindex idx = 0; idx < nrecs; ++idx) {
t_tscalar pkey = m_symtable.get_interned_tscalar(pkey_col->get_scalar(idx));
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);

switch (op) {
case OP_INSERT: {
m_traversal->add_row(m_state, m_config, pkey);
} break;
default: { } break; }
}
}

void
t_ctx0::calc_step_delta(const t_table& flattened, const t_table& prev, const t_table& curr,
const t_table& transitions) {
Expand Down Expand Up @@ -511,6 +543,16 @@ t_ctx0::calc_step_delta(const t_table& flattened, const t_table& prev, const t_t
}
}

/**
* @brief Mark a primary key as updated by adding it to the tracking set.
*
* @param pkey
*/
void
t_ctx0::add_delta_pkey(t_tscalar pkey) {
m_delta_pkeys.insert(pkey);
}

std::vector<t_minmax>
t_ctx0::get_min_max() const {
return m_minmax;
Expand Down Expand Up @@ -551,51 +593,6 @@ t_ctx0::has_deltas() const {
return m_has_delta;
}

void
t_ctx0::notify(const t_table& flattened) {
t_uindex nrecs = flattened.size();
std::shared_ptr<const t_column> pkey_sptr = flattened.get_const_column("psp_pkey");
std::shared_ptr<const t_column> op_sptr = flattened.get_const_column("psp_op");
const t_column* pkey_col = pkey_sptr.get();
const t_column* op_col = op_sptr.get();

m_has_delta = true;

if (m_config.has_filters()) {
t_mask msk = filter_table_for_config(flattened, m_config);

for (t_uindex idx = 0; idx < nrecs; ++idx) {
t_tscalar pkey = m_symtable.get_interned_tscalar(pkey_col->get_scalar(idx));
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);

switch (op) {
case OP_INSERT: {
if (msk.get(idx)) {
m_traversal->add_row(m_state, m_config, pkey);
}
} break;
default: {
// pass
} break;
}
}
return;
}

for (t_uindex idx = 0; idx < nrecs; ++idx) {
t_tscalar pkey = m_symtable.get_interned_tscalar(pkey_col->get_scalar(idx));
std::uint8_t op_ = *(op_col->get_nth<std::uint8_t>(idx));
t_op op = static_cast<t_op>(op_);

switch (op) {
case OP_INSERT: {
m_traversal->add_row(m_state, m_config, pkey);
} break;
default: { } break; }
}
}

void
t_ctx0::pprint() const {}

Expand Down
18 changes: 18 additions & 0 deletions cpp/perspective/src/cpp/flat_traversal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,24 @@ t_ftrav::get_row_indices(t_index bidx, t_index eidx, const tsl::hopscotch_set<t_
}
}

/**
* @brief Given a set of primary keys, return the corresponding row indices.
*
* @param pkeys
* @return tsl::hopscotch_set<t_index>
*/
tsl::hopscotch_set<t_index>
t_ftrav::get_row_indices(const tsl::hopscotch_set<t_tscalar>& pkeys) const {
tsl::hopscotch_set<t_index> rows;
for (t_index idx = 0, loop_end = size(); idx < loop_end; ++idx) {
const t_tscalar& pkey = (*m_index)[idx].m_pkey;
if (pkeys.find(pkey) != pkeys.end()) {
rows.insert(idx);
}
}
return rows;
}

void
t_ftrav::reset() {
if (m_index.get())
Expand Down
8 changes: 7 additions & 1 deletion cpp/perspective/src/cpp/step_delta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ t_stepdelta::t_stepdelta(
// t_rowdelta contains a vector of row indices that have been changed
t_rowdelta::t_rowdelta() {}

t_rowdelta::t_rowdelta(bool rows_changed, const std::vector<std::int32_t>& rows)
t_rowdelta::t_rowdelta(bool rows_changed, const tsl::hopscotch_set<t_index>& rows)
: rows_changed(rows_changed)
, rows(rows) {}

t_rowdelta::t_rowdelta(bool rows_changed, const tsl::hopscotch_set<t_index>& rows,
const std::vector<t_tscalar>& data)
: rows_changed(rows_changed)
, rows(rows)
, data(data) {}
} // end namespace perspective

namespace std {
Expand Down
Loading

0 comments on commit bd2c60f

Please sign in to comment.