Skip to content

Commit

Permalink
Add ValkeyModuleCtx *module_ctx parameter to all scripting engine c…
Browse files Browse the repository at this point in the history
…allback functions

Signed-off-by: Ricardo Dias <ricardo.dias@percona.com>
  • Loading branch information
rjd15372 committed Dec 19, 2024
1 parent fcc256f commit e21eb99
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 65 deletions.
45 changes: 36 additions & 9 deletions src/function_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ typedef struct loadCtx {
size_t timeout;
} loadCtx;

static void luaEngineFreeFunction(engineCtx *engine_ctx,
static void luaEngineFreeFunction(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
void *compiled_function);

/* Hook for FUNCTION LOAD execution.
Expand All @@ -89,13 +90,18 @@ static void luaEngineLoadHook(lua_State *lua, lua_Debug *ar) {
}
}

static void freeCompiledFunc(luaEngineCtx *lua_engine_ctx, void *compiled_func) {
static void freeCompiledFunc(ValkeyModuleCtx *module_ctx,
luaEngineCtx *lua_engine_ctx,
void *compiled_func) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);

compiledFunction *func = compiled_func;
decrRefCount(func->name);
if (func->desc) {
decrRefCount(func->desc);
}
luaEngineFreeFunction(lua_engine_ctx, func->function);
luaEngineFreeFunction(module_ctx, lua_engine_ctx, func->function);
zfree(func);
}

Expand All @@ -110,11 +116,15 @@ static void freeCompiledFunc(luaEngineCtx *lua_engine_ctx, void *compiled_func)
*
* Return NULL on compilation error and set the error to the err variable
*/
static compiledFunction **luaEngineCreate(engineCtx *engine_ctx,
static compiledFunction **luaEngineCreate(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
char **err) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);

compiledFunction **compiled_functions = NULL;
luaEngineCtx *lua_engine_ctx = engine_ctx;
lua_State *lua = lua_engine_ctx->lua;
Expand Down Expand Up @@ -153,7 +163,7 @@ static compiledFunction **luaEngineCreate(engineCtx *engine_ctx,
listIter *iter = listGetIterator(load_ctx.functions, AL_START_HEAD);
listNode *node = NULL;
while ((node = listNext(iter)) != NULL) {
freeCompiledFunc(lua_engine_ctx, listNodeValue(node));
freeCompiledFunc(module_ctx, lua_engine_ctx, listNodeValue(node));
}
listReleaseIterator(iter);
listRelease(load_ctx.functions);
Expand Down Expand Up @@ -198,6 +208,7 @@ static void luaEngineCall(ValkeyModuleCtx *module_ctx,
size_t nkeys,
robj **args,
size_t nargs) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);

luaEngineCtx *lua_engine_ctx = engine_ctx;
Expand All @@ -217,22 +228,38 @@ static void luaEngineCall(ValkeyModuleCtx *module_ctx,
lua_pop(lua, 1); /* Pop error handler */
}

static size_t luaEngineGetUsedMemoy(engineCtx *engine_ctx) {
static size_t luaEngineGetUsedMemoy(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);

luaEngineCtx *lua_engine_ctx = engine_ctx;
return luaMemory(lua_engine_ctx->lua);
}

static size_t luaEngineFunctionMemoryOverhead(void *compiled_function) {
static size_t luaEngineFunctionMemoryOverhead(ValkeyModuleCtx *module_ctx,
void *compiled_function) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);

return zmalloc_size(compiled_function);
}

static size_t luaEngineMemoryOverhead(engineCtx *engine_ctx) {
static size_t luaEngineMemoryOverhead(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);

luaEngineCtx *lua_engine_ctx = engine_ctx;
return zmalloc_size(lua_engine_ctx);
}

static void luaEngineFreeFunction(engineCtx *engine_ctx,
static void luaEngineFreeFunction(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
void *compiled_function) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);

luaEngineCtx *lua_engine_ctx = engine_ctx;
lua_State *lua = lua_engine_ctx->lua;
luaFunctionCtx *f_ctx = compiled_function;
Expand Down
37 changes: 17 additions & 20 deletions src/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ static size_t functionMallocSize(functionInfo *fi) {
return zmalloc_size(fi) +
sdsAllocSize(fi->name) +
(fi->desc ? sdsAllocSize(fi->desc) : 0) +
fi->li->ei->engine->get_function_memory_overhead(fi->function);
fi->li->ei->engine->get_function_memory_overhead(fi->li->ei->module_ctx, fi->function);
}

