Skip to content
This repository was archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-610] Shrink hashmap to use less memory (#815)
Browse files Browse the repository at this point in the history
* Shrink hashmap to use less memory

This patch addes shrink on hashmap after the map is built.

This patch also increased the memory limit for hashmap.

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* remove debug log

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix format

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix test

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
  • Loading branch information
zhouyuan authored Apr 8, 2022
1 parent f2bfb3b commit 8cf8ebe
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import scala.util.Random

import org.apache.spark.AccumulatorSuite
import org.apache.spark.SparkConf
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.test.SharedSparkSession
Expand All @@ -32,6 +33,11 @@ import org.apache.spark.sql.types._
class SortSuite extends SparkPlanTest with SharedSparkSession {
import testImplicits.newProductEncoder
import testImplicits.localSeqToDatasetHolder

override protected def sparkConf: SparkConf = {
val conf = super.sparkConf
conf.set("spark.memory.offHeap.size", String.valueOf("5000m"))
}

test("basic sorting using ExternalSort") {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class HashRelationKernel::Impl {
}
}
}
hash_relation_->Minimize();
return arrow::Status::OK();
}

Expand Down
45 changes: 0 additions & 45 deletions native-sql-engine/cpp/src/codegen/common/hash_relation.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,6 @@ class HashRelation {
arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
const std::vector<std::shared_ptr<UnsafeArray>>& payloads,
bool semi = false) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
// This Key should be Hash Key
auto typed_array = std::make_shared<ArrayType>(in);
std::shared_ptr<UnsafeRow> payload = std::make_shared<UnsafeRow>(payloads.size());
Expand Down Expand Up @@ -202,9 +199,6 @@ class HashRelation {
arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
std::shared_ptr<KeyArrayType> original_key,
bool semi = false) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
// This Key should be Hash Key
auto typed_array = std::make_shared<ArrayType>(in);
if (original_key->null_count() == 0) {
Expand Down Expand Up @@ -236,9 +230,6 @@ class HashRelation {
arrow::Status AppendKeyColumn(std::shared_ptr<arrow::Array> in,
std::shared_ptr<StringArray> original_key,
bool semi = false) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
// This Key should be Hash Key
auto typed_array = std::make_shared<ArrayType>(in);
if (original_key->null_count() == 0) {
Expand Down Expand Up @@ -272,28 +263,19 @@ class HashRelation {
template <typename CType,
typename std::enable_if_t<is_number_or_decimal_type<CType>::value>* = nullptr>
int Get(int32_t v, CType payload) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
auto res = safeLookup(hash_table_, payload, v, &arrayid_list_);
if (res == -1) return -1;

return 0;
}

int Get(int32_t v, std::string payload) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
auto res = safeLookup(hash_table_, payload.data(), payload.size(), v, &arrayid_list_);
if (res == -1) return -1;
return 0;
}

int Get(int32_t v, std::shared_ptr<UnsafeRow> payload) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
auto res = safeLookup(hash_table_, payload, v, &arrayid_list_);
if (res == -1) return -1;
return 0;
Expand All @@ -302,32 +284,20 @@ class HashRelation {
template <typename CType,
typename std::enable_if_t<is_number_or_decimal_type<CType>::value>* = nullptr>
int IfExists(int32_t v, CType payload) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
return safeLookup(hash_table_, payload, v);
}

int IfExists(int32_t v, std::string payload) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
return safeLookup(hash_table_, payload.data(), payload.size(), v);
}

int IfExists(int32_t v, std::shared_ptr<UnsafeRow> payload) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
return safeLookup(hash_table_, payload, v);
}

