diff --git a/libmemcached/arcus.cc b/libmemcached/arcus.cc index 26c6c6f2..90baedd8 100644 --- a/libmemcached/arcus.cc +++ b/libmemcached/arcus.cc @@ -74,6 +74,9 @@ static pthread_mutex_t azk_mtx = PTHREAD_MUTEX_INITIALIZER; static pthread_cond_t azk_cond = PTHREAD_COND_INITIALIZER; static int azk_count; +#ifdef REMOVE_LOCK_ARCUS +pthread_mutex_t lock_update_serverlist = PTHREAD_MUTEX_INITIALIZER; +#endif pthread_mutex_t lock_arcus = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t cond_arcus = PTHREAD_COND_INITIALIZER; @@ -104,7 +107,10 @@ static inline arcus_return_t do_arcus_init(memcached_st *mc, memcached_behavior_set(mc, MEMCACHED_BEHAVIOR_DISTRIBUTION, MEMCACHED_DISTRIBUTION_CONSISTENT_KETAMA_SPY); memcached_behavior_set_key_hash(mc, MEMCACHED_HASH_MD5); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif do { arcus= static_cast(memcached_get_server_manager(mc)); if (arcus) { @@ -155,7 +161,10 @@ static inline arcus_return_t do_arcus_init(memcached_st *mc, /* Set the Arcus to memcached as a server manager. */ memcached_set_server_manager(mc, (void *)arcus); } while(0); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif return rc; } @@ -310,7 +319,10 @@ arcus_return_t arcus_close(memcached_st *mc) return rc; } +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif arcus= static_cast(memcached_get_server_manager(mc)); if (arcus) { arcus->pool= NULL; @@ -318,7 +330,10 @@ arcus_return_t arcus_close(memcached_st *mc) arcus= NULL; } memcached_set_server_manager(mc, NULL); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif return ARCUS_SUCCESS; } @@ -613,7 +628,10 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc) inc_count(1); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif do { arcus= static_cast(memcached_get_server_manager(mc)); if (not arcus) { @@ -626,6 +644,9 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc) ZOO_LOG_WARN(("Initiating zookeeper client")); +#ifdef REMOVE_LOCK_ARCUS + pthread_mutex_lock(&lock_arcus); +#endif /* Connect to ZooKeeper ensemble. */ arcus->zk.handle= zookeeper_init(arcus->zk.ensemble_list, do_arcus_zk_watcher_global, @@ -633,13 +654,19 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc) &(arcus->zk.myid), (void *)mc, ZOO_NO_FLAGS); +#ifdef REMOVE_LOCK_ARCUS + pthread_mutex_unlock(&lock_arcus); +#endif if (not arcus->zk.handle) { ZOO_LOG_ERROR(("zookeeper_init() failed, reason=%s, zookeeper=%s", strerror(errno), arcus->zk.ensemble_list)); rc= ARCUS_ERROR; break; } } while(0); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif if (rc != ARCUS_SUCCESS) { return ARCUS_ERROR; @@ -655,17 +682,29 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc) rc= ARCUS_ERROR; break; } +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif if (arcus->zk.conn_result != ARCUS_SUCCESS) { ZOO_LOG_ERROR(("ZooKeeper connection failed, result=%d", arcus->zk.conn_result)); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif rc= ARCUS_ERROR; break; } if (do_arcus_cluster_validation_check(mc, arcus) < 0) { +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif rc= ARCUS_ERROR; break; } +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif if (do_add_client_info(arcus) < 0) { rc= ARCUS_ERROR; break; @@ -673,10 +712,16 @@ static inline arcus_return_t do_arcus_zk_connect(memcached_st *mc) } while(0); if (rc != ARCUS_SUCCESS) { +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif zookeeper_close(arcus->zk.handle); arcus->zk.handle= NULL; +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif } return rc; } @@ -691,15 +736,24 @@ static inline arcus_return_t do_arcus_zk_close(memcached_st *mc) clear_count(); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif do { arcus= static_cast(memcached_get_server_manager(mc)); if (not arcus) { rc= ARCUS_ERROR; break; } +#ifdef REMOVE_LOCK_ARCUS + pthread_mutex_lock(&lock_arcus); +#endif /* Delete the (expired) session. */ arcus->zk.myid.client_id= 0; +#ifdef REMOVE_LOCK_ARCUS + pthread_mutex_unlock(&lock_arcus); +#endif /* Clear connect result */ arcus->zk.conn_result= ARCUS_SUCCESS; @@ -716,7 +770,10 @@ static inline arcus_return_t do_arcus_zk_close(memcached_st *mc) } } } while(0); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif return rc; } @@ -771,9 +828,17 @@ void arcus_server_check_for_update(memcached_st *ptr) #endif { /* master's cache list was changed, update member's cache list */ +#ifdef REMOVE_LOCK_ARCUS + pthread_mutex_lock(&lock_update_serverlist); +#else pthread_mutex_lock(&lock_arcus); +#endif (void)memcached_pool_update_member(arcus->pool, ptr); +#ifdef REMOVE_LOCK_ARCUS + pthread_mutex_unlock(&lock_update_serverlist); +#else pthread_mutex_unlock(&lock_arcus); +#endif } } } @@ -947,9 +1012,17 @@ static inline void do_arcus_zk_update_cachelist_by_string(memcached_st *mc, } } +#ifdef REMOVE_LOCK_ARCUS + pthread_mutex_lock(&lock_update_serverlist); +#else pthread_mutex_lock(&lock_arcus); +#endif do_arcus_update_cachelist(mc, serverinfo, servercount); +#ifdef REMOVE_LOCK_ARCUS + pthread_mutex_unlock(&lock_update_serverlist); +#else pthread_mutex_unlock(&lock_arcus); +#endif libmemcached_free(mc, serverinfo); } @@ -1035,7 +1108,10 @@ static inline void do_arcus_zk_update_cachelist(memcached_st *mc, { arcus_st *arcus; +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif do { arcus= static_cast(memcached_get_server_manager(mc)); if (not arcus) { @@ -1062,7 +1138,10 @@ static inline void do_arcus_zk_update_cachelist(memcached_st *mc, libmemcached_free(mc, serverinfo); } } while(0); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif } static inline void do_arcus_zk_watch_and_update_cachelist(memcached_st *mc, @@ -1114,9 +1193,15 @@ static inline void do_arcus_zk_watcher_cachelist(zhandle_t *zh __attribute__((un ZOO_LOG_INFO(("ZOO_CHILD_EVENT from ZK cache list")); memcached_st *mc= static_cast(ctx_mc); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif arcus_st *arcus= static_cast(memcached_get_server_manager(mc)); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif if (not arcus || not arcus->zk.handle) { ZOO_LOG_ERROR(("arcus is null")); return; @@ -1147,11 +1232,17 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh, return; } +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif arcus= static_cast(memcached_get_server_manager(mc)); if (not arcus || not arcus->zk.handle) { ZOO_LOG_ERROR(("arcus is null")); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif return; } @@ -1160,6 +1251,9 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh, ZOO_LOG_WARN(("SESSION_STATE=CONNECTED, to %s", arcus->zk.ensemble_list)); const clientid_t *id= zoo_client_id(zh); +#ifdef REMOVE_LOCK_ARCUS + pthread_mutex_lock(&lock_arcus); +#endif if (arcus->zk.myid.client_id == 0 or arcus->zk.myid.client_id != id->client_id) { ZOO_LOG_DEBUG(("Previous sessionid : 0x%llx", (long long) arcus->zk.myid.client_id)); arcus->zk.myid= *id; @@ -1172,12 +1266,18 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh, else if (state == ZOO_CONNECTING_STATE or state == ZOO_ASSOCIATING_STATE) { ZOO_LOG_WARN(("SESSION_STATE=CONNECTING, to %s", arcus->zk.ensemble_list)); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif } else if (state == ZOO_AUTH_FAILED_STATE) { arcus->zk.conn_result= ARCUS_AUTH_FAILED; +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif if (arcus->zk_mgr.running) { ZOO_LOG_WARN(("SESSION_STATE=AUTH_FAILED, create a new client after closing failed one")); pthread_mutex_lock(&arcus->zk_mgr.lock); @@ -1192,7 +1292,10 @@ static inline void do_arcus_zk_watcher_global(zhandle_t *zh, else if (state == ZOO_EXPIRED_SESSION_STATE) { ZOO_LOG_WARN(("SESSION_STATE=EXPIRED_SESSION, create a new client after closing expired one")); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif if (arcus->zk_mgr.running) { pthread_mutex_lock(&arcus->zk_mgr.lock); arcus->zk_mgr.request.reconnect_process= true; @@ -1211,7 +1314,10 @@ static inline arcus_return_t do_arcus_zk_manager_start(memcached_st *mc) arcus_return_t rc= ARCUS_SUCCESS; /* Start arcus zk manager thread */ +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif do { arcus= static_cast(memcached_get_server_manager(mc)); if (not arcus || not arcus->zk.handle) { @@ -1235,7 +1341,10 @@ static inline arcus_return_t do_arcus_zk_manager_start(memcached_st *mc) do_arcus_zk_manager_wakeup(mc, true); pthread_mutex_unlock(&arcus->zk_mgr.lock); } while(0); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif if (rc != ARCUS_SUCCESS) { return rc; @@ -1243,7 +1352,10 @@ static inline arcus_return_t do_arcus_zk_manager_start(memcached_st *mc) ZOO_LOG_WARN(("Waiting for the cache server list...")); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif arcus= static_cast(memcached_get_server_manager(mc)); if (arcus && arcus->zk.is_initializing) { struct timeval now; @@ -1259,7 +1371,10 @@ static inline arcus_return_t do_arcus_zk_manager_start(memcached_st *mc) rc= ARCUS_ERROR; } } +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif if (rc == ARCUS_SUCCESS) { ZOO_LOG_WARN(("Done")); @@ -1271,9 +1386,15 @@ static inline void do_arcus_zk_manager_stop(memcached_st *mc) { arcus_st *arcus; +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_lock(&lock_arcus); +#endif arcus= static_cast(memcached_get_server_manager(mc)); +#ifdef REMOVE_LOCK_ARCUS +#else pthread_mutex_unlock(&lock_arcus); +#endif if (arcus && arcus->zk_mgr.running) { ZOO_LOG_WARN(("Wait for the arcus zookeeper manager to stop")); arcus->zk_mgr.reqstop= true; diff --git a/libmemcached/constants.h b/libmemcached/constants.h index 18686c64..e5477083 100644 --- a/libmemcached/constants.h +++ b/libmemcached/constants.h @@ -66,6 +66,7 @@ #define POOL_UPDATE_SERVERLIST 1 #define POOL_MORE_CONCURRENCY 1 #define KETAMA_HASH_COLLSION 1 +#define REMOVE_LOCK_ARCUS 1 /* Public defines */ #define MEMCACHED_DEFAULT_PORT 11211