Skip to content

Commit

Permalink
Merge pull request #1043 from finos/ports
Browse files Browse the repository at this point in the history
Implement Client/Server Editing
  • Loading branch information
texodus authored May 21, 2020
2 parents 1023492 + 40dd19a commit b2914ed
Show file tree
Hide file tree
Showing 48 changed files with 2,249 additions and 331 deletions.
7 changes: 5 additions & 2 deletions cpp/perspective/src/cpp/emscripten.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,8 @@ namespace binding {
const std::string& index,
t_op op,
bool is_update,
bool is_arrow) {
bool is_arrow,
t_uindex port_id) {
bool table_initialized = has_value(table);
std::shared_ptr<t_pool> pool;
std::shared_ptr<Table> tbl;
Expand Down Expand Up @@ -1163,7 +1164,7 @@ namespace binding {
}

// calculate offset, limit, and set the gnode
tbl->init(data_table, row_count, op);
tbl->init(data_table, row_count, op, port_id);
return tbl;
}

Expand Down Expand Up @@ -1629,6 +1630,8 @@ EMSCRIPTEN_BINDINGS(perspective) {
.function("get_computed_schema", &Table::get_computed_schema)
.function("unregister_gnode", &Table::unregister_gnode)
.function("reset_gnode", &Table::reset_gnode)
.function("make_port", &Table::make_port)
.function("remove_port", &Table::remove_port)
.function("get_id", &Table::get_id)
.function("get_pool", &Table::get_pool)
.function("get_gnode", &Table::get_gnode);
Expand Down
194 changes: 134 additions & 60 deletions cpp/perspective/src/cpp/gnode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ t_gnode::t_gnode(const t_schema& input_schema, const t_schema& output_schema)
, m_output_schema(output_schema)
, m_init(false)
, m_id(0)
, m_last_input_port_id(0)
, m_pool_cleanup([]() {}) {
PSP_TRACE_SENTINEL();
LOG_CONSTRUCTOR("t_gnode");
Expand Down Expand Up @@ -77,11 +78,14 @@ t_gnode::init() {
m_gstate = std::make_shared<t_gstate>(m_input_schema, m_output_schema);
m_gstate->init();

// Create a single input port
std::shared_ptr<t_port> port
= std::make_shared<t_port>(PORT_MODE_PKEYED, m_input_schema);
port->init();
m_iports.push_back(port);
// Create and store the main input port, which is always port 0. The next
// input port will be port 1, and so on
std::shared_ptr<t_port> input_port =
std::make_shared<t_port>(PORT_MODE_PKEYED, m_input_schema);

input_port->init();

m_input_ports[0] = input_port;

for (t_uindex idx = 0, loop_end = m_transitional_schemas.size(); idx < loop_end; ++idx) {
t_port_mode mode = idx == 0 ? PORT_MODE_PKEYED : PORT_MODE_RAW;
Expand All @@ -92,32 +96,46 @@ t_gnode::init() {
m_oports.push_back(port);
}

std::shared_ptr<t_port>& iport = m_iports[0];
std::shared_ptr<t_data_table> flattened = iport->get_table()->flatten();
for (auto& iter : m_input_ports) {
std::shared_ptr<t_port> input_port = iter.second;
input_port->get_table()->flatten();
}

m_init = true;
}

std::string
t_gnode::repr() const {
std::stringstream ss;
ss << "t_gnode<" << this << ">";
return ss.str();
}
t_uindex
t_gnode::make_input_port() {
PSP_VERBOSE_ASSERT(m_init, "Cannot `make_input_port` on an uninited gnode.");
std::shared_ptr<t_port> input_port =
std::make_shared<t_port>(PORT_MODE_PKEYED, m_input_schema);
input_port->init();

void
t_gnode::_send(t_uindex portid, const t_data_table& fragments) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(portid == 0, "Only simple dataflows supported currently");
t_uindex port_id = m_last_input_port_id + 1;
m_input_ports[port_id] = input_port;

std::shared_ptr<t_port>& iport = m_iports[portid];
iport->send(fragments);
// increment the global input port id
m_last_input_port_id = port_id;

return port_id;
}

void
t_gnode::_send_and_process(const t_data_table& fragments) {
_send(0, fragments);
_process();
t_gnode::remove_input_port(t_uindex port_id) {
PSP_VERBOSE_ASSERT(m_init, "Cannot `remove_input_port` on an uninited gnode.");

if (m_input_ports.count(port_id) == 0) {
std::cerr << "Input port `" << port_id << "` cannot be removed, as it does not exist.";
return;
}

std::shared_ptr<t_port> input_port = m_input_ports[port_id];

// clear the table at the port
input_port->clear();

// remove from the map
m_input_ports.erase(port_id);
}

t_value_transition
Expand Down Expand Up @@ -217,18 +235,29 @@ t_gnode::_process_mask_existed_rows(t_process_state& process_state) {
return mask;
}

std::shared_ptr<t_data_table>
t_gnode::_process_table() {
t_process_table_result
t_gnode::_process_table(t_uindex port_id) {
m_was_updated = false;

std::shared_ptr<t_port>& iport = m_iports[0];
t_process_table_result result;
result.m_flattened_data_table = nullptr;
result.m_should_notify_userspace = false;

std::shared_ptr<t_data_table> flattened = nullptr;

if (m_input_ports.count(port_id) == 0) {
std::cerr << "Cannot process table on port `" << port_id << "` as it does not exist." << std::endl;
return result;
}

std::shared_ptr<t_port> input_port = m_input_ports[port_id];

if (iport->get_table()->size() == 0) {
return nullptr;
if (input_port->get_table()->size() == 0) {
return result;
}

m_was_updated = true;
std::shared_ptr<t_data_table> flattened(iport->get_table()->flatten());
flattened = input_port->get_table()->flatten();

PSP_GNODE_VERIFY_TABLE(flattened);
PSP_GNODE_VERIFY_TABLE(get_table());
Expand All @@ -244,6 +273,7 @@ t_gnode::_process_table() {
row_lookup[idx] = m_gstate->lookup(pkey);
}

// first update - master table is empty
if (m_gstate->mapping_size() == 0) {
// Update context from state first - computes columns during update
_update_contexts_from_state(flattened);
Expand All @@ -256,12 +286,12 @@ t_gnode::_process_table() {
auto state_table = get_table();
PSP_GNODE_VERIFY_TABLE(state_table);
#endif
return nullptr;
// Make sure user is notified after first update.
result.m_should_notify_userspace = true;
return result;
}

for (t_uindex idx = 0, loop_end = m_iports.size(); idx < loop_end; ++idx) {
m_iports[idx]->release_or_clear();
}
input_port->release_or_clear();

// Use `t_process_state` to manage intermediate structures
t_process_state _process_state;
Expand Down Expand Up @@ -440,8 +470,13 @@ t_gnode::_process_table() {
PSP_GNODE_VERIFY_TABLE(updated_table);
}
#endif

m_oports[PSP_PORT_FLATTENED]->set_table(flattened_masked);
return flattened_masked;

result.m_flattened_data_table = flattened_masked;
result.m_should_notify_userspace = true;

return result;
}

template <>
Expand Down Expand Up @@ -532,19 +567,33 @@ t_gnode::_process_column<std::string>(
}

void
t_gnode::_process() {
t_gnode::send(t_uindex port_id, const t_data_table& fragments) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(
m_mode == NODE_PROCESSING_SIMPLE_DATAFLOW, "Only simple dataflows supported currently");
psp_log_time(repr() + " _process.enter");
PSP_VERBOSE_ASSERT(m_init, "Cannot `send` to an uninited gnode.");

std::shared_ptr<t_data_table> flattened_masked = _process_table();
if (flattened_masked) {
notify_contexts(*flattened_masked);
if (m_input_ports.count(port_id) == 0) {
std::cerr << "Cannot send table to port `" << port_id << "`, which does not exist." << std::endl;
return;
}

psp_log_time(repr() + " _process.noinit_path.exit");
std::shared_ptr<t_port>& input_port = m_input_ports[port_id];
input_port->send(fragments);
}

bool
t_gnode::process(t_uindex port_id) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "Cannot `process` on an uninited gnode.");

t_process_table_result result = _process_table(port_id);

if (result.m_flattened_data_table) {
notify_contexts(*result.m_flattened_data_table);
}

// Whether the user should be notified - False if process_table exited
// early, True otherwise.
return result.m_should_notify_userspace;
}

t_uindex
Expand All @@ -553,39 +602,39 @@ t_gnode::mapping_size() const {
}

t_data_table*
t_gnode::_get_otable(t_uindex portidx) {
t_gnode::_get_otable(t_uindex port_id) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(portidx < m_oports.size(), "Invalid port number");
return m_oports[portidx]->get_table().get();
PSP_VERBOSE_ASSERT(m_init, "Cannot `_get_otable` on an uninited gnode.");
PSP_VERBOSE_ASSERT(port_id < m_oports.size(), "Invalid port number");
return m_oports[port_id]->get_table().get();
}

t_data_table*
t_gnode::_get_itable(t_uindex portidx) {
t_gnode::_get_itable(t_uindex port_id) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(portidx < m_iports.size(), "Invalid port number");
return m_iports[portidx]->get_table().get();
PSP_VERBOSE_ASSERT(m_init, "Cannot `_get_itable` on an uninited gnode.");
PSP_VERBOSE_ASSERT(m_input_ports.count(port_id) != 0, "Invalid port number");
return m_input_ports[port_id]->get_table().get();
}

t_data_table*
t_gnode::get_table() {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(m_init, "Cannot `get_table` on an uninited gnode.");
return m_gstate->get_table().get();
}

const t_data_table*
t_gnode::get_table() const {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(m_init, "Cannot `get_table` on an uninited gnode.");
return m_gstate->get_table().get();
}

std::shared_ptr<t_data_table>
t_gnode::get_table_sptr() {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(m_init, "Cannot `get_table_sptr` on an uninited gnode.");
return m_gstate->get_table();
}

Expand All @@ -597,10 +646,16 @@ t_gnode::get_table_sptr() {
void
t_gnode::promote_column(const std::string& name, t_dtype new_type) {
PSP_TRACE_SENTINEL();
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(m_init, "Cannot `promote_column` on an uninited gnode.");
get_table()->promote_column(name, new_type, 0, false);
_get_otable(0)->promote_column(name, new_type, 0, false);
_get_itable(0)->promote_column(name, new_type, 0, false);

for (auto& iter : m_input_ports) {
std::shared_ptr<t_port> input_port = iter.second;
std::shared_ptr<t_data_table> input_table = input_port->get_table();
input_table->promote_column(name, new_type, 0, false);
}

m_output_schema.retype_column(name, new_type);
m_input_schema.retype_column(name, new_type);
m_transitional_schemas[0].retype_column(name, new_type);
Expand Down Expand Up @@ -1098,10 +1153,21 @@ t_gnode::get_id() const {
return m_id;
}

t_uindex
t_gnode::num_input_ports() const {
return m_input_ports.size();
}

t_uindex
t_gnode::num_output_ports() const {
return m_oports.size();
}

void
t_gnode::release_inputs() {
for (const auto& p : m_iports) {
p->release();
for (auto& iter : m_input_ports) {
std::shared_ptr<t_port> input_port = iter.second;
input_port->release();
}
}

Expand Down Expand Up @@ -1194,8 +1260,9 @@ t_gnode::reset() {

void
t_gnode::clear_input_ports() {
for (t_uindex idx = 0, loop_end = m_oports.size(); idx < loop_end; ++idx) {
m_iports[idx]->get_table()->clear();
for (auto& iter : m_input_ports) {
std::shared_ptr<t_port> input_port = iter.second;
input_port->get_table()->clear();
}
}

Expand Down Expand Up @@ -1246,6 +1313,13 @@ t_gnode::get_sorted_pkeyed_table() const {
return m_gstate->get_sorted_pkeyed_table();
}

std::string
t_gnode::repr() const {
std::stringstream ss;
ss << "t_gnode<" << this << ">";
return ss.str();
}

void
t_gnode::register_context(const std::string& name, std::shared_ptr<t_ctx0> ctx) {
_register_context(name, ZERO_SIDED_CONTEXT, reinterpret_cast<std::int64_t>(ctx.get()));
Expand Down
Loading

0 comments on commit b2914ed

Please sign in to comment.