Skip to content

Commit

Permalink
[oap-native-sql] Add memory pool for JNI based reservation (apache#95)
Browse files Browse the repository at this point in the history
* [oap-native-sql] Add memory pool for JNI based reservation

* Fix for multi-threading

* Object oriented memory pool implementation

* nit

* Change block size to 8MB
  • Loading branch information
zhztheplayer authored Nov 6, 2020
1 parent 874c590 commit 863689b
Show file tree
Hide file tree
Showing 18 changed files with 656 additions and 38 deletions.
138 changes: 138 additions & 0 deletions cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <iostream> // IWYU pragma: keep
#include <limits>
#include <memory>
#include <mutex>

#include "arrow/status.h"
#include "arrow/util/logging.h" // IWYU pragma: keep
Expand Down Expand Up @@ -538,4 +539,141 @@ int64_t ProxyMemoryPool::max_memory() const { return impl_->max_memory(); }

std::string ProxyMemoryPool::backend_name() const { return impl_->backend_name(); }


ReservationListener::~ReservationListener() {}

ReservationListener::ReservationListener() {}

class ReservationListenableMemoryPool::ReservationListenableMemoryPoolImpl {
public:
explicit ReservationListenableMemoryPoolImpl(
MemoryPool* pool, std::shared_ptr<ReservationListener> listener, int64_t block_size)
: pool_(pool),
listener_(listener),
block_size_(block_size),
blocks_reserved_(0),
bytes_reserved_(0) {}

Status Allocate(int64_t size, uint8_t** out) {
RETURN_NOT_OK(UpdateReservation(size));
Status error = pool_->Allocate(size, out);
if (!error.ok()) {
RETURN_NOT_OK(UpdateReservation(-size));
return error;
}
return Status::OK();
}

Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
bool reserved = false;
int64_t diff = new_size - old_size;
if (new_size >= old_size) {
RETURN_NOT_OK(UpdateReservation(diff));
reserved = true;
}
Status error = pool_->Reallocate(old_size, new_size, ptr);
if (!error.ok()) {
if (reserved) {
RETURN_NOT_OK(UpdateReservation(-diff));
}
return error;
}
if (!reserved) {
RETURN_NOT_OK(UpdateReservation(diff));
}
return Status::OK();
}

void Free(uint8_t* buffer, int64_t size) {
pool_->Free(buffer, size);
// fixme currently method ::Free doesn't allow Status return
Status s = UpdateReservation(-size);
if (!s.ok()) {
ARROW_LOG(FATAL) << "Failed to update reservation while freeing bytes: "
<< s.message();
return;
}
}

Status UpdateReservation(int64_t diff) {
int64_t granted = Reserve(diff);
if (granted == 0) {
return Status::OK();
}
if (granted < 0) {
RETURN_NOT_OK(listener_->OnRelease(-granted));
return Status::OK();
}
RETURN_NOT_OK(listener_->OnReservation(granted));
return Status::OK();
}

int64_t Reserve(int64_t diff) {
std::lock_guard<std::mutex> lock(mutex_);
bytes_reserved_ += diff;
int64_t new_block_count;
if (bytes_reserved_ == 0) {
new_block_count = 0;
} else {
new_block_count = (bytes_reserved_ - 1) / block_size_ + 1;
}
int64_t bytes_granted = (new_block_count - blocks_reserved_) * block_size_;
blocks_reserved_ = new_block_count;
return bytes_granted;
}

int64_t bytes_allocated() { return pool_->bytes_allocated(); }

int64_t max_memory() { return pool_->max_memory(); }

std::string backend_name() { return pool_->backend_name(); }

std::shared_ptr<ReservationListener> get_listener() { return listener_; }

private:
MemoryPool* pool_;
std::shared_ptr<ReservationListener> listener_;
int64_t block_size_;
int64_t blocks_reserved_;
int64_t bytes_reserved_;
std::mutex mutex_;
};

ReservationListenableMemoryPool::~ReservationListenableMemoryPool() {}

ReservationListenableMemoryPool::ReservationListenableMemoryPool(
MemoryPool* pool, std::shared_ptr<ReservationListener> listener, int64_t block_size) {
impl_.reset(new ReservationListenableMemoryPoolImpl(pool, listener, block_size));
}

Status ReservationListenableMemoryPool::Allocate(int64_t size, uint8_t** out) {
return impl_->Allocate(size, out);
}

Status ReservationListenableMemoryPool::Reallocate(int64_t old_size, int64_t new_size,
uint8_t** ptr) {
return impl_->Reallocate(old_size, new_size, ptr);
}

void ReservationListenableMemoryPool::Free(uint8_t* buffer, int64_t size) {
return impl_->Free(buffer, size);
}

int64_t ReservationListenableMemoryPool::bytes_allocated() const {
return impl_->bytes_allocated();
}

int64_t ReservationListenableMemoryPool::max_memory() const {
return impl_->max_memory();
}

std::string ReservationListenableMemoryPool::backend_name() const {
return impl_->backend_name();
}

std::shared_ptr<ReservationListener> ReservationListenableMemoryPool::get_listener() {
return impl_->get_listener();
}


} // namespace arrow
37 changes: 37 additions & 0 deletions cpp/src/arrow/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,43 @@ class ARROW_EXPORT ProxyMemoryPool : public MemoryPool {
std::unique_ptr<ProxyMemoryPoolImpl> impl_;
};

class ARROW_EXPORT ReservationListener {
public:
virtual ~ReservationListener();

virtual Status OnReservation(int64_t size) = 0;
virtual Status OnRelease(int64_t size) = 0;

protected:
ReservationListener();
};

class ARROW_EXPORT ReservationListenableMemoryPool : public MemoryPool {
public:
explicit ReservationListenableMemoryPool(MemoryPool* pool,
std::shared_ptr<ReservationListener> listener,
int64_t block_size = 8 * 1024 * 1024);
~ReservationListenableMemoryPool() override;

Status Allocate(int64_t size, uint8_t** out) override;

Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;

void Free(uint8_t* buffer, int64_t size) override;

int64_t bytes_allocated() const override;

int64_t max_memory() const override;

std::string backend_name() const override;

std::shared_ptr<ReservationListener> get_listener();

private:
class ReservationListenableMemoryPoolImpl;
std::unique_ptr<ReservationListenableMemoryPoolImpl> impl_;
};

/// Return a process-wide memory pool based on the system allocator.
ARROW_EXPORT MemoryPool* system_memory_pool();

Expand Down
Loading

0 comments on commit 863689b

Please sign in to comment.