Skip to content

Commit

Permalink
PARQUET-499: Complete PlainEncoder implementation for all primitive t…
Browse files Browse the repository at this point in the history
…ypes and test end to end

Includes tests for end to end plain encoding and decoding of all data types.

Author: Deepak Majeti <deepak.majeti@hp.com>

Closes apache#52 from majetideepak/PARQUET-499 and squashes the following commits:

897859b [Deepak Majeti] minor edits
2067ef5 [Deepak Majeti] renamed a test
dfb19f8 [Deepak Majeti] templated all types
059967a [Deepak Majeti] templated int and real tests
da86d4d [Deepak Majeti] minor fix
4976bec [Deepak Majeti] include pruning
d0f8ab9 [Deepak Majeti] addressed comments
07257c0 [Deepak Majeti] minor format edits
6ca0b30 [Deepak Majeti] fixed formatting and casting issues
9815062 [Deepak Majeti] PARQUET-499

Change-Id: I45b2811e9abc8cad1277a533280d7fc3727d13e7
  • Loading branch information
Deepak Majeti authored and julienledem committed Feb 18, 2016
1 parent ce3621f commit bee7c9a
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 19 deletions.
164 changes: 156 additions & 8 deletions cpp/src/parquet/encodings/plain-encoding-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "parquet/encodings/plain-encoding.h"
#include "parquet/types.h"
#include "parquet/schema/types.h"
#include "parquet/util/bit-util.h"
#include "parquet/util/output.h"
#include "parquet/util/test-common.h"
Expand All @@ -35,11 +36,10 @@ namespace parquet_cpp {

namespace test {

TEST(BooleanTest, TestEncodeDecode) {
TEST(VectorBooleanTest, TestEncodeDecode) {
// PARQUET-454

size_t nvalues = 100;
size_t nbytes = BitUtil::RoundUp(nvalues, 8) / 8;
size_t nvalues = 10000;
size_t nbytes = BitUtil::Ceil(nvalues, 8);

// seed the prng so failure is deterministic
vector<bool> draws = flip_coins_seed(nvalues, 0.5, 0);
Expand All @@ -50,23 +50,171 @@ TEST(BooleanTest, TestEncodeDecode) {
InMemoryOutputStream dst;
encoder.Encode(draws, nvalues, &dst);

std::vector<uint8_t> encode_buffer;
vector<uint8_t> encode_buffer;
dst.Transfer(&encode_buffer);

ASSERT_EQ(nbytes, encode_buffer.size());

std::vector<uint8_t> decode_buffer(nbytes);
vector<uint8_t> decode_buffer(nbytes);
const uint8_t* decode_data = &decode_buffer[0];

decoder.SetData(nvalues, &encode_buffer[0], encode_buffer.size());
size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
ASSERT_EQ(nvalues, values_decoded);

for (size_t i = 0; i < nvalues; ++i) {
ASSERT_EQ(BitUtil::GetArrayBit(decode_data, i), draws[i]) << i;
ASSERT_EQ(draws[i], BitUtil::GetArrayBit(decode_data, i)) << i;
}
}

} // namespace test
template<typename T, int TYPE>
class EncodeDecode{
public:
void init_data(int nvalues) {
num_values_ = nvalues;
input_bytes_.resize(num_values_ * sizeof(T));
output_bytes_.resize(num_values_ * sizeof(T));
draws_ = reinterpret_cast<T*>(input_bytes_.data());
decode_buf_ = reinterpret_cast<T*>(output_bytes_.data());
}

void generate_data() {
// seed the prng so failure is deterministic
random_numbers(num_values_, 0.5, draws_);
}

void encode_decode(ColumnDescriptor *d) {
PlainEncoder<TYPE> encoder(d);
PlainDecoder<TYPE> decoder(d);

InMemoryOutputStream dst;
encoder.Encode(draws_, num_values_, &dst);

dst.Transfer(&encode_buffer_);

decoder.SetData(num_values_, &encode_buffer_[0], encode_buffer_.size());
size_t values_decoded = decoder.Decode(decode_buf_, num_values_);
ASSERT_EQ(num_values_, values_decoded);
}

void verify_results() {
for (size_t i = 0; i < num_values_; ++i) {
ASSERT_EQ(draws_[i], decode_buf_[i]) << i;
}
}

void execute(int nvalues, ColumnDescriptor *d) {
init_data(nvalues);
generate_data();
encode_decode(d);
verify_results();
}

private:
int num_values_;
T* draws_;
T* decode_buf_;
vector<uint8_t> input_bytes_;
vector<uint8_t> output_bytes_;
vector<uint8_t> data_buffer_;
vector<uint8_t> encode_buffer_;
};

template<>
void EncodeDecode<bool, Type::BOOLEAN>::generate_data() {
// seed the prng so failure is deterministic
random_bools(num_values_, 0.5, 0, draws_);
}

template<>
void EncodeDecode<Int96, Type::INT96>::verify_results() {
for (size_t i = 0; i < num_values_; ++i) {
ASSERT_EQ(draws_[i].value[0], decode_buf_[i].value[0]) << i;
ASSERT_EQ(draws_[i].value[1], decode_buf_[i].value[1]) << i;
ASSERT_EQ(draws_[i].value[2], decode_buf_[i].value[2]) << i;
}
}

template<>
void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::generate_data() {
// seed the prng so failure is deterministic
int max_byte_array_len = 12 + sizeof(uint32_t);
size_t nbytes = num_values_ * max_byte_array_len;
data_buffer_.resize(nbytes);
random_byte_array(num_values_, 0.5, data_buffer_.data(), draws_,
max_byte_array_len);
}

template<>
void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::verify_results() {
for (size_t i = 0; i < num_values_; ++i) {
ASSERT_EQ(draws_[i].len, decode_buf_[i].len) << i;
ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, draws_[i].len)) << i;
}
}

static int flba_length = 8;
template<>
void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::generate_data() {
// seed the prng so failure is deterministic
size_t nbytes = num_values_ * flba_length;
data_buffer_.resize(nbytes);
ASSERT_EQ(nbytes, data_buffer_.size());
random_fixed_byte_array(num_values_, 0.5, data_buffer_.data(), flba_length, draws_);
}

template<>
void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::verify_results() {
for (size_t i = 0; i < 1000; ++i) {
ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, flba_length)) << i;
}
}

