Skip to content

Commit

Permalink
bug fixing gearman#61
Browse files Browse the repository at this point in the history
- use redis HMSET/HGETALL commands to put/fetch  data and priority
  • Loading branch information
p-alik committed Jan 9, 2017
1 parent 9530317 commit b9ef482
Showing 1 changed file with 87 additions and 35 deletions.
122 changes: 87 additions & 35 deletions libgearman-server/plugins/queue/redis/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,74 @@ class Hiredis : public Queue {
return _redis;
}

/*
* hmset(std::vector<char> key, const void *data, size_t data_size, gearman_job_priority_t priority)
*
* returns true if hiredis HMSET succeeded
*/
bool hmset(std::vector<char> key, const void *data, size_t data_size, gearman_job_priority_t priority) {
redisContext* context = this->redis();
int argc = 6;
std::string _priority = std::to_string((uint32_t)priority);

const size_t argvlen[argc] = {
(const size_t)5,
(const size_t)key.size(),
(const size_t)4,
(const size_t)data_size,
(const size_t)8,
_priority.size()
};

std::vector<const char*> argv {"HMSET"};
argv.push_back( &key[0] );
argv.push_back( "data" );
argv.push_back( static_cast<const char*>(data) );
argv.push_back( "priority" );
argv.push_back( _priority.c_str() );

redisReply *reply = (redisReply *)redisCommandArgv(context, argv.size(), &(argv[0]), &(argvlen[0]) );
if (!reply) {
return false;
}

bool res = (reply->type == REDIS_REPLY_STATUS);

freeReplyObject(reply);

return res;
}

/*
* std::pair<std::string, gearman_job_priority_t> fetch(char *key)
*
* returns pair<data, prioriry> selected by the key
*/
std::pair<std::string, gearman_job_priority_t> fetch(char *key)
{
std::pair<std::string, gearman_job_priority_t> res;
redisContext* context = this->redis();
redisReply *reply= (redisReply*)redisCommand(context, "HGETALL %s", key);
if (reply == NULL)
{
return res;
}

// 2 x (key + value)
assert(reply->elements == 4);

std::string data(reply->element[1]->str);
uint32_t p = (uint32_t)std::stoi(reply->element[3]->str);

freeReplyObject(reply);

gearman_job_priority_t priority = static_cast<gearman_job_priority_t>(p);

res = std::make_pair(reply->element[1]->str, priority);

return res;
}

std::string server;
std::string service;
std::string password;
Expand Down Expand Up @@ -175,10 +243,11 @@ typedef std::vector<char> vchar_t;

static size_t build_key(vchar_t &key,
const char *unique,
size_t unique_size,
size_t unique_size,
const char *function_name,
size_t function_name_size)
{
//FIXME key.size > snprintf result
key.resize(function_name_size +unique_size +GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX_SIZE +4);
int key_size= snprintf(&key[0], key.size(), GEARMAND_KEY_LITERAL,
GEARMAND_QUEUE_GEARMAND_DEFAULT_PREFIX,
Expand Down Expand Up @@ -217,11 +286,9 @@ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context,
const char *function_name,
size_t function_name_size,
const void *data, size_t data_size,
gearman_job_priority_t,
gearman_job_priority_t priority,
int64_t when)
{
gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context;

if (when) // No support for EPOCH jobs
{
return gearmand_gerror("hiredis queue does not support epoch jobs", GEARMAND_QUEUE_ERROR);
Expand All @@ -237,18 +304,14 @@ static gearmand_error_t _hiredis_add(gearman_server_st *, void *context,
GEARMAN_DEFAULT_LOG_PARAM,
"hires key: %u", (uint32_t)key.size());

redisReply *reply= (redisReply*)redisCommand(queue->redis(), "SET %s %b", &key[0], data, data_size);
gearmand_log_debug(GEARMAN_DEFAULT_LOG_PARAM, "got reply");
if (reply == NULL)
{
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
GEARMAND_QUEUE_ERROR,
"failed to insert '%.*s' into redis", key.size(), &key[0]);
}
freeReplyObject(reply);
gearmand::plugins::queue::Hiredis *queue= (gearmand::plugins::queue::Hiredis *)context;
if (queue->hmset(key, data, data_size, priority))
return GEARMAND_SUCCESS;

return GEARMAND_SUCCESS;
return gearmand_log_gerror(
GEARMAN_DEFAULT_LOG_PARAM,
GEARMAND_QUEUE_ERROR,
"failed to insert '%.*s' into redis", key.size(), &key[0]);
}

static gearmand_error_t _hiredis_flush(gearman_server_st *, void *)
Expand All @@ -258,7 +321,7 @@ static gearmand_error_t _hiredis_flush(gearman_server_st *, void *)

static gearmand_error_t _hiredis_done(gearman_server_st *, void *context,
const char *unique,
size_t unique_size,
size_t unique_size,
const char *function_name,
size_t function_name_size)
{
Expand All @@ -271,7 +334,7 @@ static gearmand_error_t _hiredis_done(gearman_server_st *, void *context,
std::vector<char> key;
build_key(key, unique, unique_size, function_name, function_name_size);

redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %s", &key[0]);
redisReply *reply= (redisReply*)redisCommand(queue->redis(), "DEL %b", &key[0], key.size());
if (reply == NULL)
{
return gearmand_log_gerror(
Expand Down Expand Up @@ -332,30 +395,19 @@ static gearmand_error_t _hiredis_replay(gearman_server_st *server, void *context
continue;
}

redisReply *get_reply= (redisReply*)redisCommand(queue->redis(), "GET %s", reply->element[x]->str);
if (get_reply == NULL)
{
gearmand_log_debug(
GEARMAN_DEFAULT_LOG_PARAM,
"GET %s failed: %s", reply->element[x]->str, queue->redis()->errstr);
continue;
}
std::pair<std::string, gearman_job_priority_t> record = queue->fetch(reply->element[x]->str);

/* need to make a copy here ... gearman_server_job_free will free it later */
char * data = (char*)malloc(get_reply->len);
if (data == NULL)
{
return gearmand_perror(errno, "malloc");
}
memcpy(data, get_reply->str, get_reply->len);
//FIXME insure gearman_server_job_free gets free it later
std::string data = record.first;
gearman_job_priority_t priority = static_cast<gearman_job_priority_t>(record.second);

(void)(add_fn)(server, add_context,
unique, strlen(unique),
function_name, strlen(function_name),
data, get_reply->len,
GEARMAN_JOB_PRIORITY_NORMAL, 0);
freeReplyObject(get_reply);
data.c_str(), data.size(),
priority, 0);
}

freeReplyObject(reply);

return GEARMAND_SUCCESS;
Expand Down

0 comments on commit b9ef482

Please sign in to comment.