From 6249e98d3c9f7d8aef4439e17f878d96b3727ff0 Mon Sep 17 00:00:00 2001 From: Zhijun Fu Date: Thu, 13 Jun 2019 18:02:38 +0800 Subject: [PATCH] change mutex to recursive_mutex --- cpp/src/plasma/client.cc | 46 ++++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index 78f8da7817849..16bedbea15b51 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -302,7 +302,7 @@ class PlasmaClient::Impl : public std::enable_shared_from_this deletion_cache_; /// A mutex which protects this class. - std::mutex client_mutex_; + std::recursive_mutex client_mutex_; #ifdef PLASMA_CUDA /// Cuda Device Manager. @@ -343,7 +343,7 @@ uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) { } bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); const auto elem = objects_in_use_.find(object_id); return (elem != objects_in_use_.end()); @@ -388,7 +388,7 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id, Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata, int64_t metadata_size, std::shared_ptr* data, int device_num) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size " << data_size << " and metadata size " << metadata_size; @@ -457,7 +457,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size, Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id, const std::string& data, const std::string& metadata) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_; // Compute the object hash. @@ -510,7 +510,7 @@ Status PlasmaClient::Impl::GetBuffers( data + object->data_offset, object->data_size + object->metadata_size); } else { #ifdef PLASMA_CUDA - std::lock_guard lock(gpu_mutex); + std::lock_guard lock(gpu_mutex); auto iter = gpu_object_map.find(object_ids[i]); ARROW_CHECK(iter != gpu_object_map.end()); iter->second->client_count++; @@ -576,7 +576,7 @@ Status PlasmaClient::Impl::GetBuffers( data + object->data_offset, object->data_size + object->metadata_size); } else { #ifdef PLASMA_CUDA - std::lock_guard lock(gpu_mutex); + std::lock_guard lock(gpu_mutex); auto handle = gpu_object_map.find(object_ids[i]); if (handle == gpu_object_map.end()) { std::shared_ptr context; @@ -615,7 +615,7 @@ Status PlasmaClient::Impl::GetBuffers( Status PlasmaClient::Impl::Get(const std::vector& object_ids, int64_t timeout_ms, std::vector* out) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); const auto wrap_buffer = [=](const ObjectID& object_id, const std::shared_ptr& buffer) { @@ -628,7 +628,7 @@ Status PlasmaClient::Impl::Get(const std::vector& object_ids, Status PlasmaClient::Impl::Get(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms, ObjectBuffer* out) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); const auto wrap_buffer = [](const ObjectID& object_id, const std::shared_ptr& buffer) { return buffer; }; @@ -646,7 +646,7 @@ Status PlasmaClient::Impl::MarkObjectUnused(const ObjectID& object_id) { } Status PlasmaClient::Impl::Release(const ObjectID& object_id) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); // If the client is already disconnected, ignore release requests. if (store_conn_ < 0) { @@ -657,7 +657,7 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) { #ifdef PLASMA_CUDA if (object_entry->second->object.device_num != 0) { - std::lock_guard lock(gpu_mutex); + std::lock_guard lock(gpu_mutex); auto iter = gpu_object_map.find(object_id); ARROW_CHECK(iter != gpu_object_map.end()); if (--iter->second->client_count == 0) { @@ -685,7 +685,7 @@ Status PlasmaClient::Impl::Release(const ObjectID& object_id) { // This method is used to query whether the plasma store contains an object. Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); // Check if we already have a reference to the object. if (objects_in_use_.count(object_id) > 0) { @@ -705,7 +705,7 @@ Status PlasmaClient::Impl::Contains(const ObjectID& object_id, bool* has_object) } Status PlasmaClient::Impl::List(ObjectTable* objects) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); RETURN_NOT_OK(SendListRequest(store_conn_)); std::vector buffer; RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaListReply, &buffer)); @@ -784,7 +784,7 @@ uint64_t PlasmaClient::Impl::ComputeObjectHash(const uint8_t* data, int64_t data } Status PlasmaClient::Impl::Seal(const ObjectID& object_id) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); // Make sure this client has a reference to the object before sending the // request to Plasma. @@ -812,7 +812,7 @@ Status PlasmaClient::Impl::Seal(const ObjectID& object_id) { } Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); auto object_entry = objects_in_use_.find(object_id); ARROW_CHECK(object_entry != objects_in_use_.end()) << "Plasma client called abort on an object without a reference to it"; @@ -828,7 +828,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { #ifdef PLASMA_CUDA if (object_entry->second->object.device_num != 0) { - std::lock_guard lock(gpu_mutex); + std::lock_guard lock(gpu_mutex); auto iter = gpu_object_map.find(object_id); ARROW_CHECK(iter != gpu_object_map.end()); ARROW_CHECK(iter->second->client_count == 1); @@ -851,7 +851,7 @@ Status PlasmaClient::Impl::Abort(const ObjectID& object_id) { } Status PlasmaClient::Impl::Delete(const std::vector& object_ids) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); std::vector not_in_use_ids; for (auto& object_id : object_ids) { @@ -876,7 +876,7 @@ Status PlasmaClient::Impl::Delete(const std::vector& object_ids) { } Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); // Send a request to the store to evict objects. RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes)); @@ -888,7 +888,7 @@ Status PlasmaClient::Impl::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) } Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); // Get the plasma object data. We pass in a timeout of 0 to indicate that // the operation should timeout immediately. @@ -905,7 +905,7 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) { } Status PlasmaClient::Impl::Subscribe(int* fd) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); int sock[2]; // Create a non-blocking socket pair. This will only be used to send @@ -929,7 +929,7 @@ Status PlasmaClient::Impl::Subscribe(int* fd) { Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); auto object_info = flatbuffers::GetRoot(buffer); ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID)); @@ -946,7 +946,7 @@ Status PlasmaClient::Impl::DecodeNotification(const uint8_t* buffer, ObjectID* o Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); auto notification = ReadMessageAsync(fd); if (notification == NULL) { @@ -958,7 +958,7 @@ Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id, Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, const std::string& manager_socket_name, int release_delay, int num_retries) { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); RETURN_NOT_OK(ConnectIpcSocketRetry(store_socket_name, num_retries, -1, &store_conn_)); if (manager_socket_name != "") { @@ -977,7 +977,7 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name, } Status PlasmaClient::Impl::Disconnect() { - std::lock_guard guard(client_mutex_); + std::lock_guard guard(client_mutex_); // NOTE: We purposefully do not finish sending release calls for objects in // use, so that we don't duplicate PlasmaClient::Release calls (when handling