Skip to content

Commit

Permalink
INTERNAL: Remove lock_arcus that are not in necessary #236
Browse files Browse the repository at this point in the history
  • Loading branch information
uhm0311 committed Mar 15, 2022
1 parent 222d6cf commit 2a4d2fe
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 0 deletions.
121 changes: 121 additions & 0 deletions libmemcached/arcus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ static pthread_mutex_t azk_mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t azk_cond = PTHREAD_COND_INITIALIZER;
static int azk_count;

#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_t lock_update_serverlist = PTHREAD_MUTEX_INITIALIZER;
#endif
pthread_mutex_t lock_arcus = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_arcus = PTHREAD_COND_INITIALIZER;

Expand Down Expand Up @@ -104,7 +107,10 @@ static inline arcus_return_t do_arcus_init(memcached_st *mc,
memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_DISTRIBUTION, MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY);
memcached_behavior_set_key_hash(mc, MEMCACHED_HASH_MD5);

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
do {
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (arcus) {
Expand Down Expand Up @@ -155,7 +161,10 @@ static inline arcus_return_t do_arcus_init(memcached_st *mc,
/* Set the Arcus to memcached as a server manager. */
memcached_set_server_manager(mc, (void *)arcus);
} while(0);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

return rc;
}
Expand Down Expand Up @@ -310,15 +319,21 @@ arcus_return_t arcus_close(memcached_st *mc)
return rc;
}

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (arcus) {
arcus->pool= NULL;
free(arcus);
arcus= NULL;
}
memcached_set_server_manager(mc, NULL);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

return ARCUS_SUCCESS;
}
Expand Down Expand Up @@ -613,7 +628,10 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc)

inc_count(1);

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
do {
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (not arcus) {
Expand All @@ -626,20 +644,29 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc)

ZOO_LOG_WARN(("Initiating zookeeper client"));

#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_lock(&lock_arcus);
#endif
/* Connect to ZooKeeper ensemble. */
arcus->zk.handle= zookeeper_init(arcus->zk.ensemble_list,
do_arcus_zk_watcher_global,
arcus->zk.session_timeout,
&(arcus->zk.myid),
(void *)mc,
ZOO_NO_FLAGS);
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_unlock(&lock_arcus);
#endif
if (not arcus->zk.handle) {
ZOO_LOG_ERROR(("zookeeper_init() failed, reason=%s, zookeeper=%s",
strerror(errno), arcus->zk.ensemble_list));
rc= ARCUS_ERROR; break;
}
} while(0);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

if (rc != ARCUS_SUCCESS) {
return ARCUS_ERROR;
Expand All @@ -655,28 +682,46 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc)
rc= ARCUS_ERROR; break;
}

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
if (arcus->zk.conn_result != ARCUS_SUCCESS) {
ZOO_LOG_ERROR(("ZooKeeper connection failed, result=%d", arcus->zk.conn_result));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
rc= ARCUS_ERROR; break;
}
if (do_arcus_cluster_validation_check(mc, arcus) < 0) {
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
rc= ARCUS_ERROR; break;
}
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

if (do_add_client_info(arcus) < 0) {
rc= ARCUS_ERROR; break;
}
} while(0);

if (rc != ARCUS_SUCCESS) {
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
zookeeper_close(arcus->zk.handle);
arcus->zk.handle= NULL;
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
}
return rc;
}
Expand All @@ -691,15 +736,24 @@ static inline arcus_return_t do_arcus_zk_close(memcached_st *mc)

clear_count();

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
do {
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (not arcus) {
rc= ARCUS_ERROR; break;
}

#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_lock(&lock_arcus);
#endif
/* Delete the (expired) session. */
arcus->zk.myid.client_id= 0;
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_unlock(&lock_arcus);
#endif

/* Clear connect result */
arcus->zk.conn_result= ARCUS_SUCCESS;
Expand All @@ -716,7 +770,10 @@ static inline arcus_return_t do_arcus_zk_close(memcached_st *mc)
}
}
} while(0);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

return rc;
}
Expand Down Expand Up @@ -771,9 +828,17 @@ void arcus_server_check_for_update(memcached_st *ptr)
#endif
{
/* master's cache list was changed, update member's cache list */
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_lock(&lock_update_serverlist);
#else
pthread_mutex_lock(&lock_arcus);
#endif
(void)memcached_pool_update_member(arcus->pool, ptr);
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_unlock(&lock_update_serverlist);
#else
pthread_mutex_unlock(&lock_arcus);
#endif
}
}
}
Expand Down Expand Up @@ -947,9 +1012,17 @@ static inline void do_arcus_zk_update_cachelist_by_string(memcached_st *mc,
}
}

#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_lock(&lock_update_serverlist);
#else
pthread_mutex_lock(&lock_arcus);
#endif
do_arcus_update_cachelist(mc, serverinfo, servercount);
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_unlock(&lock_update_serverlist);
#else
pthread_mutex_unlock(&lock_arcus);
#endif

