Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow fixes #772

Merged
merged 3 commits into from
Oct 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 94 additions & 60 deletions cpp/perspective/src/cpp/arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <perspective/arrow.h>

#include <arrow/api.h>
#include <arrow/util/decimal.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>

Expand All @@ -31,7 +32,7 @@ namespace arrow {
return DTYPE_INT16;
} else if (src == "int32") {
return DTYPE_INT32;
} else if (src == "int64") {
} else if (src == "decimal" || src == "decimal128" || src == "int64") {
return DTYPE_INT64;
} else if (src == "float") {
return DTYPE_FLOAT32;
Expand All @@ -40,8 +41,7 @@ namespace arrow {
} else if (src == "timestamp") {
return DTYPE_TIME;
}
std::cout << "No type valid type found for: " << src << std::endl;
PSP_COMPLAIN_AND_ABORT("No type valid type found");
std::cerr << "Unknown column type '" << src << "'" << std::endl;
return DTYPE_STR;
}

Expand All @@ -51,38 +51,59 @@ namespace arrow {
void
ArrowLoader::initialize(const uintptr_t ptr, const uint32_t length) {
io::BufferReader buffer_reader(reinterpret_cast<const std::uint8_t*>(ptr), length);
std::shared_ptr<ipc::RecordBatchFileReader> batch_reader;
auto status = ipc::RecordBatchFileReader::Open(&buffer_reader, &batch_reader);

if (!status.ok()) {
PSP_COMPLAIN_AND_ABORT(status.message());
if (std::memcmp("ARROW1", (const void *)ptr, 6) == 0) {
std::shared_ptr<ipc::RecordBatchFileReader> batch_reader;
::arrow::Status status = ipc::RecordBatchFileReader::Open(&buffer_reader, &batch_reader);
if (!status.ok()) {
std::cerr << status.message() << std::endl;
PSP_COMPLAIN_AND_ABORT(status.message());
} else {
std::vector<std::shared_ptr<RecordBatch>> batches;
auto num_batches = batch_reader->num_record_batches();
for (int i = 0; i < num_batches; ++i) {
std::shared_ptr<RecordBatch> chunk;
status = batch_reader->ReadRecordBatch(i, &chunk);
if (!status.ok()) {
std::cerr << status.message() << std::endl;
PSP_COMPLAIN_AND_ABORT(status.message());
}
batches.push_back(chunk);
}
status = ::arrow::Table::FromRecordBatches(batches, &m_table);
if (!status.ok()) {
std::cerr << status.message() << std::endl;
PSP_COMPLAIN_AND_ABORT(status.message());
};
};
} else {
auto num_batches = batch_reader->num_record_batches();
for (int i = 0; i < num_batches; ++i) {
std::shared_ptr<RecordBatch> chunk;
auto status = batch_reader->ReadRecordBatch(i, &chunk);
std::shared_ptr<ipc::RecordBatchReader> batch_reader;
::arrow::Status status = ipc::RecordBatchStreamReader::Open(&buffer_reader, &batch_reader);
if (!status.ok()) {
std::cerr << status.message() << std::endl;
PSP_COMPLAIN_AND_ABORT(status.message());
} else {
status = batch_reader->ReadAll(&m_table);
if (!status.ok()) {
std::cerr << status.message() << std::endl;
PSP_COMPLAIN_AND_ABORT(status.message());
}
m_batches.push_back(chunk);
};
}
}

// Get names and schema
std::shared_ptr<Schema> schema = m_batches[0]->schema();
std::vector<std::shared_ptr<Field>> fields = schema->fields();
std::shared_ptr<Schema> schema = m_table->schema();
std::vector<std::shared_ptr<Field>> fields = schema->fields();

for (auto field : fields) {
m_names.push_back(field->name());
m_types.push_back(convert_type(field->type()->name()));
}
for (auto field : fields) {
m_names.push_back(field->name());
m_types.push_back(convert_type(field->type()->name()));
}
}

void
ArrowLoader::fill_table(t_data_table& tbl, const std::string& index, std::uint32_t offset,
std::uint32_t limit, bool is_update) {
bool implicit_index = false;
std::shared_ptr<Schema> schema = m_batches[0]->schema();
std::shared_ptr<Schema> schema = m_table->schema();
std::vector<std::shared_ptr<Field>> fields = schema->fields();

for (long unsigned int cidx = 0; cidx < m_names.size(); ++cidx) {
Expand Down Expand Up @@ -125,17 +146,17 @@ namespace arrow {
template <typename T, typename V>
void
iter_col_copy(std::shared_ptr<t_column> dest, std::shared_ptr<::arrow::Array> src,
const uint32_t len) {
const int64_t offset, const int64_t len) {
std::shared_ptr<T> scol = std::static_pointer_cast<T>(src);
const typename T::value_type* vals = scol->raw_values();
for (uint32_t i = 0; i < len; i++) {
dest->set_nth<V>(i, static_cast<V>(vals[i]));
dest->set_nth<V>(offset + i, static_cast<V>(vals[i]));
}
}

void
copy_array(std::shared_ptr<t_column> dest, std::shared_ptr<::arrow::Array> src,
const uint32_t len) {
const int64_t offset, const int64_t len) {
switch (src->type()->id()) {
case ::arrow::DictionaryType::type_id: {
auto scol = std::static_pointer_cast<::arrow::DictionaryArray>(src);
Expand All @@ -157,14 +178,14 @@ namespace arrow {
auto indices = scol->indices();
switch (indices->type()->id()) {
case ::arrow::Int8Type::type_id: {
iter_col_copy<::arrow::Int8Array, uint32_t>(dest, indices, len);
iter_col_copy<::arrow::Int8Array, uint32_t>(dest, indices, offset, len);
} break;
case ::arrow::Int16Type::type_id: {
iter_col_copy<::arrow::Int16Array, uint32_t>(dest, indices, len);
iter_col_copy<::arrow::Int16Array, uint32_t>(dest, indices, offset, len);
} break;
case ::arrow::Int32Type::type_id: {
auto sindices = std::static_pointer_cast<::arrow::Int32Array>(indices);
std::memcpy(dest->get_nth<std::int32_t>(0),
std::memcpy(dest->get_nth<std::int32_t>(offset),
(void*)sindices->raw_values(), len * 4);
} break;
default:
Expand All @@ -184,24 +205,24 @@ namespace arrow {
std::int32_t bidx = offsets[i];
std::size_t es = offsets[i + 1] - bidx;
elem.assign(reinterpret_cast<const char*>(values) + bidx, es);
dest->set_nth(i, elem);
dest->set_nth(offset + i, elem);
}
} break;
case ::arrow::Int8Type::type_id: {
auto scol = std::static_pointer_cast<::arrow::Int8Array>(src);
std::memcpy(dest->get_nth<std::int8_t>(0), (void*)scol->raw_values(), len);
std::memcpy(dest->get_nth<std::int8_t>(offset), (void*)scol->raw_values(), len);
} break;
case ::arrow::Int16Type::type_id: {
auto scol = std::static_pointer_cast<::arrow::Int16Array>(src);
std::memcpy(dest->get_nth<std::int16_t>(0), (void*)scol->raw_values(), len * 2);
std::memcpy(dest->get_nth<std::int16_t>(offset), (void*)scol->raw_values(), len * 2);
} break;
case ::arrow::Int32Type::type_id: {
auto scol = std::static_pointer_cast<::arrow::Int32Array>(src);
std::memcpy(dest->get_nth<std::int32_t>(0), (void*)scol->raw_values(), len * 4);
std::memcpy(dest->get_nth<std::int32_t>(offset), (void*)scol->raw_values(), len * 4);
} break;
case ::arrow::Int64Type::type_id: {
auto scol = std::static_pointer_cast<::arrow::Int64Array>(src);
std::memcpy(dest->get_nth<std::int64_t>(0), (void*)scol->raw_values(), len * 8);
std::memcpy(dest->get_nth<std::int64_t>(offset), (void*)scol->raw_values(), len * 8);
} break;
case ::arrow::TimestampType::type_id: {
std::shared_ptr<::arrow::TimestampType> tunit
Expand All @@ -210,47 +231,57 @@ namespace arrow {
switch (tunit->unit()) {
case ::arrow::TimeUnit::MILLI: {
std::memcpy(
dest->get_nth<double>(0), (void*)scol->raw_values(), len * 8);
dest->get_nth<double>(offset), (void*)scol->raw_values(), len * 8);
} break;
case ::arrow::TimeUnit::NANO: {
const int64_t* vals = scol->raw_values();
for (uint32_t i = 0; i < len; i++) {
dest->set_nth<int64_t>(i, vals[i] / 1000000);
dest->set_nth<int64_t>(offset + i, vals[i] / 1000000);
}
} break;
case ::arrow::TimeUnit::MICRO: {
const int64_t* vals = scol->raw_values();
for (uint32_t i = 0; i < len; i++) {
dest->set_nth<int64_t>(i, vals[i] / 1000);
dest->set_nth<int64_t>(offset + i, vals[i] / 1000);
}
} break;
case ::arrow::TimeUnit::SECOND: {
const int64_t* vals = scol->raw_values();
for (uint32_t i = 0; i < len; i++) {
dest->set_nth<int64_t>(i, vals[i] * 1000);
dest->set_nth<int64_t>(offset + i, vals[i] * 1000);
}
} break;
}
} break;
case ::arrow::FloatType::type_id: {
auto scol = std::static_pointer_cast<::arrow::FloatArray>(src);
std::memcpy(dest->get_nth<float>(0), (void*)scol->raw_values(), len * 4);
std::memcpy(dest->get_nth<float>(offset), (void*)scol->raw_values(), len * 4);
} break;
case ::arrow::DoubleType::type_id: {
auto scol = std::static_pointer_cast<::arrow::DoubleArray>(src);
std::memcpy(dest->get_nth<double>(0), (void*)scol->raw_values(), len * 8);
std::memcpy(dest->get_nth<double>(offset), (void*)scol->raw_values(), len * 8);
} break;
case ::arrow::DecimalType::type_id: {
std::shared_ptr<::arrow::Decimal128Array> scol = std::static_pointer_cast<::arrow::DecimalArray>(src);
auto vals = (::arrow::Decimal128 *)scol->raw_values();
for (uint32_t i = 0; i < len; ++i) {
::arrow::Status status = vals[i].ToInteger(dest->get_nth<int64_t>(offset + i));
if (!status.ok()) {
PSP_COMPLAIN_AND_ABORT(status.message());
};
}
} break;
case ::arrow::BooleanType::type_id: {
auto scol = std::static_pointer_cast<::arrow::BooleanArray>(src);
const uint8_t* null_bitmap = scol->values()->data();
for (uint32_t i = 0; i < len; ++i) {
std::uint8_t elem = null_bitmap[i / 8];
bool v = elem & (1 << (i % 8));
dest->set_nth<bool>(i, v);
dest->set_nth<bool>(offset + i, v);
}
} break;
default: {
PSP_COMPLAIN_AND_ABORT(src->type()->name());
std::cerr << "Unknown column type '" << src->type()->name() << "'" << std::endl;
}
}
}
Expand All @@ -259,33 +290,36 @@ namespace arrow {
ArrowLoader::fill_column(t_data_table& tbl, std::shared_ptr<t_column> col,
const std::string& name, std::int32_t cidx, t_dtype type, std::string& raw_type,
bool is_update) {
uint32_t len = m_batches[0]->num_rows();
std::shared_ptr<::arrow::Array> array = m_batches[0]->GetColumnByName(name);
copy_array(col, array, len);
int64_t offset = 0;
std::shared_ptr<::arrow::ChunkedArray> carray = m_table->GetColumnByName(name);

// Fill validity bitmap
std::int64_t null_count = array->null_count();
if (null_count == 0) {
col->valid_raw_fill();
} else {
const uint8_t* null_bitmap = array->null_bitmap_data();
for(auto i = 0; i < carray->num_chunks(); ++i) {
std::shared_ptr<::arrow::Array> array = carray->chunk(i);
int64_t len = array->length();

copy_array(col, array, offset, len);

// Fill validity bitmap
std::int64_t null_count = array->null_count();
if (null_count == 0) {
col->valid_raw_fill();
} else {
const uint8_t* null_bitmap = array->null_bitmap_data();

// arrow packs bools into a bitmap
for (uint32_t i = 0; i < len; ++i) {
std::uint8_t elem = null_bitmap[i / 8];
bool v = elem & (1 << (i % 8));
col->set_valid(i, v);
// arrow packs bools into a bitmap
for (uint32_t i = 0; i < len; ++i) {
std::uint8_t elem = null_bitmap[i / 8];
bool v = elem & (1 << (i % 8));
col->set_valid(offset + i, v);
}
}
offset += len;
}
}

std::uint32_t
ArrowLoader::num_rows() const {
std::uint32_t row_count = 0;
for (auto batch : m_batches) {
row_count += batch->num_rows();
}
return row_count;
return m_table->num_rows();
}

std::vector<std::string>
Expand Down
2 changes: 1 addition & 1 deletion cpp/perspective/src/include/perspective/arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace arrow {
const std::string& name, std::int32_t cidx, t_dtype type, std::string& raw_type,
bool is_update);

std::vector<std::shared_ptr<::arrow::RecordBatch>> m_batches;
std::shared_ptr<::arrow::Table> m_table;
std::vector<std::string> m_names;
std::vector<t_dtype> m_types;
};
Expand Down
2 changes: 1 addition & 1 deletion docker/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ RUN ln -s /usr/local/lib/libboost_python37.so /usr/local/lib/libboost_python.so
RUN ln -s /usr/local/lib/libboost_numpy37.so /usr/local/lib/libboost_numpy.so

RUN python3 -m pip install codecov nose2 mock flake8 pytest pytest-cov traitlets ipywidgets faker psutil zerorpc
RUN python3 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow==0.14.1'
RUN python3 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow==0.15.0'

RUN npm install --global yarn
RUN yarn --version
Expand Down