-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
273 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
#ifndef __HIREDIS_LIBSDEVENT_H__ | ||
#define __HIREDIS_LIBSDEVENT_H__ | ||
#include <systemd/sd-event.h> | ||
#include "../hiredis.h" | ||
#include "../async.h" | ||
|
||
#define REDIS_LIBSDEVENT_DELETED 0x01 | ||
#define REDIS_LIBSDEVENT_ENTERED 0x02 | ||
|
||
typedef struct redisLibsdeventEvents { | ||
redisAsyncContext *context; | ||
struct sd_event *event; | ||
struct sd_event_source *fdSource; | ||
struct sd_event_source *timerSource; | ||
struct sd_event_source *deferSource; | ||
int fd; | ||
short flags; | ||
short state; | ||
} redisLibsdeventEvents; | ||
|
||
static void redisLibsdeventDestroy(redisLibsdeventEvents *e) { | ||
if(e->fdSource) { | ||
e->fdSource = sd_event_source_disable_unref(e->fdSource); | ||
} | ||
if(e->timerSource) { | ||
e->timerSource = sd_event_source_disable_unref(e->timerSource); | ||
} | ||
if(e->deferSource) { | ||
e->deferSource = sd_event_source_disable_unref(e->deferSource); | ||
} | ||
sd_event_unref(e->event); | ||
hi_free(e); | ||
} | ||
|
||
static int redisLibsdeventTimeoutHandler(sd_event_source *s, uint64_t usec, void *userdata) { | ||
((void)s); | ||
((void)usec); | ||
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; | ||
redisAsyncHandleTimeout(e->context); | ||
return 0; | ||
} | ||
|
||
static int redisLibsdeventHandler(sd_event_source *s, int fd, uint32_t event, void *userdata) { | ||
((void)s); | ||
((void)fd); | ||
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; | ||
e->state |= REDIS_LIBSDEVENT_ENTERED; | ||
|
||
#define CHECK_DELETED() if (e->state & REDIS_LIBSDEVENT_DELETED) {\ | ||
redisLibsdeventDestroy(e);\ | ||
return 0; \ | ||
} | ||
|
||
if ((event & EPOLLIN) && e->context && (e->state & REDIS_LIBSDEVENT_DELETED) == 0) { | ||
redisAsyncHandleRead(e->context); | ||
CHECK_DELETED(); | ||
} | ||
|
||
if ((event & EPOLLOUT) && e->context && (e->state & REDIS_LIBSDEVENT_DELETED) == 0) { | ||
redisAsyncHandleWrite(e->context); | ||
CHECK_DELETED(); | ||
} | ||
|
||
e->state &= ~REDIS_LIBSDEVENT_ENTERED; | ||
#undef CHECK_DELETED | ||
|
||
return 0; | ||
} | ||
|
||
static void redisLibsdeventAddRead(void *userdata) { | ||
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; | ||
|
||
if (e->flags & EPOLLIN) { | ||
return; | ||
} | ||
|
||
e->flags |= EPOLLIN; | ||
|
||
if(e->flags & EPOLLOUT) { | ||
sd_event_source_set_io_events(e->fdSource, e->flags); | ||
} else { | ||
sd_event_add_io(e->event, &e->fdSource, e->fd, e->flags, redisLibsdeventHandler, e); | ||
} | ||
} | ||
|
||
static void redisLibsdeventDelRead(void *userdata) { | ||
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; | ||
|
||
e->flags &= ~EPOLLIN; | ||
|
||
if(e->flags) { | ||
sd_event_source_set_io_events(e->fdSource, e->flags); | ||
} else { | ||
e->fdSource = sd_event_source_disable_unref(e->fdSource); | ||
} | ||
} | ||
|
||
static void redisLibsdeventAddWrite(void *userdata) { | ||
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; | ||
|
||
if (e->flags & EPOLLOUT) { | ||
return; | ||
} | ||
|
||
e->flags |= EPOLLOUT; | ||
|
||
if(e->flags & EPOLLIN) { | ||
sd_event_source_set_io_events(e->fdSource, e->flags); | ||
} else { | ||
sd_event_add_io(e->event, &e->fdSource, e->fd, e->flags, redisLibsdeventHandler, e); | ||
} | ||
} | ||
|
||
static void redisLibsdeventDelWrite(void *userdata) { | ||
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; | ||
|
||
e->flags &= ~EPOLLOUT; | ||
|
||
if(e->flags) { | ||
sd_event_source_set_io_events(e->fdSource, e->flags); | ||
} else { | ||
e->fdSource = sd_event_source_disable_unref(e->fdSource); | ||
} | ||
} | ||
|
||
static void redisLibsdeventCleanup(void *userdata) { | ||
redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; | ||
|
||
if(!e) { | ||
return; | ||
} | ||
|
||
if (e->state & REDIS_LIBSDEVENT_ENTERED) { | ||
e->state |= REDIS_LIBSDEVENT_DELETED; | ||
} else { | ||
redisLibsdeventDestroy(e); | ||
} | ||
} | ||
|
||
static void redisLibsdeventSetTimeout(void *userdata, struct timeval tv) { | ||
redisLibsdeventEvents *e = (redisLibsdeventEvents *)userdata; | ||
|
||
uint64_t usec = tv.tv_sec * 1000000 + tv.tv_usec; | ||
if(!e->timerSource) { | ||
sd_event_add_time_relative(e->event, &e->timerSource, CLOCK_MONOTONIC, usec, 1, redisLibsdeventTimeoutHandler, e); | ||
} else { | ||
sd_event_source_set_time_relative(e->timerSource, usec); | ||
} | ||
} | ||
|
||
static int redisLibsdeventAttach(redisAsyncContext *ac, struct sd_event *event) { | ||
redisContext *c = &(ac->c); | ||
redisLibsdeventEvents *e; | ||
|
||
/* Nothing should be attached when something is already attached */ | ||
if (ac->ev.data != NULL) | ||
return REDIS_ERR; | ||
|
||
/* Create container for context and r/w events */ | ||
e = (redisLibsdeventEvents*)hi_calloc(1, sizeof(*e)); | ||
if (e == NULL) | ||
return REDIS_ERR; | ||
|
||
/* Initialize and increase event refcount */ | ||
e->context = ac; | ||
e->event = event; | ||
e->fd = c->fd; | ||
sd_event_ref(event); | ||
|
||
/* Register functions to start/stop listening for events */ | ||
ac->ev.addRead = redisLibsdeventAddRead; | ||
ac->ev.delRead = redisLibsdeventDelRead; | ||
ac->ev.addWrite = redisLibsdeventAddWrite; | ||
ac->ev.delWrite = redisLibsdeventDelWrite; | ||
ac->ev.cleanup = redisLibsdeventCleanup; | ||
ac->ev.scheduleTimer = redisLibsdeventSetTimeout; | ||
ac->ev.data = e; | ||
|
||
return REDIS_OK; | ||
} | ||
#endif |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
#include <stdio.h> | ||
#include <stdlib.h> | ||
#include <string.h> | ||
#include <signal.h> | ||
|
||
#include <hiredis.h> | ||
#include <async.h> | ||
#include <adapters/libsdevent.h> | ||
|
||
void debugCallback(redisAsyncContext *c, void *r, void *privdata) { | ||
(void)privdata; //unused | ||
redisReply *reply = r; | ||
if (reply == NULL) { | ||
/* The DEBUG SLEEP command will almost always fail, because we have set a 1 second timeout */ | ||
printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error"); | ||
return; | ||
} | ||
/* Disconnect after receiving the reply of DEBUG SLEEP (which will not)*/ | ||
redisAsyncDisconnect(c); | ||
} | ||
|
||
void getCallback(redisAsyncContext *c, void *r, void *privdata) { | ||
redisReply *reply = r; | ||
if (reply == NULL) { | ||
printf("`GET key` error: %s\n", c->errstr ? c->errstr : "unknown error"); | ||
return; | ||
} | ||
printf("`GET key` result: argv[%s]: %s\n", (char*)privdata, reply->str); | ||
|
||
/* start another request that demonstrate timeout */ | ||
redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %f", 1.5); | ||
} | ||
|
||
void connectCallback(const redisAsyncContext *c, int status) { | ||
if (status != REDIS_OK) { | ||
printf("connect error: %s\n", c->errstr); | ||
return; | ||
} | ||
printf("Connected...\n"); | ||
} | ||
|
||
void disconnectCallback(const redisAsyncContext *c, int status) { | ||
if (status != REDIS_OK) { | ||
printf("disconnect because of error: %s\n", c->errstr); | ||
return; | ||
} | ||
printf("Disconnected...\n"); | ||
} | ||
|
||
int main (int argc, char **argv) { | ||
signal(SIGPIPE, SIG_IGN); | ||
|
||
struct sd_event *event; | ||
sd_event_default(&event); | ||
|
||
redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); | ||
if (c->err) { | ||
printf("Error: %s\n", c->errstr); | ||
redisAsyncFree(c); | ||
return 1; | ||
} | ||
|
||
redisLibsdeventAttach(c,event); | ||
redisAsyncSetConnectCallback(c,connectCallback); | ||
redisAsyncSetDisconnectCallback(c,disconnectCallback); | ||
redisAsyncSetTimeout(c, (struct timeval){ .tv_sec = 1, .tv_usec = 0}); | ||
|
||
/* | ||
In this demo, we first `set key`, then `get key` to demonstrate the basic usage of libuv adapter. | ||
Then in `getCallback`, we start a `debug sleep` command to create 1.5 second long request. | ||
Because we have set a 1 second timeout to the connection, the command will always fail with a | ||
timeout error, which is shown in the `debugCallback`. | ||
*/ | ||
|
||
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); | ||
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); | ||
|
||
/* sd-event does not quit when there are no handlers registered. Manually exit after 1.5 seconds */ | ||
sd_event_source *s; | ||
sd_event_add_time_relative(event, &s, CLOCK_MONOTONIC, 1500000, 1, NULL, NULL); | ||
|
||
sd_event_loop(event); | ||
sd_event_source_disable_unref(s); | ||
sd_event_unref(event); | ||
return 0; | ||
} |