libmemcached_free(mc, serverinfo);
}
Expand Down Expand Up @@ -1035,7 +1108,10 @@ static inline void do_arcus_zk_update_cachelist(memcached_st *mc,
{
arcus_st *arcus;

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
do {
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (not arcus) {
Expand All @@ -1062,7 +1138,10 @@ static inline void do_arcus_zk_update_cachelist(memcached_st *mc,
libmemcached_free(mc, serverinfo);
}
} while(0);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
}

static inline void do_arcus_zk_watch_and_update_cachelist(memcached_st *mc,
Expand Down Expand Up @@ -1114,9 +1193,15 @@ static inline void do_arcus_zk_watcher_cachelist(zhandle_t *zh __attribute__((un
ZOO_LOG_INFO(("ZOO_CHILD_EVENT from ZK cache list"));

memcached_st *mc= static_cast<memcached_st *>(ctx_mc);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
arcus_st *arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
if (not arcus || not arcus->zk.handle) {
ZOO_LOG_ERROR(("arcus is null"));
return;
Expand Down Expand Up @@ -1147,11 +1232,17 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh,
return;
}

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (not arcus || not arcus->zk.handle) {
ZOO_LOG_ERROR(("arcus is null"));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
return;
}

Expand All @@ -1160,6 +1251,9 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh,
ZOO_LOG_WARN(("SESSION_STATE=CONNECTED, to %s", arcus->zk.ensemble_list));

const clientid_t *id= zoo_client_id(zh);
#ifdef REMOVE_LOCK_ARCUS
pthread_mutex_lock(&lock_arcus);
#endif
if (arcus->zk.myid.client_id == 0 or arcus->zk.myid.client_id != id->client_id) {
ZOO_LOG_DEBUG(("Previous sessionid : 0x%llx", (long long) arcus->zk.myid.client_id));
arcus->zk.myid= *id;
Expand All @@ -1172,12 +1266,18 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh,
else if (state == ZOO_CONNECTING_STATE or state == ZOO_ASSOCIATING_STATE)
{
ZOO_LOG_WARN(("SESSION_STATE=CONNECTING, to %s", arcus->zk.ensemble_list));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
}
else if (state == ZOO_AUTH_FAILED_STATE)
{
arcus->zk.conn_result= ARCUS_AUTH_FAILED;
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
if (arcus->zk_mgr.running) {
ZOO_LOG_WARN(("SESSION_STATE=AUTH_FAILED, create a new client after closing failed one"));
pthread_mutex_lock(&arcus->zk_mgr.lock);
Expand All @@ -1192,7 +1292,10 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh,
else if (state == ZOO_EXPIRED_SESSION_STATE)
{
ZOO_LOG_WARN(("SESSION_STATE=EXPIRED_SESSION, create a new client after closing expired one"));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
if (arcus->zk_mgr.running) {
pthread_mutex_lock(&arcus->zk_mgr.lock);
arcus->zk_mgr.request.reconnect_process= true;
Expand All @@ -1211,7 +1314,10 @@ static inline arcus_return_t do_arcus_zk_manager_start(memcached_st *mc)
arcus_return_t rc= ARCUS_SUCCESS;

/* Start arcus zk manager thread */
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
do {
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (not arcus || not arcus->zk.handle) {
Expand All @@ -1235,15 +1341,21 @@ static inline arcus_return_t do_arcus_zk_manager_start(memcached_st *mc)
do_arcus_zk_manager_wakeup(mc, true);
pthread_mutex_unlock(&arcus->zk_mgr.lock);
} while(0);
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

if (rc != ARCUS_SUCCESS) {
return rc;
}

ZOO_LOG_WARN(("Waiting for the cache server list..."));

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
if (arcus && arcus->zk.is_initializing) {
struct timeval now;
Expand All @@ -1259,7 +1371,10 @@ static inline arcus_return_t do_arcus_zk_manager_start(memcached_st *mc)
rc= ARCUS_ERROR;
}
}
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif

if (rc == ARCUS_SUCCESS) {
ZOO_LOG_WARN(("Done"));
Expand All @@ -1271,9 +1386,15 @@ static inline void do_arcus_zk_manager_stop(memcached_st *mc)
{
arcus_st *arcus;

#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_lock(&lock_arcus);
#endif
arcus= static_cast<arcus_st *>(memcached_get_server_manager(mc));
#ifdef REMOVE_LOCK_ARCUS
#else
pthread_mutex_unlock(&lock_arcus);
#endif
if (arcus && arcus->zk_mgr.running) {
ZOO_LOG_WARN(("Wait for the arcus zookeeper manager to stop"));
arcus->zk_mgr.reqstop= true;
Expand Down
1 change: 1 addition & 0 deletions libmemcached/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#define POOL_UPDATE_SERVERLIST 1
#define POOL_MORE_CONCURRENCY 1
#define KETAMA_HASH_COLLSION 1
#define REMOVE_LOCK_ARCUS 1

/* Public defines */
#define MEMCACHED_DEFAULT_PORT 11211
Expand Down

0 comments on commit 2a4d2fe

Please sign in to comment.