Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to Arrow updates and indexed columns #837

Merged
merged 13 commits into from
Dec 9, 2019
3 changes: 1 addition & 2 deletions cpp/perspective/src/cpp/arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ namespace arrow {
} else if (src == "date32" || src == "date64") {
return DTYPE_DATE;
}
// TODO: remove and decide what to do when we can't parse the type
std::stringstream ss;
ss << "Could not convert arrow type: " << src << std::endl;
ss << "Could not load arrow column of type `" << src << "`" << std::endl;
PSP_COMPLAIN_AND_ABORT(ss.str());
return DTYPE_STR;
}
Expand Down
22 changes: 17 additions & 5 deletions cpp/perspective/src/cpp/data_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,15 @@ t_data_table::append(const t_data_table& other) {
t_dtype col_dtype = get_column(cname)->get_dtype();
t_dtype other_col_dtype = other.get_const_column(cname)->get_dtype();
if (col_dtype != other_col_dtype) {
std::stringstream ss;
ss << "Mismatched column dtypes: attempted to append column of dtype `"
<< dtype_to_str(other_col_dtype) << "` to existing column of dtype `"
<< dtype_to_str(col_dtype) << std::endl;
std::stringstream ss;
ss << "Mismatched dtypes for `"
<< cname
<< "`: attempted to append column of dtype `"
<< get_dtype_descr(other_col_dtype)
<< "` to existing column of dtype `"
<< get_dtype_descr(col_dtype) << "`"
<< std::endl;
std::cout << ss.str();
PSP_COMPLAIN_AND_ABORT(ss.str())
}
src_cols.push_back(other.get_const_column(cname).get());
Expand Down Expand Up @@ -554,6 +559,11 @@ t_data_table::promote_column(
return;
}

t_dtype current_dtype = m_schema.get_dtype(name);
if (current_dtype == new_dtype) {
return;
}

t_uindex idx = m_schema.get_colidx(name);
std::shared_ptr<t_column> current_col = m_columns[idx];

Expand Down Expand Up @@ -582,7 +592,9 @@ t_data_table::promote_column(
std::string fval = std::to_string(*val);
promoted_col->set_nth(i, fval);
} break;
default: { PSP_COMPLAIN_AND_ABORT("Bad promotion"); }
default: {
PSP_COMPLAIN_AND_ABORT("Columns can only be promoted to int64, float64, or string type.");
}
}
}
}
Expand Down
104 changes: 65 additions & 39 deletions cpp/perspective/src/cpp/emscripten.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1123,35 +1123,91 @@ namespace binding {
std::shared_ptr<Table>
make_table(t_val table, t_data_accessor accessor, t_val computed,
std::uint32_t limit, const std::string& index, t_op op, bool is_update, bool is_arrow) {
bool table_initialized = has_value(table);
std::shared_ptr<t_pool> pool;
std::shared_ptr<Table> tbl;
std::shared_ptr<t_gnode> gnode;
std::uint32_t offset;

// If the Table has already been created, use it
if (table_initialized) {
tbl = table.as<std::shared_ptr<Table>>();
pool = tbl->get_pool();
gnode = tbl->get_gnode();
offset = tbl->get_offset();
is_update = (is_update || gnode->mapping_size() > 0);
}

std::vector<std::string> column_names;
std::vector<t_dtype> data_types;
arrow::ArrowLoader loader;
std::uintptr_t ptr;

// Determine metadata
bool is_delete = op == OP_DELETE;
if (is_arrow) {

// Get details of the Typed Array from JS
if (is_arrow && !is_delete) {
t_val constructor = accessor["constructor"];
std::int32_t length = accessor["byteLength"].as<std::int32_t>();

// Allocate memory
ptr = reinterpret_cast<std::uintptr_t>(malloc(length));
if (ptr == NULL) {
std::cout << "ERROR" << std::endl;
std::cout << "Unable to load arrow of size 0" << std::endl;
return nullptr;
}

// Write to the C++ heap where we allocated the space
t_val memory = t_val::module_property("HEAP8")["buffer"];
t_val memoryView = constructor.new_(memory, ptr, length);
memoryView.call<void>("set", accessor);
// Dispatch to the core library

// Parse the arrow and get its metadata
loader.initialize(ptr, length);
column_names = loader.names();
data_types = loader.types();

// Always use the `Table` column names and data types on update
if (table_initialized && is_update) {
auto schema = gnode->get_tblschema();
column_names = schema.columns();
data_types = schema.types();

auto data_table = gnode->get_table();
if (data_table->size() == 0) {
/**
* If updating a table created from schema, a 32-bit int/float
* needs to be promoted to a 64-bit int/float if specified in
* the Arrow schema.
*/
std::vector<t_dtype> arrow_dtypes = loader.types();
for (auto idx = 0; idx < column_names.size(); ++idx) {
const std::string& name = column_names[idx];
bool can_retype = name != "psp_okey" && name != "psp_pkey" && name != "psp_op";
bool is_32_bit = data_types[idx] == DTYPE_INT32 || data_types[idx] == DTYPE_FLOAT32;
if (can_retype && is_32_bit) {
t_dtype arrow_dtype = arrow_dtypes[idx];
switch (arrow_dtype) {
case DTYPE_INT64:
case DTYPE_FLOAT64: {
std::cout << "Promoting column `"
<< column_names[idx]
<< "` to maintain consistency with Arrow type."
<< std::endl;
gnode->promote_column(name, arrow_dtype);
} break;
default: {
continue;
}
}
}
}

// Updated data types need to reflect in new data table
auto new_schema = gnode->get_tblschema();
data_types = new_schema.types();
}
} else {
column_names = loader.names();
data_types = loader.types();
}
} else if (is_update || is_delete) {
t_val names = accessor["names"];
t_val types = accessor["types"];
Expand All @@ -1165,37 +1221,7 @@ namespace binding {
data_types = get_data_types(data, format, column_names, accessor["date_validator"]);
}

bool table_initialized = has_value(table);
std::shared_ptr<Table> tbl;
std::uint32_t offset;

// If the Table has already been created, use it
if (table_initialized) {
// Get a reference to the Table, and update its metadata
tbl = table.as<std::shared_ptr<Table>>();
tbl->set_column_names(column_names);
tbl->set_data_types(data_types);
offset = tbl->get_offset();

auto current_gnode = tbl->get_gnode();

// use gnode metadata to help decide if we need to update
is_update = (is_update || current_gnode->mapping_size() > 0);

// if performing an arrow schema update, promote columns
auto current_data_table = current_gnode->get_table();

if (is_arrow && is_update && current_data_table->size() == 0) {
auto current_schema = current_data_table->get_schema();
for (auto idx = 0; idx < current_schema.m_types.size(); ++idx) {
if (data_types[idx] == DTYPE_INT64) {
std::cout << "Promoting int64 `" << column_names[idx] << "`"
<< std::endl;
current_gnode->promote_column(column_names[idx], DTYPE_INT64);
}
}
}
} else {
if (!table_initialized) {
std::shared_ptr<t_pool> pool = std::make_shared<t_pool>();
tbl = std::make_shared<Table>(
pool, column_names, data_types, limit, index);
Expand Down
22 changes: 2 additions & 20 deletions cpp/perspective/src/cpp/vocab.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,7 @@ t_vocab::get_interned(const char* s) {
} else {
idx = iter->second;
}
#ifndef PSP_ENABLE_WASM
#ifdef PSP_COLUMN_VERIFY
if (std::string(s) == "") {
PSP_VERBOSE_ASSERT(idx == 0, "Expected empty string to map to 0");
}
#endif
#endif

return idx;
}

Expand All @@ -115,9 +109,6 @@ t_vocab::init(bool from_recipe) {
if (from_recipe) {
rebuild_map();
}
#ifndef PSP_ENABLE_WASM
get_interned("");
#endif // PSP_ENABLE_WASM
}

t_uindex
Expand All @@ -133,16 +124,7 @@ t_vocab::verify() const {
rlookup[kv.second] = kv.first;
}

#ifndef PSP_ENABLE_WASM
auto zero = rlookup.find(t_uindex(0));
PSP_VERBOSE_ASSERT(zero, != rlookup.end(), "0 Not found");
PSP_VERBOSE_ASSERT(std::string(zero->second), == "", "0 mapped to unknown");
#endif

tsl::hopscotch_set<std::string> seen;
#ifndef PSP_ENABLE_WASM
seen.insert(std::string(""));
#endif

for (t_uindex idx = 1; idx < m_vlenidx; ++idx) {
std::stringstream ss;
Expand Down Expand Up @@ -193,7 +175,7 @@ void
t_vocab::pprint_vocabulary() const {
std::cout << "vocabulary =========\n";
for (t_uindex idx = 0; idx < m_vlenidx; ++idx) {
std::cout << "\t" << idx << " => " << unintern_c(idx) << std::endl;
std::cout << "\t" << idx << " => '" << unintern_c(idx) << "'" << std::endl;
}

std::cout << "end vocabulary =========\n";
Expand Down
2 changes: 1 addition & 1 deletion cpp/perspective/src/include/perspective/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ PERSPECTIVE_EXPORT void psp_abort(const std::string& message);
ss << __FILE__ << ":" << __LINE__ << ": " << MSG << " : " \
<< perspective::get_error_str(); \
perror(ss.str().c_str()); \
psp_abort("Verbose assert failed!"); \
psp_abort("Verbose assert failed!"); \
} \
}

Expand Down
27 changes: 20 additions & 7 deletions cpp/perspective/src/include/perspective/data_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace perspective {
/**
 * Per-row record used while flattening a primary-keyed table: one instance is
 * filled in for every input row by `t_data_table::flatten_helper_1`.
 *
 * DATA_T is the C++ type of the primary key value.
 */
template <typename DATA_T>
struct t_rowpack {
// Primary key value read from the pkey column for this row.
DATA_T m_pkey;
// Result of `is_valid()` on the pkey column for this row; carried through so
// the flattened pkey column can be written as STATUS_VALID or STATUS_INVALID.
bool m_pkey_is_valid;
// Position of this row in the source table (the fragment index).
t_index m_idx;
// Operation read from the op column for this row, cast to `t_op`.
t_op m_op;
};
Expand Down Expand Up @@ -160,7 +161,8 @@ t_data_table::flatten_body(FLATTENED_T flattened) const {
PSP_VERBOSE_ASSERT(m_init, "touching uninited object");
PSP_VERBOSE_ASSERT(is_pkey_table(), "Not a pkeyed table");

switch (get_const_column("psp_pkey")->get_dtype()) {
t_dtype pkey_dtype = get_const_column("psp_pkey")->get_dtype();
switch (pkey_dtype) {
case DTYPE_INT64: {
flatten_helper_1<FLATTENED_T, std::int64_t>(flattened);
} break;
Expand Down Expand Up @@ -200,7 +202,12 @@ t_data_table::flatten_body(FLATTENED_T flattened) const {
case DTYPE_FLOAT32: {
flatten_helper_1<FLATTENED_T, float>(flattened);
} break;
default: { PSP_COMPLAIN_AND_ABORT("Unsupported key type"); }
default: {
std::stringstream ss;
ss << "Unsupported type `" << get_dtype_descr(pkey_dtype)
<< "` for `index`." << std::endl;
PSP_COMPLAIN_AND_ABORT(ss.str());
}
}

return;
Expand Down Expand Up @@ -262,6 +269,7 @@ t_data_table::flatten_helper_1(FLATTENED_T flattened) const {
std::vector<t_rowpack<PKEY_T>> sorted(frags_size);
for (t_uindex fragidx = 0; fragidx < frags_size; ++fragidx) {
sorted[fragidx].m_pkey = *(s_pkey_col->get_nth<PKEY_T>(fragidx));
sorted[fragidx].m_pkey_is_valid = s_pkey_col->is_valid(fragidx);
sorted[fragidx].m_op = static_cast<t_op>(*(s_op_col->get_nth<std::uint8_t>(fragidx)));
sorted[fragidx].m_idx = fragidx;
}
Expand All @@ -280,7 +288,8 @@ t_data_table::flatten_helper_1(FLATTENED_T flattened) const {
edges.push_back(0);

for (t_index idx = 1, loop_end = sorted.size(); idx < loop_end; ++idx) {
if (sorted[idx].m_pkey != sorted[idx - 1].m_pkey) {
if ((sorted[idx].m_pkey_is_valid != sorted[idx - 1].m_pkey_is_valid)
|| (sorted[idx].m_pkey != sorted[idx - 1].m_pkey)) {
edges.push_back(idx);
}
}
Expand All @@ -306,9 +315,11 @@ t_data_table::flatten_helper_1(FLATTENED_T flattened) const {
}

const auto& sort_rec = sorted[bidx];

if (delete_encountered) {
d_pkey_col->push_back(sort_rec.m_pkey);
d_pkey_col->push_back(
sort_rec.m_pkey,
sort_rec.m_pkey_is_valid ? t_status::STATUS_VALID
: t_status::STATUS_INVALID);
std::uint8_t op8 = OP_DELETE;
d_op_col->push_back(op8);
++store_idx;
Expand All @@ -321,7 +332,10 @@ t_data_table::flatten_helper_1(FLATTENED_T flattened) const {
rec.m_eidx = eidx;
fltrecs.push_back(rec);

d_pkey_col->push_back(sort_rec.m_pkey);
d_pkey_col->push_back(
sort_rec.m_pkey,
sort_rec.m_pkey_is_valid ? t_status::STATUS_VALID
: t_status::STATUS_INVALID);

std::uint8_t op8 = OP_INSERT;
d_op_col->push_back(op8);
Expand Down Expand Up @@ -410,7 +424,6 @@ t_data_table::flatten_helper_1(FLATTENED_T flattened) const {
);
#endif

d_pkey_col->valid_raw_fill();
d_op_col->valid_raw_fill();
}

Expand Down
59 changes: 59 additions & 0 deletions examples/simple/arrow.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<!--
Copyright (c) 2017, the Perspective Authors.
This file is part of the Perspective library, distributed under the terms of
the Apache License 2.0. The full license can be found in the LICENSE file.
-->

<!DOCTYPE html>
<html>

<head>

<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1, minimum-scale=1, user-scalable=no">

<script src="perspective-viewer.js"></script>
<script src="perspective-viewer-hypergrid.js"></script>
<script src="perspective-viewer-d3fc.js"></script>

<script src="perspective.js"></script>

<link rel='stylesheet' href="index.css">
<link rel='stylesheet' href="material.css" is="custom-style">


</head>

<body>

<perspective-viewer id="viewer">

</perspective-viewer>

<script>
// Once the web components are ready, load `date32.arrow` into the viewer and
// then apply `date64.arrow` as an update.
window.addEventListener('WebComponentsReady', function() {
    const elem = document.getElementById("viewer");

    // Fetch `url` and resolve to its body as an ArrayBuffer; rejects with the
    // HTTP status code when the response is not OK.
    const fetchArrow = (url) =>
        fetch(url).then((response) => {
            if (!response.ok) {
                // Use the response's status: a bare `status` would resolve to
                // the unrelated `window.status` global.
                throw new Error("Error: status " + response.status);
            }
            return response.arrayBuffer();
        });

    // Chain the two steps so `update()` can never run before `load()`,
    // whichever file happens to download first. Returning the result of
    // `load()` waits for it if it is a promise and is harmless otherwise.
    fetchArrow("date32.arrow")
        .then((buffer) => elem.load(buffer))
        .then(() => fetchArrow("date64.arrow"))
        .then((buffer) => elem.update(buffer))
        .catch((error) => console.error(error));
});
</script>

</body>

</html>
Binary file added examples/simple/date32.arrow
Binary file not shown.
Binary file added examples/simple/date64.arrow
Binary file not shown.
Loading