static size_t libraryMallocSize(functionLibInfo *li) {
Expand All @@ -145,7 +145,9 @@ static void engineFunctionDispose(void *obj) {
sdsfree(fi->desc);
}
engine *engine = fi->li->ei->engine;
engine->free_function(engine->engine_ctx, fi->function);
engine->free_function(fi->li->ei->module_ctx,
engine->engine_ctx,
fi->function);
zfree(fi);
}

Expand Down Expand Up @@ -469,7 +471,7 @@ int functionsRegisterEngine(const char *engine_name,
*ei = (engineInfo){
.name = engine_name_sds,
.engineModule = engine_module,
.module_ctx = engine_module ? moduleAllocateContext() : NULL,
.module_ctx = engine_module ? moduleAllocateScriptingEngineContext(engine_module) : NULL,
.engine = eng,
.c = c,
};
Expand All @@ -480,7 +482,7 @@ int functionsRegisterEngine(const char *engine_name,

engine_cache_memory += zmalloc_size(ei) + sdsAllocSize(ei->name) +
zmalloc_size(eng) +
eng->get_engine_memory_overhead(eng->engine_ctx);
eng->get_engine_memory_overhead(ei->module_ctx, eng->engine_ctx);

return C_OK;
}
Expand Down Expand Up @@ -515,6 +517,7 @@ int functionsUnregisterEngine(const char *engine_name) {
freeClient(ei->c);
if (ei->engineModule != NULL) {
serverAssert(ei->module_ctx != NULL);
moduleFreeContext(ei->module_ctx);
zfree(ei->module_ctx);
}
zfree(ei);
Expand Down Expand Up @@ -811,12 +814,6 @@ static void fcallCommandGeneric(client *c, int ro) {
scriptRunCtx run_ctx;
if (scriptPrepareForRun(&run_ctx, fi->li->ei->c, c, fi->name, fi->f_flags, ro) != C_OK) return;

if (fi->li->ei->engineModule != NULL) {
moduleScriptingEngineInitContext(fi->li->ei->module_ctx,
fi->li->ei->engineModule,
run_ctx.original_client);
}

engine->call(fi->li->ei->module_ctx,
engine->engine_ctx,
&run_ctx,
Expand All @@ -826,10 +823,6 @@ static void fcallCommandGeneric(client *c, int ro) {
c->argv + 3 + numkeys,
c->argc - 3 - numkeys);
scriptResetRun(&run_ctx);

if (fi->li->ei->engineModule != NULL) {
moduleFreeContext(fi->li->ei->module_ctx);
}
}

/*
Expand Down Expand Up @@ -1128,7 +1121,7 @@ void functionFreeLibMetaData(functionsLibMetaData *md) {
if (md->engine) sdsfree(md->engine);
}

static void freeCompiledFunctions(engine *engine,
static void freeCompiledFunctions(engineInfo *ei,
compiledFunction **compiled_functions,
size_t num_compiled_functions,
size_t free_function_from_idx) {
Expand All @@ -1139,7 +1132,9 @@ static void freeCompiledFunctions(engine *engine,
decrRefCount(func->desc);
}
if (i >= free_function_from_idx) {
engine->free_function(engine->engine_ctx, func->function);
ei->engine->free_function(ei->module_ctx,
ei->engine->engine_ctx,
func->function);
}
zfree(func);
}
Expand Down Expand Up @@ -1188,7 +1183,8 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC
size_t num_compiled_functions = 0;
char *compile_error = NULL;
compiledFunction **compiled_functions =
engine->create(engine->engine_ctx,
engine->create(ei->module_ctx,
engine->engine_ctx,
md.code,
timeout,
&num_compiled_functions,
Expand All @@ -1210,15 +1206,15 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC
func->f_flags,
err);
if (ret == C_ERR) {
freeCompiledFunctions(engine,
freeCompiledFunctions(ei,
compiled_functions,
num_compiled_functions,
i);
goto error;
}
}

freeCompiledFunctions(engine,
freeCompiledFunctions(ei,
compiled_functions,
num_compiled_functions,
num_compiled_functions);
Expand Down Expand Up @@ -1310,7 +1306,8 @@ unsigned long functionsMemory(void) {
while ((entry = dictNext(iter))) {
engineInfo *ei = dictGetVal(entry);
engine *engine = ei->engine;
engines_memory += engine->get_used_memory(engine->engine_ctx);
engines_memory += engine->get_used_memory(ei->module_ctx,
engine->engine_ctx);
}
dictReleaseIterator(iter);

Expand Down
14 changes: 10 additions & 4 deletions src/functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ typedef struct engine {
*
* Returns NULL on error and set err to be the error message */
compiledFunction **(*create)(
ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
const char *code,
size_t timeout,
Expand All @@ -91,18 +92,23 @@ typedef struct engine {
size_t nargs);

/* get current used memory by the engine */
size_t (*get_used_memory)(engineCtx *engine_ctx);
size_t (*get_used_memory)(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx);

/* Return memory overhead for a given function,
* such memory is not counted as engine memory but as general
* structs memory that hold different information */
size_t (*get_function_memory_overhead)(void *compiled_function);
size_t (*get_function_memory_overhead)(ValkeyModuleCtx *module_ctx,
void *compiled_function);

/* Return memory overhead for engine (struct size holding the engine)*/
size_t (*get_engine_memory_overhead)(engineCtx *engine_ctx);
size_t (*get_engine_memory_overhead)(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx);

/* free the given function */
void (*free_function)(engineCtx *engine_ctx, void *compiled_function);
void (*free_function)(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
void *compiled_function);
} engine;

/* Hold information about an engine.
Expand Down
24 changes: 7 additions & 17 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -880,13 +880,13 @@ void moduleCallCommandUnblockedHandler(client *c) {
moduleReleaseTempClient(c);
}

/* Allocates the memory necessary to hold the ValkeyModuleCtx structure, and
* returns the pointer to the allocated memory.
*
* Used by the scripting engines implementation to cache the context structure.
*/
ValkeyModuleCtx *moduleAllocateContext(void) {
return (ValkeyModuleCtx *)zcalloc(sizeof(ValkeyModuleCtx));
/* Allocates the memory necessary to hold the ValkeyModuleCtx structure,
* initializes the context for use by the server to call scripting engines
* callback functions, and returns the pointer to the allocated memory. */
ValkeyModuleCtx *moduleAllocateScriptingEngineContext(ValkeyModule *module) {
ValkeyModuleCtx *ctx = (ValkeyModuleCtx *)zcalloc(sizeof(ValkeyModuleCtx));
moduleCreateContext(ctx, module, VALKEYMODULE_CTX_NEW_CLIENT);
return ctx;
}

/* Create a module ctx and keep track of the nesting level.
Expand Down Expand Up @@ -931,16 +931,6 @@ void moduleCreateContext(ValkeyModuleCtx *out_ctx, ValkeyModule *module, int ctx
}
}

/* Initialize a module context to be used by scripting engines callback
* functions.
*/
void moduleScriptingEngineInitContext(ValkeyModuleCtx *out_ctx,
ValkeyModule *module,
client *client) {
moduleCreateContext(out_ctx, module, VALKEYMODULE_CTX_NONE);
out_ctx->client = client;
}

/* This command binds the normal command invocation with commands
* exported by modules. */
void ValkeyModuleCommandDispatcher(client *c) {
Expand Down
6 changes: 1 addition & 5 deletions src/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@

typedef struct ValkeyModuleCtx ValkeyModuleCtx;
typedef struct ValkeyModule ValkeyModule;
typedef struct client client;

ValkeyModuleCtx *moduleAllocateContext(void);
void moduleScriptingEngineInitContext(ValkeyModuleCtx *out_ctx,
ValkeyModule *module,
client *client);
ValkeyModuleCtx *moduleAllocateScriptingEngineContext(ValkeyModule *module);
void moduleFreeContext(ValkeyModuleCtx *ctx);

#endif /* _MODULE_H_ */
5 changes: 5 additions & 0 deletions src/valkeymodule.h
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ typedef struct ValkeyModuleScriptingEngineCallResult {
} ValkeyModuleScriptingEngineCallResult;

typedef ValkeyModuleScriptingEngineCompiledFunction **(*ValkeyModuleScriptingEngineCreateFunctionsLibraryFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
const char *code,
size_t timeout,
Expand All @@ -892,15 +893,19 @@ typedef void (*ValkeyModuleScriptingEngineCallFunctionFunc)(
size_t nargs);

typedef size_t (*ValkeyModuleScriptingEngineGetUsedMemoryFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx);

typedef size_t (*ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc)(
ValkeyModuleCtx *module_ctx,
void *compiled_function);

typedef size_t (*ValkeyModuleScriptingEngineGetEngineMemoryOverheadFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx);

typedef void (*ValkeyModuleScriptingEngineFreeFunctionFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
void *compiled_function);

Expand Down
Loading

0 comments on commit e21eb99

Please sign in to comment.