Skip to content

Commit

Permalink
Resource Interpreter implements the interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: zhangyukun <38148677+jameszhangyukun@users.noreply.github.com>
  • Loading branch information
jameszhangyukun committed Nov 15, 2022
1 parent e7c9cd0 commit 8c42e34
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ func (a *resourceCustomAccessor) GetRetentionLuaScript() string {
}

func (a *resourceCustomAccessor) GetReplicaResourceLuaScript() string {
return a.replicaRevision.LuaScript
return a.replicaResource.LuaScript
}

func (a *resourceCustomAccessor) GetReplicaRevisionLuaScript() string {
return a.replicaResource.LuaScript
return a.replicaRevision.LuaScript
}

func (a *resourceCustomAccessor) GetStatusReflectionLuaScript() string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var resourceInterpreterCustomizationsGVR = schema.GroupVersionResource{

// ConfigManager can list custom resource interpreter.
type ConfigManager interface {
LuaScriptAccessors() map[schema.GroupVersionKind]LuaScriptAccessor
LuaScriptAccessors() map[schema.GroupVersionKind]CustomAccessor
HasSynced() bool
}

Expand All @@ -36,8 +36,8 @@ type interpreterConfigManager struct {
}

// LuaScriptAccessors returns all cached configurations.
func (configManager *interpreterConfigManager) LuaScriptAccessors() map[schema.GroupVersionKind]LuaScriptAccessor {
return configManager.configuration.Load().(map[schema.GroupVersionKind]LuaScriptAccessor)
func (configManager *interpreterConfigManager) LuaScriptAccessors() map[schema.GroupVersionKind]CustomAccessor {
return configManager.configuration.Load().(map[schema.GroupVersionKind]CustomAccessor)
}

// HasSynced returns true when the cache is synced.
Expand All @@ -46,7 +46,7 @@ func (configManager *interpreterConfigManager) HasSynced() bool {
return true
}

if configManager.HasSynced() {
if configuration, err := configManager.lister.List(labels.Everything()); err == nil && len(configuration) == 0 {
configManager.initialSynced.Store(true)
return true
}
Expand All @@ -61,7 +61,7 @@ func NewInterpreterConfigManager(inform genericmanager.SingleClusterInformerMana
initialSynced: &atomic.Value{},
configuration: &atomic.Value{},
}
manager.configuration.Store(make(map[schema.GroupVersionKind]LuaScriptAccessor))
manager.configuration.Store(make(map[schema.GroupVersionKind]CustomAccessor))
manager.initialSynced.Store(false)
configHandlers := fedinformer.NewHandlerOnEvents(
func(_ interface{}) { manager.updateConfiguration() },
Expand All @@ -88,7 +88,6 @@ func (configManager *interpreterConfigManager) updateConfiguration() {
key := schema.FromAPIVersionAndKind(config.Spec.Target.APIVersion, config.Spec.Target.Kind)
configs[key] = NewResourceCustomAccessorAccessor(config)
}

configManager.configuration.Store(configs)
configManager.initialSynced.Store(true)
}
177 changes: 80 additions & 97 deletions pkg/resourceinterpreter/configurableinterpreter/configurable.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package configurableinterpreter

import (
"fmt"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -11,6 +9,7 @@ import (
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/resourceinterpreter/configurableinterpreter/configmanager"
"github.com/karmada-io/karmada/pkg/resourceinterpreter/configurableinterpreter/luavm"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
)

Expand All @@ -30,128 +29,112 @@ func NewConfigurableInterpreter(informer genericmanager.SingleClusterInformerMan

// HookEnabled tells if any hook exist for specific resource gvk and operation type.
func (c *ConfigurableInterpreter) HookEnabled(kind schema.GroupVersionKind, operationType configv1alpha1.InterpreterOperation) bool {
if !c.configManager.HasSynced() {
klog.Errorf("not yet ready to handle request")
return false
}
accessors, exist := c.configManager.LuaScriptAccessors()[kind]
if !exist {
return false
}
switch operationType {
case configv1alpha1.InterpreterOperationAggregateStatus:
return len(accessors.GetStatusAggregationLuaScript()) > 0
case configv1alpha1.InterpreterOperationInterpretHealth:
return len(accessors.GetHealthInterpretationLuaScript()) > 0
case configv1alpha1.InterpreterOperationInterpretDependency:
return len(accessors.GetDependencyInterpretationLuaScript()) > 0
case configv1alpha1.InterpreterOperationInterpretReplica:
return len(accessors.GetReplicaResourceLuaScript()) > 0
case configv1alpha1.InterpreterOperationInterpretStatus:
return len(accessors.GetStatusReflectionLuaScript()) > 0
case configv1alpha1.InterpreterOperationRetain:
return len(accessors.GetRetentionLuaScript()) > 0
case configv1alpha1.InterpreterOperationReviseReplica:
return len(accessors.GetReplicaRevisionLuaScript()) > 0
}
return false
_, exist := c.getInterpreter(kind, operationType)
return exist
}

// GetReplicas returns the desired replicas of the object as well as the requirements of each replica.
func (c *ConfigurableInterpreter) GetReplicas(object *unstructured.Unstructured) (int32, *workv1alpha2.ReplicaRequirements, error) {
klog.Infof("ConfigurableInterpreter Execute ReviseReplica")
customAccessor := c.configManager.LuaScriptAccessors()[object.GroupVersionKind()]
if len(customAccessor.GetReplicaResourceLuaScript()) == 0 {
return 0, nil, fmt.Errorf("customized interpreter operation GetReplicas for %q not found", object.GroupVersionKind())
func (c *ConfigurableInterpreter) GetReplicas(object *unstructured.Unstructured) (replicas int32, requires *workv1alpha2.ReplicaRequirements, enabled bool, err error) {
luaScript, enabled := c.getInterpreter(object.GroupVersionKind(), configv1alpha1.InterpreterOperationInterpretReplica)
if !enabled {
return
}

luaScript := customAccessor.GetReplicaResourceLuaScript
klog.Infof("lua script %s", luaScript)

return 0, nil, nil
vm := luavm.VM{UseOpenLibs: false}
replicas, requires, err = vm.GetReplicas(object, luaScript)
return
}

// ReviseReplica revises the replica of the given object.
func (c *ConfigurableInterpreter) ReviseReplica(object *unstructured.Unstructured, replica int64) (*unstructured.Unstructured, error) {
klog.Infof("ConfigurableInterpreter Execute ReviseReplica")
customAccessor := c.configManager.LuaScriptAccessors()[object.GroupVersionKind()]
if len(customAccessor.GetReplicaRevisionLuaScript()) == 0 {
return nil, fmt.Errorf("customized interpreter operation ReviseReplica for %q not found", object.GroupVersionKind())
func (c *ConfigurableInterpreter) ReviseReplica(object *unstructured.Unstructured, replica int64) (revised *unstructured.Unstructured, enabled bool, err error) {
luaScript, enabled := c.getInterpreter(object.GroupVersionKind(), configv1alpha1.InterpreterOperationReviseReplica)
if !enabled {
return
}

luaScript := customAccessor.GetReplicaRevisionLuaScript()
klog.Infof("lua script %s", luaScript)

return nil, nil
vm := luavm.VM{UseOpenLibs: false}
revised, err = vm.ReviseReplica(object, replica, luaScript)
return
}

// Retain returns the objects that based on the "desired" object but with values retained from the "observed" object.
func (c *ConfigurableInterpreter) Retain(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (retained *unstructured.Unstructured, err error) {
klog.Infof("ConfigurableInterpreter Execute Retain")
customAccessor := c.configManager.LuaScriptAccessors()[desired.GroupVersionKind()]
if len(customAccessor.GetRetentionLuaScript()) == 0 {
return nil, fmt.Errorf("customized interpreter operation Retain for %q not found", desired.GroupVersionKind())
func (c *ConfigurableInterpreter) Retain(desired *unstructured.Unstructured, observed *unstructured.Unstructured) (retained *unstructured.Unstructured, enabled bool, err error) {
luaScript, enabled := c.getInterpreter(desired.GroupVersionKind(), configv1alpha1.InterpreterOperationRetain)
if !enabled {
return
}

luaScript := customAccessor.GetRetentionLuaScript()
klog.Infof("lua script %s", luaScript)

return nil, err
vm := luavm.VM{UseOpenLibs: false}
retained, err = vm.Retain(desired, observed, luaScript)
return
}

// AggregateStatus returns the objects that based on the 'object' but with status aggregated.
func (c *ConfigurableInterpreter) AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
klog.Infof("ConfigurableInterpreter Execute AggregateStatus")
customAccessor := c.configManager.LuaScriptAccessors()[object.GroupVersionKind()]
if len(customAccessor.GetStatusAggregationLuaScript()) == 0 {
return nil, fmt.Errorf("customized interpreter AggregateStatus Retain for %q not found", object.GroupVersionKind())
func (c *ConfigurableInterpreter) AggregateStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (status *unstructured.Unstructured, enabled bool, err error) {
luaScript, enabled := c.getInterpreter(object.GroupVersionKind(), configv1alpha1.InterpreterOperationAggregateStatus)
if !enabled {
return
}

luaScript := customAccessor.GetStatusAggregationLuaScript()
klog.Infof("lua script %s", luaScript)

return nil, nil
vm := luavm.VM{UseOpenLibs: false}
status, err = vm.AggregateStatus(object, aggregatedStatusItems, luaScript)
return
}

// GetDependencies returns the dependent resources of the given object.
func (c *ConfigurableInterpreter) GetDependencies(object *unstructured.Unstructured) (dependencies []configv1alpha1.DependentObjectReference, err error) {
klog.Infof("ConfigurableInterpreter Execute GetDependencies")

customAccessor := c.configManager.LuaScriptAccessors()[object.GroupVersionKind()]
if len(customAccessor.GetDependencyInterpretationLuaScript()) == 0 {
return nil, fmt.Errorf("customized interpreter GetDependencies Retain for %q not found", object.GroupVersionKind())
func (c *ConfigurableInterpreter) GetDependencies(object *unstructured.Unstructured) (dependencies []configv1alpha1.DependentObjectReference, enabled bool, err error) {
luaScript, enabled := c.getInterpreter(object.GroupVersionKind(), configv1alpha1.InterpreterOperationInterpretDependency)
if !enabled {
return
}

luaScript := customAccessor.GetDependencyInterpretationLuaScript()
klog.Infof("lua script %s", luaScript)

return nil, err
vm := luavm.VM{UseOpenLibs: false}
dependencies, err = vm.GetDependencies(object, luaScript)
return
}

// ReflectStatus returns the status of the object.
func (c *ConfigurableInterpreter) ReflectStatus(object *unstructured.Unstructured) (status *runtime.RawExtension, err error) {
klog.Infof("ConfigurableInterpreter Execute ReflectStatus")
customAccessor := c.configManager.LuaScriptAccessors()[object.GroupVersionKind()]
if len(customAccessor.GetStatusAggregationLuaScript()) == 0 {
return nil, fmt.Errorf("customized interpreter GetDependencies Retain for %q not found", object.GroupVersionKind())
func (c *ConfigurableInterpreter) ReflectStatus(object *unstructured.Unstructured) (status *runtime.RawExtension, enabled bool, err error) {
luaScript, enabled := c.getInterpreter(object.GroupVersionKind(), configv1alpha1.InterpreterOperationInterpretStatus)
if !enabled {
return
}

luaScript := customAccessor.GetStatusAggregationLuaScript()
klog.Infof("lua script %s", luaScript)

return nil, err
vm := luavm.VM{UseOpenLibs: false}
status, err = vm.ReflectStatus(object, luaScript)
return
}

// InterpretHealth returns the health state of the object.
func (c *ConfigurableInterpreter) InterpretHealth(object *unstructured.Unstructured) (bool, error) {
klog.Infof("ConfigurableInterpreter Execute InterpretHealth")
customAccessor := c.configManager.LuaScriptAccessors()[object.GroupVersionKind()]
if len(customAccessor.GetHealthInterpretationLuaScript()) == 0 {
return false, fmt.Errorf("customized interpreter GetHealthInterpretation for %q not found", object.GroupVersionKind())
func (c *ConfigurableInterpreter) InterpretHealth(object *unstructured.Unstructured) (health bool, enabled bool, err error) {
luaScript, enabled := c.getInterpreter(object.GroupVersionKind(), configv1alpha1.InterpreterOperationInterpretHealth)
if !enabled {
return
}
vm := luavm.VM{UseOpenLibs: false}
health, err = vm.InterpretHealth(object, luaScript)
return
}

luaScript := customAccessor.GetHealthInterpretationLuaScript()
klog.Infof("lua script %s", luaScript)

return false, nil
func (c *ConfigurableInterpreter) getInterpreter(kind schema.GroupVersionKind, operationType configv1alpha1.InterpreterOperation) (string, bool) {
if !c.configManager.HasSynced() {
klog.Errorf("not yet ready to handle request")
return "", false
}
accessors, exist := c.configManager.LuaScriptAccessors()[kind]
if !exist {
return "", false
}
var script string
switch operationType {
case configv1alpha1.InterpreterOperationAggregateStatus:
script = accessors.GetStatusAggregationLuaScript()
case configv1alpha1.InterpreterOperationInterpretHealth:
script = accessors.GetHealthInterpretationLuaScript()
case configv1alpha1.InterpreterOperationInterpretDependency:
script = accessors.GetDependencyInterpretationLuaScript()
case configv1alpha1.InterpreterOperationInterpretReplica:
script = accessors.GetReplicaResourceLuaScript()
case configv1alpha1.InterpreterOperationInterpretStatus:
script = accessors.GetStatusReflectionLuaScript()
case configv1alpha1.InterpreterOperationRetain:
script = accessors.GetRetentionLuaScript()
case configv1alpha1.InterpreterOperationReviseReplica:
script = accessors.GetReplicaRevisionLuaScript()
}
return script, len(script) > 0
}
28 changes: 9 additions & 19 deletions pkg/resourceinterpreter/configurableinterpreter/luavm/lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (vm VM) Retain(desired *unstructured.Unstructured, observed *unstructured.U
}

// AggregateStatus returns the objects that based on the 'object' but with status aggregated by lua.
func (vm VM) AggregateStatus(object *unstructured.Unstructured, item []map[string]interface{}, script string) (*unstructured.Unstructured, error) {
func (vm VM) AggregateStatus(object *unstructured.Unstructured, items []workv1alpha2.AggregatedStatusItem, script string) (*unstructured.Unstructured, error) {
l := lua.NewState(lua.Options{
SkipOpenLibs: !vm.UseOpenLibs,
})
Expand Down Expand Up @@ -245,7 +245,7 @@ func (vm VM) AggregateStatus(object *unstructured.Unstructured, item []map[strin
if err != nil {
return nil, err
}
args[1], err = decodeValue(l, item)
args[1], err = decodeValue(l, items)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -344,7 +344,7 @@ func (vm VM) ReflectStatus(object *unstructured.Unstructured, script string) (st
if err != nil {
return
}
err = l.CallByParam(lua.P{Fn: f, NRet: 2, Protect: true}, args...)
err = l.CallByParam(lua.P{Fn: f, NRet: 1, Protect: true}, args...)
if err != nil {
return nil, err
}
Expand All @@ -354,26 +354,16 @@ func (vm VM) ReflectStatus(object *unstructured.Unstructured, script string) (st
return nil, fmt.Errorf("expect the returned replica type is table but got %s", luaStatusResult.Type())
}

luaExistResult := l.Get(l.GetTop())
var exist bool
exist, err = ConvertLuaResultToBool(luaExistResult)
resultMap := make(map[string]interface{})
jsonBytes, err := luajson.Encode(luaStatusResult)
if err != nil {
return nil, err
}

if exist {
resultMap := make(map[string]interface{})
jsonBytes, err := luajson.Encode(luaStatusResult)
if err != nil {
return nil, err
}
err = json.Unmarshal(jsonBytes, &resultMap)
if err != nil {
return nil, err
}
return helper.BuildStatusRawExtension(resultMap)
err = json.Unmarshal(jsonBytes, &resultMap)
if err != nil {
return nil, err
}
return nil, err
return helper.BuildStatusRawExtension(resultMap)
}

// GetDependencies returns the dependent resources of the given object by lua.
Expand Down
21 changes: 4 additions & 17 deletions pkg/resourceinterpreter/configurableinterpreter/luavm/lua_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

func TestGetReplicas(t *testing.T) {
var replicas int32 = 1
// quantity := *resource.NewQuantity(1000, resource.BinarySI)
vm := VM{UseOpenLibs: false}
tests := []struct {
name string
Expand Down Expand Up @@ -190,33 +189,21 @@ func TestAggregateDeploymentStatus(t *testing.T) {
oldObj, _ := helper.ToUnstructured(oldDeploy)
newObj, _ := helper.ToUnstructured(newDeploy)

var aggregateItem []map[string]interface{}
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
temp := make(map[string]interface{})
if err := json.Unmarshal(item.Status.Raw, &temp); err != nil {
t.Error(err.Error())
}
aggregateItem = append(aggregateItem, temp)
}

tests := []struct {
name string
curObj *unstructured.Unstructured
aggregatedStatusItems []map[string]interface{}
aggregatedStatusItems []workv1alpha2.AggregatedStatusItem
expectedObj *unstructured.Unstructured
luaScript string
}{
{
name: "Test AggregateDeploymentStatus",
curObj: oldObj,
aggregatedStatusItems: aggregateItem,
aggregatedStatusItems: aggregatedStatusItems,
expectedObj: newObj,
luaScript: `function AggregateStatus(desiredObj, statusItems)
for i = 1, #statusItems do
desiredObj.status.readyReplicas = desiredObj.status.readyReplicas + statusItems[i].readyReplicas
desiredObj.status.readyReplicas = desiredObj.status.readyReplicas + statusItems[i].status.readyReplicas
end
return desiredObj
end`,
Expand Down Expand Up @@ -399,7 +386,7 @@ func TestStatusReflection(t *testing.T) {
if observedObj.status == nil then
return false, nil
end
return true, observedObj.status
return observedObj.status
end`,
},
}
Expand Down
Loading

0 comments on commit 8c42e34

Please sign in to comment.