int num_values = 10000;

TEST(BoolEncodeDecode, TestEncodeDecode) {
EncodeDecode<bool, Type::BOOLEAN> obj;
obj.execute(num_values, nullptr);
}

TEST(Int32EncodeDecode, TestEncodeDecode) {
EncodeDecode<int32_t, Type::INT32> obj;
obj.execute(num_values, nullptr);
}

TEST(Int64EncodeDecode, TestEncodeDecode) {
EncodeDecode<int64_t, Type::INT64> obj;
obj.execute(num_values, nullptr);
}

TEST(FloatEncodeDecode, TestEncodeDecode) {
EncodeDecode<float, Type::FLOAT> obj;
obj.execute(num_values, nullptr);
}

TEST(DoubleEncodeDecode, TestEncodeDecode) {
EncodeDecode<double, Type::DOUBLE> obj;
obj.execute(num_values, nullptr);
}

TEST(Int96EncodeDecode, TestEncodeDecode) {
EncodeDecode<Int96, Type::INT96> obj;
obj.execute(num_values, nullptr);
}

TEST(BAEncodeDecode, TestEncodeDecode) {
EncodeDecode<ByteArray, Type::BYTE_ARRAY> obj;
obj.execute(num_values, nullptr);
}

TEST(FLBAEncodeDecode, TestEncodeDecode) {
schema::NodePtr node;
node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL,
Type::FIXED_LEN_BYTE_ARRAY, flba_length, LogicalType::UTF8);
ColumnDescriptor d(node, 0, 0);
EncodeDecode<FixedLenByteArray, Type::FIXED_LEN_BYTE_ARRAY> obj;
obj.execute(num_values, &d);
}

} // namespace test
} // namespace parquet_cpp
38 changes: 29 additions & 9 deletions cpp/src/parquet/encodings/plain-encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,26 @@ inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) {
}

// Template specialization for BYTE_ARRAY
// BA does not currently own its data
// the lifetime is tied to the input stream
template <>
inline int PlainDecoder<Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
int max_values) {
max_values = std::min(max_values, num_values_);
for (int i = 0; i < max_values; ++i) {
buffer[i].len = *reinterpret_cast<const uint32_t*>(data_);
if (len_ < sizeof(uint32_t) + buffer[i].len) ParquetException::EofException();
uint32_t len = buffer[i].len = *reinterpret_cast<const uint32_t*>(data_);
if (len_ < sizeof(uint32_t) + len) ParquetException::EofException();
buffer[i].ptr = data_ + sizeof(uint32_t);
data_ += sizeof(uint32_t) + buffer[i].len;
len_ -= sizeof(uint32_t) + buffer[i].len;
data_ += sizeof(uint32_t) + len;
len_ -= sizeof(uint32_t) + len;
}
num_values_ -= max_values;
return max_values;
}

