Skip to content

Commit

Permalink
change mutex to recursive_mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijunfu committed Jun 13, 2019
1 parent 54bbdf4 commit 6249e98
Showing 1 changed file with 23 additions and 23 deletions.
46 changes: 23 additions & 23 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
/// A hash set to record the ids that users want to delete but that are still in use.
std::unordered_set<ObjectID> deletion_cache_;
/// A mutex that protects this class.
std::mutex client_mutex_;
std::recursive_mutex client_mutex_;

#ifdef PLASMA_CUDA
/// Cuda Device Manager.
Expand Down Expand Up @@ -343,7 +343,7 @@ uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
}

bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

const auto elem = objects_in_use_.find(object_id);
return (elem != objects_in_use_.end());
Expand Down Expand Up @@ -388,7 +388,7 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
std::shared_ptr<Buffer>* data, int device_num) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
<< data_size << " and metadata size " << metadata_size;
Expand Down Expand Up @@ -457,7 +457,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
const std::string& data,
const std::string& metadata) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
// Compute the object hash.
Expand Down Expand Up @@ -510,7 +510,7 @@ Status PlasmaClient::Impl::GetBuffers(
data + object->data_offset, object->data_size + object->metadata_size);
} else {
#ifdef PLASMA_CUDA
std::lock_guard<std::mutex> lock(gpu_mutex);
std::lock_guard<std::recursive_mutex> lock(gpu_mutex);
auto iter = gpu_object_map.find(object_ids[i]);
ARROW_CHECK(iter != gpu_object_map.end());
iter->second->client_count++;
Expand Down Expand Up @@ -576,7 +576,7 @@ Status PlasmaClient::Impl::GetBuffers(
data + object->data_offset, object->data_size + object->metadata_size);
} else {
#ifdef PLASMA_CUDA
std::lock_guard<std::mutex> lock(gpu_mutex);
std::lock_guard<std::recursive_mutex> lock(gpu_mutex);
auto handle = gpu_object_map.find(object_ids[i]);
if (handle == gpu_object_map.end()) {
std::shared_ptr<CudaContext> context;
Expand Down Expand Up @@ -615,7 +615,7 @@ Status PlasmaClient::Impl::GetBuffers(

Status PlasmaClient::Impl::Get(const std::vector<ObjectID>& object_ids,
int64_t timeout_ms, std::vector<ObjectBuffer>* out) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

const auto wrap_buffer = [=](const ObjectID& object_id,
const std::shared_ptr<Buffer>& buffer) {
Expand All @@ -628,7 +628,7 @@ Status PlasmaClient::Impl::Get(const std::vector<ObjectID>& object_ids,

Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects,
int64_t timeout_ms, ObjectBuffer* out) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

const auto wrap_buffer = [](const ObjectID& object_id,
const std::shared_ptr<Buffer>& buffer) { return buffer; };
Expand All @@ -646,7 +646,7 @@ Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) {
}

Status PlasmaClient::Impl::Release(const ObjectID& object_id) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

// If the client is already disconnected, ignore release requests.
if (store_conn_ < 0) {
Expand All @@ -657,7 +657,7 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {

#ifdef PLASMA_CUDA
if (object_entry->second->object.device_num != 0) {
std::lock_guard<std::mutex> lock(gpu_mutex);
std::lock_guard<std::recursive_mutex> lock(gpu_mutex);
auto iter = gpu_object_map.find(object_id);
ARROW_CHECK(iter != gpu_object_map.end());
if (--iter->second->client_count == 0) {
Expand Down Expand Up @@ -685,7 +685,7 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) {

// This method is used to query whether the plasma store contains an object.
Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

// Check if we already have a reference to the object.
if (objects_in_use_.count(object_id) > 0) {
Expand All @@ -705,7 +705,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object)
}

Status PlasmaClient::Impl::List(ObjectTable* objects) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
RETURN_NOT_OK(SendListRequest(store_conn_));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer));
Expand Down Expand Up @@ -784,7 +784,7 @@ uint64_t PlasmaClient::Impl::ComputeObjectHash(const uint8_t* data, int64_t data
}

Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

// Make sure this client has a reference to the object before sending the
// request to Plasma.
Expand Down Expand Up @@ -812,7 +812,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) {
}

Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
auto object_entry = objects_in_use_.find(object_id);
ARROW_CHECK(object_entry != objects_in_use_.end())
<< "Plasma client called abort on an object without a reference to it";
Expand All @@ -828,7 +828,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {

#ifdef PLASMA_CUDA
if (object_entry->second->object.device_num != 0) {
std::lock_guard<std::mutex> lock(gpu_mutex);
std::lock_guard<std::recursive_mutex> lock(gpu_mutex);
auto iter = gpu_object_map.find(object_id);
ARROW_CHECK(iter != gpu_object_map.end());
ARROW_CHECK(iter->second->client_count == 1);
Expand All @@ -851,7 +851,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) {
}

Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

std::vector<ObjectID> not_in_use_ids;
for (auto& object_id : object_ids) {
Expand All @@ -876,7 +876,7 @@ Status PlasmaClient::Impl::Delete(const std::vector<ObjectID>& object_ids) {
}

Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

// Send a request to the store to evict objects.
RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes));
Expand All @@ -888,7 +888,7 @@ Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted)
}

Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

// Get the plasma object data. We pass in a timeout of 0 to indicate that
// the operation should timeout immediately.
Expand All @@ -905,7 +905,7 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
}

Status PlasmaClient::Impl::Subscribe(int* fd) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

int sock[2];
// Create a non-blocking socket pair. This will only be used to send
Expand All @@ -929,7 +929,7 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* object_id,
int64_t* data_size,
int64_t* metadata_size) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(buffer);
ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
Expand All @@ -946,7 +946,7 @@ Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* o

Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
int64_t* data_size, int64_t* metadata_size) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

auto notification = ReadMessageAsync(fd);
if (notification == NULL) {
Expand All @@ -958,7 +958,7 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
const std::string& manager_socket_name,
int release_delay, int num_retries) {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_));
if (manager_socket_name != "") {
Expand All @@ -977,7 +977,7 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
}

Status PlasmaClient::Impl::Disconnect() {
std::lock_guard<std::mutex> guard(client_mutex_);
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

// NOTE: We purposefully do not finish sending release calls for objects in
// use, so that we don't duplicate PlasmaClient::Release calls (when handling
Expand Down

0 comments on commit 6249e98

Please sign in to comment.