Skip to content

Commit

Permalink
Merge pull request #772 from finos/arrow-fixes
Browse files Browse the repository at this point in the history
Arrow fixes
  • Loading branch information
texodus authored Oct 20, 2019
2 parents 403e71f + 6f0f0b7 commit d100066
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 62 deletions.
154 changes: 94 additions & 60 deletions cpp/perspective/src/cpp/arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <perspective/arrow.h>

#include <arrow/api.h>
#include <arrow/util/decimal.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>

Expand All @@ -31,7 +32,7 @@ namespace arrow {
return DTYPE_INT16;
} else if (src == "int32") {
return DTYPE_INT32;
} else if (src == "int64") {
} else if (src == "decimal" || src == "decimal128" || src == "int64") {
return DTYPE_INT64;
} else if (src == "float") {
return DTYPE_FLOAT32;
Expand All @@ -40,8 +41,7 @@ namespace arrow {
} else if (src == "timestamp") {
return DTYPE_TIME;
}
std::cout << "No type valid type found for: " << src << std::endl;
PSP_COMPLAIN_AND_ABORT("No type valid type found");
std::cerr << "Unknown column type '" << src << "'" << std::endl;
return DTYPE_STR;
}

Expand All @@ -51,38 +51,59 @@ namespace arrow {
void
ArrowLoader::initialize(const uintptr_t ptr, const uint32_t length) {
io::BufferReader buffer_reader(reinterpret_cast<const std::uint8_t*>(ptr), length);
std::shared_ptr<ipc::RecordBatchFileReader> batch_reader;
auto status = ipc::RecordBatchFileReader::Open(&buffer_reader, &batch_reader);

if (!status.ok()) {
PSP_COMPLAIN_AND_ABORT(status.message());
if (std::memcmp("ARROW1", (const void *)ptr, 6) == 0) {
std::shared_ptr<ipc::RecordBatchFileReader> batch_reader;
::arrow::Status status = ipc::RecordBatchFileReader::Open(&buffer_reader, &batch_reader);
if (!status.ok()) {
std::cerr << status.message() << std::endl;
PSP_COMPLAIN_AND_ABORT(status.message());
} else {
std::vector<std::shared_ptr<RecordBatch>> batches;
auto num_batches = batch_reader->num_record_batches();
for (int i = 0; i < num_batches; ++i) {
std::shared_ptr<RecordBatch> chunk;
status = batch_reader->ReadRecordBatch(i, &chunk);
if (!status.ok()) {
std::cerr << status.message() << std::endl;
PSP_COMPLAIN_AND_ABORT(status.message());
}
batches.push_back(chunk);
}
status = ::arrow::Table::FromRecordBatches(batches, &m_table);
if (!status.ok()) {
std::cerr << status.message() << std::endl;
PSP_COMPLAIN_AND_ABORT(status.message());
};
};
} else {
auto num_batches = batch_reader->num_record_batches();
for (int i = 0; i < num_batches; ++i) {
std::shared_ptr<RecordBatch> chunk;
auto status = batch_reader->ReadRecordBatch(i, &chunk);
std::shared_ptr<ipc::RecordBatchReader> batch_reader;
::arrow::Status status = ipc::RecordBatchStreamReader::Open(&buffer_reader, &batch_reader);
if (!status.ok()) {
std::cerr << status.message() << std::endl;
PSP_COMPLAIN_AND_ABORT(status.message());
} else {
status = batch_reader->ReadAll(&m_table);
if (!status.ok()) {
std::cerr << status.message() << std::endl;
PSP_COMPLAIN_AND_ABORT(status.message());
}
m_batches.push_back(chunk);
};
}
}

// Get names and schema
std::shared_ptr<Schema> schema = m_batches[0]->schema();
std::vector<std::shared_ptr<Field>> fields = schema->fields();
std::shared_ptr<Schema> schema = m_table->schema();
std::vector<std::shared_ptr<Field>> fields = schema->fields();

for (auto field : fields) {
m_names.push_back(field->name());
m_types.push_back(convert_type(field->type()->name()));
}
for (auto field : fields) {
m_names.push_back(field->name());
m_types.push_back(convert_type(field->type()->name()));
}
}

void
ArrowLoader::fill_table(t_data_table& tbl, const std::string& index, std::uint32_t offset,
std::uint32_t limit, bool is_update) {
bool implicit_index = false;
std::shared_ptr<Schema> schema = m_batches[0]->schema();
std::shared_ptr<Schema> schema = m_table->schema();
std::vector<std::shared_ptr<Field>> fields = schema->fields();

for (long unsigned int cidx = 0; cidx < m_names.size(); ++cidx) {
Expand Down Expand Up @@ -125,17 +146,17 @@ namespace arrow {
template <typename T, typename V>
void
iter_col_copy(std::shared_ptr<t_column> dest, std::shared_ptr<::arrow::Array> src,
const uint32_t len) {
const int64_t offset, const int64_t len) {
std::shared_ptr<T> scol = std::static_pointer_cast<T>(src);
const typename T::value_type* vals = scol->raw_values();
for (uint32_t i = 0; i < len; i++) {
dest->set_nth<V>(i, static_cast<V>(vals[i]));
dest->set_nth<V>(offset + i, static_cast<V>(vals[i]));
}
}

void
copy_array(std::shared_ptr<t_column> dest, std::shared_ptr<::arrow::Array> src,
const uint32_t len) {
const int64_t offset, const int64_t len) {
switch (src->type()->id()) {
case ::arrow::DictionaryType::type_id: {
auto scol = std::static_pointer_cast<::arrow::DictionaryArray>(src);
Expand All @@ -157,14 +178,14 @@ namespace arrow {
auto indices = scol->indices();
switch (indices->type()->id()) {
case ::arrow::Int8Type::type_id: {
iter_col_copy<::arrow::Int8Array, uint32_t>(dest, indices, len);
iter_col_copy<::arrow::Int8Array, uint32_t>(dest, indices, offset, len);
} break;
case ::arrow::Int16Type::type_id: {
iter_col_copy<::arrow::Int16Array, uint32_t>(dest, indices, len);
iter_col_copy<::arrow::Int16Array, uint32_t>(dest, indices, offset, len);
} break;
case ::arrow::Int32Type::type_id: {
auto sindices = std::static_pointer_cast<::arrow::Int32Array>(indices);
std::memcpy(dest->get_nth<std::int32_t>(0),
std::memcpy(dest->get_nth<std::int32_t>(offset),
(void*)sindices->raw_values(), len * 4);
} break;
default:
Expand All @@ -184,24 +205,24 @@ namespace arrow {
std::int32_t bidx = offsets[i];
std::size_t es = offsets[i + 1] - bidx;
elem.assign(reinterpret_cast<const char*>(values) + bidx, es);
dest->set_nth(i, elem);
dest->set_nth(offset + i, elem);
}
} break;
case ::arrow::Int8Type::type_id: {
auto scol = std::static_pointer_cast<::arrow::Int8Array>(src);
std::memcpy(dest->get_nth<std::int8_t>(0), (void*)scol->raw_values(), len);
std::memcpy(dest->get_nth<std::int8_t>(offset), (void*)scol->raw_values(), len);
} break;
case ::arrow::Int16Type::type_id: {
auto scol = std::static_pointer_cast<::arrow::Int16Array>(src);
std::memcpy(dest->get_nth<std::int16_t>(0), (void*)scol->raw_values(), len * 2);
std::memcpy(dest->get_nth<std::int16_t>(offset), (void*)scol->raw_values(), len * 2);
} break;
case ::arrow::Int32Type::type_id: {
auto scol = std::static_pointer_cast<::arrow::Int32Array>(src);
std::memcpy(dest->get_nth<std::int32_t>(0), (void*)scol->raw_values(), len * 4);
std::memcpy(dest->get_nth<std::int32_t>(offset), (void*)scol->raw_values(), len * 4);
} break;
case ::arrow::Int64Type::type_id: {
auto scol = std::static_pointer_cast<::arrow::Int64Array>(src);
std::memcpy(dest->get_nth<std::int64_t>(0), (void*)scol->raw_values(), len * 8);
std::memcpy(dest->get_nth<std::int64_t>(offset), (void*)scol->raw_values(), len * 8);
} break;
case ::arrow::TimestampType::type_id: {
std::shared_ptr<::arrow::TimestampType> tunit
Expand All @@ -210,47 +231,57 @@ namespace arrow {
switch (tunit->unit()) {
case ::arrow::TimeUnit::MILLI: {
std::memcpy(
dest->get_nth<double>(0), (void*)scol->raw_values(), len * 8);
dest->get_nth<double>(offset), (void*)scol->raw_values(), len * 8);
} break;
case ::arrow::TimeUnit::NANO: {
const int64_t* vals = scol->raw_values();
for (uint32_t i = 0; i < len; i++) {
dest->set_nth<int64_t>(i, vals[i] / 1000000);
dest->set_nth<int64_t>(offset + i, vals[i] / 1000000);
}
} break;
case ::arrow::TimeUnit::MICRO: {
const int64_t* vals = scol->raw_values();
for (uint32_t i = 0; i < len; i++) {
dest->set_nth<int64_t>(i, vals[i] / 1000);
dest->set_nth<int64_t>(offset + i, vals[i] / 1000);
}
} break;
case ::arrow::TimeUnit::SECOND: {
const int64_t* vals = scol->raw_values();
for (uint32_t i = 0; i < len; i++) {
dest->set_nth<int64_t>(i, vals[i] * 1000);
dest->set_nth<int64_t>(offset + i, vals[i] * 1000);
}
} break;
}
} break;
case ::arrow::FloatType::type_id: {
auto scol = std::static_pointer_cast<::arrow::FloatArray>(src);
std::memcpy(dest->get_nth<float>(0), (void*)scol->raw_values(), len * 4);
std::memcpy(dest->get_nth<float>(offset), (void*)scol->raw_values(), len * 4);
} break;
case ::arrow::DoubleType::type_id: {
auto scol = std::static_pointer_cast<::arrow::DoubleArray>(src);
std::memcpy(dest->get_nth<double>(0), (void*)scol->raw_values(), len * 8);
std::memcpy(dest->get_nth<double>(offset), (void*)scol->raw_values(), len * 8);
} break;
case ::arrow::DecimalType::type_id: {
std::shared_ptr<::arrow::Decimal128Array> scol = std::static_pointer_cast<::arrow::DecimalArray>(src);
auto vals = (::arrow::Decimal128 *)scol->raw_values();
for (uint32_t i = 0; i < len; ++i) {
::arrow::Status status = vals[i].ToInteger(dest->get_nth<int64_t>(offset + i));
if (!status.ok()) {
PSP_COMPLAIN_AND_ABORT(status.message());
};
}
} break;
case ::arrow::BooleanType::type_id: {
auto scol = std::static_pointer_cast<::arrow::BooleanArray>(src);
const uint8_t* null_bitmap = scol->values()->data();
for (uint32_t i = 0; i < len; ++i) {
std::uint8_t elem = null_bitmap[i / 8];
bool v = elem & (1 << (i % 8));
dest->set_nth<bool>(i, v);
dest->set_nth<bool>(offset + i, v);
}
} break;
default: {
PSP_COMPLAIN_AND_ABORT(src->type()->name());
std::cerr << "Unknown column type '" << src->type()->name() << "'" << std::endl;
}
}
}
Expand All @@ -259,33 +290,36 @@ namespace arrow {
ArrowLoader::fill_column(t_data_table& tbl, std::shared_ptr<t_column> col,
const std::string& name, std::int32_t cidx, t_dtype type, std::string& raw_type,
bool is_update) {
uint32_t len = m_batches[0]->num_rows();
std::shared_ptr<::arrow::Array> array = m_batches[0]->GetColumnByName(name);
copy_array(col, array, len);
int64_t offset = 0;
std::shared_ptr<::arrow::ChunkedArray> carray = m_table->GetColumnByName(name);

// Fill validity bitmap
std::int64_t null_count = array->null_count();
if (null_count == 0) {
col->valid_raw_fill();
} else {
const uint8_t* null_bitmap = array->null_bitmap_data();
for(auto i = 0; i < carray->num_chunks(); ++i) {
std::shared_ptr<::arrow::Array> array = carray->chunk(i);
int64_t len = array->length();

copy_array(col, array, offset, len);

// Fill validity bitmap
std::int64_t null_count = array->null_count();
if (null_count == 0) {
col->valid_raw_fill();
} else {
const uint8_t* null_bitmap = array->null_bitmap_data();

// arrow packs bools into a bitmap
for (uint32_t i = 0; i < len; ++i) {
std::uint8_t elem = null_bitmap[i / 8];
bool v = elem & (1 << (i % 8));
col->set_valid(i, v);
// arrow packs bools into a bitmap
for (uint32_t i = 0; i < len; ++i) {
std::uint8_t elem = null_bitmap[i / 8];
bool v = elem & (1 << (i % 8));
col->set_valid(offset + i, v);
}
}
offset += len;
}
}

std::uint32_t
ArrowLoader::num_rows() const {
std::uint32_t row_count = 0;
for (auto batch : m_batches) {
row_count += batch->num_rows();
}
return row_count;
return m_table->num_rows();
}

std::vector<std::string>
Expand Down
2 changes: 1 addition & 1 deletion cpp/perspective/src/include/perspective/arrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace arrow {
const std::string& name, std::int32_t cidx, t_dtype type, std::string& raw_type,
bool is_update);

std::vector<std::shared_ptr<::arrow::RecordBatch>> m_batches;
std::shared_ptr<::arrow::Table> m_table;
std::vector<std::string> m_names;
std::vector<t_dtype> m_types;
};
Expand Down
2 changes: 1 addition & 1 deletion docker/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ RUN ln -s /usr/local/lib/libboost_python37.so /usr/local/lib/libboost_python.so
RUN ln -s /usr/local/lib/libboost_numpy37.so /usr/local/lib/libboost_numpy.so

RUN python3 -m pip install codecov nose2 mock flake8 pytest pytest-cov traitlets ipywidgets faker psutil zerorpc
RUN python3 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow==0.14.1'
RUN python3 -m pip install 'numpy>=1.13.1' 'pandas>=0.22.0' 'pyarrow==0.15.0'

RUN npm install --global yarn
RUN yarn --version
Expand Down

0 comments on commit d100066

Please sign in to comment.