From 5b15222c876705b3380fe8574ad606fad8a82c4f Mon Sep 17 00:00:00 2001 From: Yao Yue Date: Wed, 21 Aug 2019 17:33:45 -0700 Subject: [PATCH] allow optional bounded size array (controlled by watermarks) (#243) * allow optional bounded size array (controlled by watermarks) * fix compiler warnings --- src/data_structure/sarray/sarray.c | 2 +- src/protocol/data/resp/cmd_sarray.h | 8 +- src/server/rds/data/cmd_sarray.c | 168 +++++++++++++++++++--------- src/storage/slab/item.h | 2 +- 4 files changed, 125 insertions(+), 55 deletions(-) diff --git a/src/data_structure/sarray/sarray.c b/src/data_structure/sarray/sarray.c index b79847f7e..1b3183414 100644 --- a/src/data_structure/sarray/sarray.c +++ b/src/data_structure/sarray/sarray.c @@ -108,7 +108,7 @@ _linear_search(uint32_t *idx, uint8_t *body, uint32_t nentry, uint32_t esize, ui static inline bool _binary_search(uint32_t *idx, uint8_t *body, uint32_t nentry, uint32_t esize, uint64_t val) { - uint32_t id, imin, imax; + uint32_t id = 0, imin, imax; uint32_t curr; *idx = 0; diff --git a/src/protocol/data/resp/cmd_sarray.h b/src/protocol/data/resp/cmd_sarray.h index e45572673..0ad448822 100644 --- a/src/protocol/data/resp/cmd_sarray.h +++ b/src/protocol/data/resp/cmd_sarray.h @@ -2,7 +2,7 @@ /** * create: create an empty array or integer width ESIZE - * SArray.create KEY ESIZE + * SArray.create KEY ESIZE [WATERMARK_L] [WATERMARK_H] * * delete: delete an array * SArray.delete KEY @@ -20,7 +20,7 @@ * SArray.insert KEY VALUE [VALUE ...] * * remove: remove a particular value from array - * SArray.remove KEY VALUE + * SArray.remove KEY VALUE [VALUE ...] * * truncate: truncate a array * SArray.truncate KEY COUNT @@ -30,7 +30,7 @@ /* type string #arg #opt */ #define REQ_SARRAY(ACTION) \ - ACTION( REQ_SARRAY_CREATE, "SArray.create", 3, 0 )\ + ACTION( REQ_SARRAY_CREATE, "SArray.create", 3, 2 )\ ACTION( REQ_SARRAY_DELETE, "SArray.delete", 2, 0 )\ ACTION( REQ_SARRAY_LEN, "SArray.len", 2, 0 )\ ACTION( REQ_SARRAY_FIND, "SArray.find", 3, 0 )\ @@ -47,4 +47,6 @@ typedef enum sarray_elem { SARRAY_IDX = 2, SARRAY_CNT = 2, SARRAY_ICNT = 3, /* when an index is also present */ + SARRAY_WML = 3, /* watermark (low) */ + SARRAY_WMH = 4, /* watermark (high) */ } sarray_elem_e; diff --git a/src/server/rds/data/cmd_sarray.c b/src/server/rds/data/cmd_sarray.c index c0eceec6b..00e7c84a0 100644 --- a/src/server/rds/data/cmd_sarray.c +++ b/src/server/rds/data/cmd_sarray.c @@ -10,40 +10,12 @@ #include #include +#define WATERMARK_SIZE (sizeof(uint32_t) * 2) /* entries in u32 */ /* TODO(yao): make MAX_NVAL configurable */ #define MAX_NVAL 255 /* max no. of values to insert/remove in one request */ -static uint64_t vals[MAX_NVAL]; - - -static inline struct item * -_add_key(struct response *rsp, struct bstring *key) -{ - struct element *reply = (struct element *)array_get(rsp->token, 0); - struct item *it; - item_rstatus_e istatus; - - it = item_get(key); - if (it != NULL) { - rsp->type = reply->type = ELEM_ERR; - reply->bstr = str2bstr(RSP_EXIST); - INCR(process_metrics, sarray_create_exist); - - return NULL; - } else { - /* TODO: figure out a TTL story here */ - istatus = item_reserve(&it, key, NULL, SARRAY_HEADER_SIZE, 0, INT32_MAX); - if (istatus != ITEM_OK) { - rsp->type = reply->type = ELEM_ERR; - reply->bstr = str2bstr(RSP_ERR_STORAGE); - INCR(process_metrics, sarray_create_ex); - INCR(process_metrics, process_ex); - } else { - INCR(process_metrics, sarray_create_ok); - } - return it; - } -} +static uint64_t vals[MAX_NVAL]; +static struct bstring null_key = null_bstring; /** * Attempt to extend an item by delta bytes. This is accomplished by first @@ -69,8 +41,13 @@ _realloc_key(struct item **it, const struct bstring *key, uint32_t delta) istatus = item_reserve(&nit, key, NULL, item_nval(*it) + delta, (*it)->olen, (*it)->expire_at); if (istatus != ITEM_OK) { + log_debug("reallocate item for key '%.*s' failed: %d", key->len, + key->data, istatus); return istatus; } + + log_verb("successfully reallocated item for key '%.*s'", key->len, + key->data); /*copy item payload */ cc_memcpy(nit->end, (*it)->end, item_npayload(*it)); @@ -82,6 +59,26 @@ _realloc_key(struct item **it, const struct bstring *key, uint32_t delta) return ITEM_OK; } +static inline uint32_t +_watermark_low(uint32_t *opt) +{ + return *opt; +} + +static inline uint32_t +_watermark_high(uint32_t *opt) +{ + return *(opt + 1); +} + +static inline void +_set_watermark(uint32_t *opt, uint32_t low, uint32_t high) +{ + *opt = low; + ++opt; + *opt = high; +} + void cmd_sarray_create(struct response *rsp, const struct request *req, const struct command *cmd) @@ -89,9 +86,13 @@ cmd_sarray_create(struct response *rsp, const struct request *req, struct element *reply = (struct element *)array_push(rsp->token); struct bstring *key; struct item *it; - int64_t esize; + item_rstatus_e istatus; + uint32_t ntoken; + bool bounded; + int64_t esize, low, high; - ASSERT(array_nelem(req->token) == cmd->narg); + ntoken = array_nelem(req->token); + ASSERT(ntoken >= cmd->narg); INCR(process_metrics, sarray_create); @@ -101,16 +102,50 @@ cmd_sarray_create(struct response *rsp, const struct request *req, return; } - log_verb("before esize"); if (!req_get_int(&esize, req, SARRAY_ESIZE)) { compose_rsp_client_err(rsp, reply, cmd, key); - INCR(process_metrics, sarray_find_ex); + INCR(process_metrics, sarray_create_ex); return; } - log_verb("post parse"); - it = _add_key(rsp, key); + bounded = (cmd->nopt > 0); + if (bounded && cmd->nopt != 2) { + compose_rsp_client_err(rsp, reply, cmd, key); + INCR(process_metrics, sarray_create_ex); + + return; + } + + /* get low & high watermarks */ + if (cmd->nopt > 0 && (!req_get_int(&low, req, SARRAY_WML) || + !req_get_int(&high, req, SARRAY_WMH))) { + compose_rsp_client_err(rsp, reply, cmd, key); + INCR(process_metrics, sarray_create_ex); + + return; + } + + /* add key */ + it = item_get(key); + if (it != NULL) { + rsp->type = reply->type = ELEM_ERR; + reply->bstr = str2bstr(RSP_EXIST); + INCR(process_metrics, sarray_create_exist); + } else { + /* TODO: figure out a TTL story here */ + istatus = item_reserve(&it, key, NULL, SARRAY_HEADER_SIZE, + WATERMARK_SIZE * bounded, INT32_MAX); + if (istatus != ITEM_OK) { + rsp->type = reply->type = ELEM_ERR; + reply->bstr = str2bstr(RSP_ERR_STORAGE); + } else { + if (bounded) { + _set_watermark((uint32_t *)item_optional(it), low, high); + } + INCR(process_metrics, sarray_create_ok); + } + } if (it == NULL) { compose_rsp_storage_err(rsp, reply, cmd, key); INCR(process_metrics, sarray_create_ex); @@ -133,7 +168,7 @@ cmd_sarray_delete(struct response *rsp, const struct request *req, const struct command *cmd) { struct element *reply = (struct element *)array_push(rsp->token); - struct bstring *key; + struct bstring *key = &null_key; ASSERT(array_nelem(req->token) == cmd->narg); @@ -160,7 +195,7 @@ cmd_sarray_len(struct response *rsp, const struct request *req, const struct command *cmd) { struct element *reply = (struct element *)array_push(rsp->token); - struct bstring *key; + struct bstring *key = &null_key; struct item *it; uint32_t nentry; @@ -192,7 +227,7 @@ cmd_sarray_find(struct response *rsp, const struct request *req, const struct command *cmd) { struct element *reply = (struct element *)array_push(rsp->token); - struct bstring *key; + struct bstring *key = &null_key; struct item *it; uint32_t idx; int64_t val; @@ -255,7 +290,7 @@ cmd_sarray_get(struct response *rsp, const struct request *req, const struct command *cmd) { struct element *reply = (struct element *)array_push(rsp->token); - struct bstring *key; + struct bstring *key = &null_key; struct item *it; int64_t idx = 0, cnt = 1; uint64_t val; @@ -339,10 +374,10 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct command *cmd) { struct element *reply = (struct element *)array_push(rsp->token); - struct bstring *key; + struct bstring *key = &null_key; struct item *it; - uint32_t nval = 0, ninserted = 0, delta; - int64_t val; + uint32_t nval = 0, esize; + int64_t delta, val, wml, wmh, nentry, ninserted = 0; sarray_p sa; sarray_rstatus_e status; @@ -377,7 +412,30 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct } } - delta = sarray_esize((sarray_p)item_data(it)) * nval; + /* we always insert everything before trying to truncate down an array + * that is too long. The reason for that is because inserting is the + * only way of ensuring the new values are properly sorted in the array, + * and the truncation (designed to be from the left for now) is indeed + * removing the lowest values. The downside is we may trigger an extra + * realloc of the key and assign it more memory than the final size may + * require. + * + * Example: if item can host at most an array of 10 elements, and we + * create an array with watermarks [6, 8], inserting 6 elements into + * an array of 4 elements will result in the array having 10 elements + * before being trimmed back. So the following logic will try to allocate + * (and keep) memory for 10 elements. + * + * However, this seems acceptable, mostly because we assume insert batch + * size is relatively small compared to watermark settings in most cases, + * and therefore users can configure their watermarks and control their + * batch sizes to ensure insertion at maximum array size stays within a + * single slabclass. + */ + sa = (sarray_p)item_data(it); + esize = sarray_esize(sa); + delta = esize * nval; + if (_realloc_key(&it, key, delta) != ITEM_OK) { compose_rsp_storage_err(rsp, reply, cmd, key); INCR(process_metrics, sarray_insert_ex); @@ -385,7 +443,7 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct return; } - sa = (sarray_p)item_data(it); + sa = (sarray_p)item_data(it); /* item might have changed */ for (uint32_t i = 0; i < nval; ++i) { status = sarray_insert(sa, vals[i]); if (status == SARRAY_EINVALID) { @@ -395,7 +453,6 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct } if (status == SARRAY_EDUP) { - compose_rsp_noop(rsp, reply, cmd, key); INCR(process_metrics, sarray_insert_noop); } else { INCR(process_metrics, sarray_insert_ok); @@ -403,7 +460,18 @@ cmd_sarray_insert(struct response *rsp, const struct request *req, const struct } } - compose_rsp_numeric(rsp, reply, cmd, key, (int64_t)ninserted); + if (it->olen > 0) { + wml = _watermark_low((uint32_t *)item_optional(it)); + wmh = _watermark_high((uint32_t *)item_optional(it)); + nentry = sarray_nentry(sa); + if (nentry > wmh) { + log_verb("truncating '%.*s' from %"PRIu32" down to %"PRIu32" elements", + key->len, key->data, nentry, wml); + sarray_truncate(sa, nentry - wml); + } + } + + compose_rsp_numeric(rsp, reply, cmd, key, ninserted); } void @@ -411,7 +479,7 @@ cmd_sarray_remove(struct response *rsp, const struct request *req, const struct command *cmd) { struct element *reply = (struct element *)array_push(rsp->token); - struct bstring *key; + struct bstring *key = &null_key; struct item *it; uint32_t nval = 0, nremoved = 0; int64_t val; @@ -487,7 +555,7 @@ cmd_sarray_truncate(struct response *rsp, const struct request *req, const struct command *cmd) { struct element *reply = (struct element *)array_push(rsp->token); - struct bstring *key; + struct bstring *key = &null_key; struct item *it; int64_t cnt; sarray_rstatus_e status; diff --git a/src/storage/slab/item.h b/src/storage/slab/item.h index 54d69e7ef..986958e3d 100644 --- a/src/storage/slab/item.h +++ b/src/storage/slab/item.h @@ -166,7 +166,7 @@ item_optional(struct item *it) } /* - * Get start location of item payload + * Get start location of item value */ static inline char * item_data(struct item *it)