// Template specialization for FIXED_LEN_BYTE_ARRAY
// FLBA does not currently own its data
// the lifetime is tied to the input stream
template <>
inline int PlainDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Decode(
FixedLenByteArray* buffer, int max_values) {
Expand Down Expand Up @@ -161,11 +165,21 @@ class PlainEncoder<Type::BOOLEAN> : public Encoder<Type::BOOLEAN> {
Encoder<Type::BOOLEAN>(descr, Encoding::PLAIN) {}

virtual void Encode(const bool* src, int num_values, OutputStream* dst) {
throw ParquetException("this API for encoding bools not implemented");
size_t bytes_required = BitUtil::Ceil(num_values, 8);
std::vector<uint8_t> tmp_buffer(bytes_required);

BitWriter bit_writer(&tmp_buffer[0], bytes_required);
for (size_t i = 0; i < num_values; ++i) {
bit_writer.PutValue(src[i], 1);
}
bit_writer.Flush();

// Write the result to the output stream
dst->Write(bit_writer.buffer(), bit_writer.bytes_written());
}

void Encode(const std::vector<bool>& src, int num_values, OutputStream* dst) {
size_t bytes_required = BitUtil::RoundUp(num_values, 8) / 8;
size_t bytes_required = BitUtil::Ceil(num_values, 8);

// TODO(wesm)
// Use a temporary buffer for now and copy, because the BitWriter is not
Expand Down Expand Up @@ -193,15 +207,21 @@ inline void PlainEncoder<TYPE>::Encode(const T* buffer, int num_values,
template <>
inline void PlainEncoder<Type::BYTE_ARRAY>::Encode(const ByteArray* src,
int num_values, OutputStream* dst) {
ParquetException::NYI("byte array encoding");
for (size_t i = 0; i < num_values; ++i) {
// Write the result to the output stream
dst->Write(reinterpret_cast<const uint8_t*>(&src[i].len), sizeof(uint32_t));
dst->Write(reinterpret_cast<const uint8_t*>(src[i].ptr), src[i].len);
}
}

template <>
inline void PlainEncoder<Type::FIXED_LEN_BYTE_ARRAY>::Encode(
const FixedLenByteArray* src, int num_values, OutputStream* dst) {
ParquetException::NYI("FLBA encoding");
for (size_t i = 0; i < num_values; ++i) {
// Write the result to the output stream
dst->Write(reinterpret_cast<const uint8_t*>(src[i].ptr), descr_->type_length());
}
}

} // namespace parquet_cpp

#endif
82 changes: 80 additions & 2 deletions cpp/src/parquet/util/test-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
#define PARQUET_UTIL_TEST_COMMON_H

#include <iostream>
#include <limits>
#include <random>
#include <vector>

#include "parquet/types.h"

using std::vector;

namespace parquet_cpp {
Expand Down Expand Up @@ -81,7 +84,6 @@ static inline vector<bool> flip_coins_seed(size_t n, double p, uint32_t seed) {
return draws;
}


static inline vector<bool> flip_coins(size_t n, double p) {
std::random_device rd;
std::mt19937 gen(rd());
Expand All @@ -104,8 +106,84 @@ void random_bytes(int n, uint32_t seed, std::vector<uint8_t>* out) {
}
}

} // namespace test
template <typename T>
void random_numbers(int n, uint32_t seed, T* out) {
std::mt19937 gen(seed);
std::uniform_real_distribution<T> d(std::numeric_limits<T>::lowest(),
std::numeric_limits<T>::max());
for (int i = 0; i < n; ++i) {
out[i] = d(gen);
}
}

void random_bools(int n, double p, uint32_t seed, bool* out) {
std::mt19937 gen(seed);
std::bernoulli_distribution d(p);
for (int i = 0; i < n; ++i) {
out[i] = d(gen);
}
}

template <>
void random_numbers(int n, uint32_t seed, int32_t* out) {
std::mt19937 gen(seed);
std::uniform_int_distribution<int32_t> d(std::numeric_limits<int32_t>::lowest(),
std::numeric_limits<int32_t>::max());
for (int i = 0; i < n; ++i) {
out[i] = d(gen);
}
}

template <>
void random_numbers(int n, uint32_t seed, int64_t* out) {
std::mt19937 gen(seed);
std::uniform_int_distribution<int64_t> d(std::numeric_limits<int64_t>::lowest(),
std::numeric_limits<int64_t>::max());
for (int i = 0; i < n; ++i) {
out[i] = d(gen);
}
}

template <>
void random_numbers(int n, uint32_t seed, Int96* out) {
std::mt19937 gen(seed);
std::uniform_int_distribution<uint32_t> d(std::numeric_limits<uint32_t>::lowest(),
std::numeric_limits<uint32_t>::max());
for (int i = 0; i < n; ++i) {
out[i].value[0] = d(gen);
out[i].value[1] = d(gen);
out[i].value[2] = d(gen);
}
}

void random_fixed_byte_array(int n, uint32_t seed, uint8_t *buf, int len,
FLBA* out) {
std::mt19937 gen(seed);
std::uniform_int_distribution<int> d(0, 255);
for (int i = 0; i < n; ++i) {
out[i].ptr = buf;
for (int j = 0; j < len; ++j) {
buf[j] = d(gen) & 0xFF;
}
buf += len;
}
}

void random_byte_array(int n, uint32_t seed, uint8_t *buf,
ByteArray* out, int max_size) {
std::mt19937 gen(seed);
std::uniform_int_distribution<int> d1(0, max_size);
std::uniform_int_distribution<int> d2(0, 255);
for (int i = 0; i < n; ++i) {
out[i].len = d1(gen);
out[i].ptr = buf;
for (int j = 0; j < out[i].len; ++j) {
buf[j] = d2(gen) & 0xFF;
}
buf += out[i].len;
}
}
} // namespace test
} // namespace parquet_cpp

#endif // PARQUET_UTIL_TEST_COMMON_H

0 comments on commit bee7c9a

Please sign in to comment.