template <typename CType,
typename std::enable_if_t<is_number_or_decimal_type<CType>::value>* = nullptr>
int Get(CType payload) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
if (sizeof(payload) <= 8) {
if (has_cached_ && *(CType*)recent_cached_key_ == payload) {
return recent_cached_key_probe_res_;
Expand All @@ -347,9 +317,6 @@ class HashRelation {
}

int Get(std::string payload) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
int32_t v = hash32(payload, true);
auto res = safeLookup(hash_table_, payload.data(), payload.size(), v, &arrayid_list_);
if (res == -1) return -1;
Expand All @@ -359,17 +326,11 @@ class HashRelation {
template <typename CType,
typename std::enable_if_t<is_number_alike<CType>::value>* = nullptr>
int IfExists(CType payload) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
int32_t v = hash32(payload, true);
return safeLookup(hash_table_, payload, v);
}

int IfExists(std::string payload) {
if (hash_table_ == nullptr) {
throw std::runtime_error("HashRelation Get failed, hash_table is null.");
}
int32_t v = hash32(payload, true);
return safeLookup(hash_table_, payload.data(), payload.size(), v);
}
Expand Down Expand Up @@ -461,7 +422,6 @@ class HashRelation {

arrow::Status Insert(int32_t v, std::shared_ptr<UnsafeRow> payload, uint32_t array_id,
uint32_t id) {
assert(hash_table_ != nullptr);
auto index = ArrayItemIndex(array_id, id);
if (!append(hash_table_, payload.get(), v, (char*)&index, sizeof(ArrayItemIndex))) {
return arrow::Status::CapacityError("Insert to HashMap failed.");
Expand All @@ -471,7 +431,6 @@ class HashRelation {

template <typename CType>
arrow::Status Insert(int32_t v, CType payload, uint32_t array_id, uint32_t id) {
assert(hash_table_ != nullptr);
auto index = ArrayItemIndex(array_id, id);
if (!append(hash_table_, payload, v, (char*)&index, sizeof(ArrayItemIndex))) {
return arrow::Status::CapacityError("Insert to HashMap failed.");
Expand All @@ -481,7 +440,6 @@ class HashRelation {

arrow::Status Insert(int32_t v, const char* payload, size_t payload_len,
uint32_t array_id, uint32_t id) {
assert(hash_table_ != nullptr);
auto index = ArrayItemIndex(array_id, id);
if (!append(hash_table_, payload, payload_len, v, (char*)&index,
sizeof(ArrayItemIndex))) {
Expand All @@ -492,7 +450,6 @@ class HashRelation {

arrow::Status InsertSkipDup(int32_t v, std::shared_ptr<UnsafeRow> payload,
uint32_t array_id, uint32_t id) {
assert(hash_table_ != nullptr);
auto index = ArrayItemIndex(array_id, id);
if (!appendNewKey(hash_table_, payload.get(), v, (char*)&index,
sizeof(ArrayItemIndex))) {
Expand All @@ -503,7 +460,6 @@ class HashRelation {

template <typename CType>
arrow::Status InsertSkipDup(int32_t v, CType payload, uint32_t array_id, uint32_t id) {
assert(hash_table_ != nullptr);
auto index = ArrayItemIndex(array_id, id);
if (!appendNewKey(hash_table_, payload, v, (char*)&index, sizeof(ArrayItemIndex))) {
return arrow::Status::CapacityError("Insert to HashMap failed.");
Expand All @@ -513,7 +469,6 @@ class HashRelation {

arrow::Status InsertSkipDup(int32_t v, const char* payload, size_t payload_len,
uint32_t array_id, uint32_t id) {
assert(hash_table_ != nullptr);
auto index = ArrayItemIndex(array_id, id);
if (!appendNewKey(hash_table_, payload, payload_len, v, (char*)&index,
sizeof(ArrayItemIndex))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "codegen/arrow_compute/ext/array_item_index.h"
#include "third_party/row_wise_memory/unsafe_row.h"

#define MAX_HASH_MAP_CAPACITY (1 << 29) // must be power of 2
#define MAX_HASH_MAP_CAPACITY (1 << 30) // must be power of 2

#define HASH_NEW_KEY -1
#define HASH_FOUND_MATCH -2
Expand Down

0 comments on commit 8cf8ebe

Please sign in to comment.