diff --git a/api/v2/workloadapi/workload_helper.go b/api/v2/workloadapi/workload_helper.go new file mode 100644 index 000000000..effabefae --- /dev/null +++ b/api/v2/workloadapi/workload_helper.go @@ -0,0 +1,27 @@ +/* + * Copyright 2024 The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package workloadapi + +// ResourceName returns the unique key of Workload. +func (x *Workload) ResourceName() string { + return x.Uid +} + +// ResourceName returns the unique key of Service. +func (x *Service) ResourceName() string { + return x.Namespace + "/" + x.Hostname +} diff --git a/daemon/manager/manager.go b/daemon/manager/manager.go index 9a7cd40e0..07839e6eb 100644 --- a/daemon/manager/manager.go +++ b/daemon/manager/manager.go @@ -83,7 +83,7 @@ func Execute(configs *options.BootstrapConfigs) error { log.Info("controller Start successful") defer c.Stop() - statusServer := status.NewServer(c, configs) + statusServer := status.NewServer(c.GetXdsClient(), configs) statusServer.StartServer() defer func() { _ = statusServer.StopServer() diff --git a/pkg/controller/workload/cache/service_cache.go b/pkg/controller/workload/cache/service_cache.go new file mode 100644 index 000000000..e9398c48c --- /dev/null +++ b/pkg/controller/workload/cache/service_cache.go @@ -0,0 +1,64 @@ +/* + * Copyright 2024 The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cache + +import ( + "sync" + + "kmesh.net/kmesh/api/v2/workloadapi" +) + +type ServiceCache interface { + List() []*workloadapi.Service + AddOrUpdateService(svc *workloadapi.Service) + DeleteService(resourceName string) +} + +type serviceCache struct { + mutex sync.RWMutex + // keyed by namespace/hostname->service + servicesByResourceName map[string]*workloadapi.Service +} + +func NewServiceCache() *serviceCache { + return &serviceCache{ + servicesByResourceName: make(map[string]*workloadapi.Service), + } +} + +func (s *serviceCache) AddOrUpdateService(svc *workloadapi.Service) { + s.mutex.Lock() + defer s.mutex.Unlock() + s.servicesByResourceName[svc.ResourceName()] = svc +} + +func (s *serviceCache) DeleteService(resourceName string) { + s.mutex.Lock() + defer s.mutex.Unlock() + delete(s.servicesByResourceName, resourceName) +} + +func (s *serviceCache) List() []*workloadapi.Service { + s.mutex.RLock() + defer s.mutex.RUnlock() + out := make([]*workloadapi.Service, 0, len(s.servicesByResourceName)) + for _, svc := range s.servicesByResourceName { + out = append(out, svc) + } + + return out +} diff --git a/pkg/controller/workload/workload_processor.go b/pkg/controller/workload/workload_processor.go index 868e8bf3d..dcc7d4fc2 100644 --- a/pkg/controller/workload/workload_processor.go +++ b/pkg/controller/workload/workload_processor.go @@ -55,6 +55,7 @@ type Processor struct { Sm *kmeshsecurity.SecretManager nodeName string WorkloadCache cache.WorkloadCache + ServiceCache cache.ServiceCache } type Endpoint struct { @@ -71,6 +72,7 @@ func newProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor { bpf: bpf.NewCache(workloadMap), nodeName: os.Getenv("NODE_NAME"), WorkloadCache: cache.NewWorkloadCache(), + ServiceCache: cache.NewServiceCache(), } } @@ -279,6 +281,7 @@ func (p *Processor) removeServiceResource(resources []string) error { ) for _, name := range resources { + p.ServiceCache.DeleteService(name) serviceId := p.hashName.StrToNum(name) skDelete.ServiceId = serviceId if err = p.bpf.ServiceLookup(&skDelete, &svDelete); err == nil { @@ -575,10 +578,8 @@ func (p *Processor) handleService(service *workloadapi.Service) error { sk = bpf.ServiceKey{} sv = bpf.ServiceValue{} ) - - NamespaceHostname := []string{service.GetNamespace(), service.GetHostname()} - serviceName := strings.Join(NamespaceHostname, "/") - + p.ServiceCache.AddOrUpdateService(service) + serviceName := service.ResourceName() serviceId := p.hashName.StrToNum(serviceName) sk.ServiceId = serviceId // if service has exist, just need update frontend port info diff --git a/pkg/status/api.go b/pkg/status/api.go new file mode 100644 index 000000000..8cbed740a --- /dev/null +++ b/pkg/status/api.go @@ -0,0 +1,152 @@ +/* + * Copyright 2024 The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package status + +import ( + "net" + + "kmesh.net/kmesh/api/v2/workloadapi" +) + +type Workload struct { + Uid string `json:"uid,omitempty"` + Addresses []string `json:"addresses"` + Waypoint *Waypoint `json:"waypoint,omitempty"` + Protocol string `json:"protocol"` + Name string `json:"name"` + Namespace string `json:"namespace"` + ServiceAccount string `json:"serviceAccount"` + WorkloadName string `json:"workloadName"` + WorkloadType string `json:"workloadType"` + CanonicalName string `json:"canonicalName"` + CanonicalRevision string `json:"canonicalRevision"` + ClusterID string `json:"clusterId"` + TrustDomain string `json:"trustDomain,omitempty"` + Locality Locality `json:"locality,omitempty"` + Node string `json:"node"` + Network string `json:"network,omitempty"` + Status string `json:"status"` + ApplicationTunnel ApplicationTunnel `json:"applicationTunnel,omitempty"` +} + +type Locality struct { + Region string `json:"region,omitempty"` + Zone string `json:"zone,omitempty"` + Subzone string `json:"subzone,omitempty"` +} + +type ApplicationTunnel struct { + Protocol string `json:"protocol"` + Port uint32 `json:"port,omitempty"` +} + +type Waypoint struct { + Destination string `json:"destination"` +} + +type LoadBalancer struct { + Mode string `json:"mode"` + RoutingPreferences []string `json:"routingPreferences"` +} + +type Service struct { + Name string `json:"name"` + Namespace string `json:"namespace"` + Hostname string `json:"hostname"` + Addresses []string `json:"vips"` + Ports []*workloadapi.Port `json:"ports"` + LoadBalancer *LoadBalancer `json:"loadBalancer"` + Waypoint *Waypoint `json:"waypoint"` +} + +type NetworkAddress struct { + // Network represents the network this address is on. + Network string + // Address presents the IP (v4 or v6). + Address net.IP +} + +func ConvertWorkload(w *workloadapi.Workload) *Workload { + ips := make([]string, 0, len(w.Addresses)) + for _, addr := range w.Addresses { + ips = append(ips, net.IP(addr).String()) + } + var waypoint string + if waypointAddress := w.Waypoint.GetAddress(); waypointAddress != nil { + waypoint = waypointAddress.Network + "/" + net.IP(waypointAddress.Address).String() + } else if host := w.Waypoint.GetHostname(); host != nil { + waypoint = host.Namespace + "/" + host.Hostname + } + + out := &Workload{ + Uid: w.Uid, + Addresses: ips, + Waypoint: &Waypoint{Destination: waypoint}, + Protocol: w.TunnelProtocol.String(), + Name: w.Name, + Namespace: w.Namespace, + ServiceAccount: w.ServiceAccount, + WorkloadName: w.WorkloadName, + WorkloadType: w.WorkloadType.String(), + CanonicalName: w.CanonicalName, + CanonicalRevision: w.CanonicalRevision, + ClusterID: w.ClusterId, + TrustDomain: w.TrustDomain, + Node: w.Node, + Network: w.Network, + Status: w.Status.String(), + } + if w.Locality != nil { + out.Locality = Locality{Region: w.Locality.Region, Zone: w.Locality.Zone, Subzone: w.Locality.Subzone} + } + if w.ApplicationTunnel != nil { + out.ApplicationTunnel = ApplicationTunnel{Protocol: w.ApplicationTunnel.Protocol.String(), Port: w.ApplicationTunnel.Port} + } + return out +} + +func ConvertService(s *workloadapi.Service) *Service { + vips := make([]string, 0, len(s.Addresses)) + for _, addr := range s.Addresses { + vips = append(vips, addr.Network+"/"+net.IP(addr.Address).String()) + } + var waypoint string + if waypointAddress := s.Waypoint.GetAddress(); waypointAddress != nil { + waypoint = waypointAddress.Network + "/" + net.IP(waypointAddress.Address).String() + } else if host := s.Waypoint.GetHostname(); host != nil { + waypoint = host.Namespace + "/" + host.Hostname + } + + out := &Service{ + Name: s.Name, + Namespace: s.Namespace, + Hostname: s.Hostname, + Addresses: vips, + Ports: s.Ports, + Waypoint: &Waypoint{Destination: waypoint}, + } + + if s.LoadBalancing != nil { + routingPreferences := make([]string, 0, len(s.LoadBalancing.RoutingPreference)) + for _, p := range s.LoadBalancing.RoutingPreference { + routingPreferences = append(routingPreferences, p.String()) + } + out.LoadBalancer = &LoadBalancer{Mode: s.LoadBalancing.Mode.String(), RoutingPreferences: routingPreferences} + } + + return out +} diff --git a/pkg/status/status_server.go b/pkg/status/status_server.go index 5a16a67f0..cdb08128e 100644 --- a/pkg/status/status_server.go +++ b/pkg/status/status_server.go @@ -25,9 +25,9 @@ import ( // nolint "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" adminv2 "kmesh.net/kmesh/api/v2/admin" + "kmesh.net/kmesh/api/v2/workloadapi/security" "kmesh.net/kmesh/daemon/options" "kmesh.net/kmesh/pkg/controller" "kmesh.net/kmesh/pkg/controller/ads" @@ -51,17 +51,17 @@ const ( ) type Server struct { - config *options.BootstrapConfigs - controller *controller.Controller - mux *http.ServeMux - server *http.Server + config *options.BootstrapConfigs + xdsClient *controller.XdsClient + mux *http.ServeMux + server *http.Server } -func NewServer(c *controller.Controller, configs *options.BootstrapConfigs) *Server { +func NewServer(c *controller.XdsClient, configs *options.BootstrapConfigs) *Server { s := &Server{ - config: configs, - controller: c, - mux: http.NewServeMux(), + config: configs, + xdsClient: c, + mux: http.NewServeMux(), } s.server = &http.Server{ Addr: adminAddr, @@ -75,6 +75,7 @@ func NewServer(c *controller.Controller, configs *options.BootstrapConfigs) *Ser s.mux.HandleFunc(patternBpfAdsMaps, s.bpfAdsMaps) s.mux.HandleFunc(patternConfigDumpAds, s.configDumpAds) s.mux.HandleFunc(patternConfigDumpWorkload, s.configDumpWorkload) + // TODO: add dump certificate, authorizationPolicies and services s.mux.HandleFunc(patternReadyProbe, s.readyProbe) @@ -108,7 +109,7 @@ func (s *Server) httpOptions(w http.ResponseWriter, r *http.Request) { } func (s *Server) bpfAdsMaps(w http.ResponseWriter, r *http.Request) { - client := s.controller.GetXdsClient() + client := s.xdsClient if client == nil || client.AdsController == nil { w.WriteHeader(http.StatusBadRequest) fmt.Fprintf(w, "\t%s\n", "invalid ClientMode") @@ -130,7 +131,7 @@ func (s *Server) bpfAdsMaps(w http.ResponseWriter, r *http.Request) { } func (s *Server) configDumpAds(w http.ResponseWriter, r *http.Request) { - client := s.controller.GetXdsClient() + client := s.xdsClient if client == nil || client.AdsController == nil { w.WriteHeader(http.StatusBadRequest) fmt.Fprintf(w, "\t%s\n", "invalid ClientMode") @@ -151,23 +152,40 @@ func (s *Server) configDumpAds(w http.ResponseWriter, r *http.Request) { })) } +type WorkloadDump struct { + Workloads []*Workload + Services []*Service + // TODO: add authorization + Policies []*security.Authorization +} + func (s *Server) configDumpWorkload(w http.ResponseWriter, r *http.Request) { - client := s.controller.GetXdsClient() + client := s.xdsClient if client == nil || client.WorkloadController == nil { w.WriteHeader(http.StatusBadRequest) fmt.Fprintf(w, "\t%s\n", "invalid ClientMode") return } - w.WriteHeader(http.StatusOK) workloads := client.WorkloadController.Processor.WorkloadCache.List() - data, _ := Marshal(workloads) - fmt.Fprintln(w, string(data)) + services := client.WorkloadController.Processor.ServiceCache.List() + workloadDump := WorkloadDump{ + Workloads: make([]*Workload, 0, len(workloads)), + Services: make([]*Service, 0, len(services)), + } + for _, w := range workloads { + workloadDump.Workloads = append(workloadDump.Workloads, ConvertWorkload(w)) + } + for _, s := range services { + workloadDump.Services = append(workloadDump.Services, ConvertService(s)) + } + printWorkloadDump(w, workloadDump) } func (s *Server) readyProbe(w http.ResponseWriter, r *http.Request) { // TODO: Add some components check w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) } func (s *Server) StartServer() { @@ -183,23 +201,13 @@ func (s *Server) StopServer() error { return s.server.Close() } -// Marshal marshals a slice of proto messages to json. -func Marshal[S ~[]E, E proto.Message](s S) ([]byte, error) { - raw := make([]json.RawMessage, len(s)) - for i, w := range s { - data, err := marshalWithIndent(w) - if err != nil { - return nil, err - } - raw[i] = data +func printWorkloadDump(w http.ResponseWriter, wd WorkloadDump) { + data, err := json.MarshalIndent(wd, "", " ") + if err != nil { + log.Errorf("Failed to marshal WorkloadDump: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return } - return json.MarshalIndent(raw, "", " ") -} - -// marshalWithIndent marshals a proto message with indent to json. -func marshalWithIndent(msg proto.Message) ([]byte, error) { - return protojson.MarshalOptions{ - Multiline: true, - Indent: " ", - }.Marshal(msg) + w.WriteHeader(http.StatusOK) + _, _ = w.Write(data) } diff --git a/pkg/status/status_server_test.go b/pkg/status/status_server_test.go new file mode 100644 index 000000000..5c61db532 --- /dev/null +++ b/pkg/status/status_server_test.go @@ -0,0 +1,130 @@ +/* + * Copyright 2024 The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package status + +import ( + "net/http" + "net/http/httptest" + "net/netip" + "testing" + + "istio.io/istio/pilot/test/util" + + "kmesh.net/kmesh/api/v2/workloadapi" + "kmesh.net/kmesh/pkg/controller" + "kmesh.net/kmesh/pkg/controller/workload" + "kmesh.net/kmesh/pkg/controller/workload/cache" +) + +func TestServer_configDumpWorkload(t *testing.T) { + w1 := &workloadapi.Workload{ + Uid: "cluster0//Pod/ns/name", + Namespace: "ns", + Name: "name", + Addresses: [][]byte{netip.AddrFrom4([4]byte{1, 2, 3, 4}).AsSlice()}, + Network: "testnetwork", + CanonicalName: "foo", + CanonicalRevision: "latest", + WorkloadType: workloadapi.WorkloadType_POD, + WorkloadName: "name", + Status: workloadapi.WorkloadStatus_HEALTHY, + ClusterId: "cluster0", + Services: map[string]*workloadapi.PortList{ + "ns/hostname": { + Ports: []*workloadapi.Port{ + { + ServicePort: 80, + TargetPort: 8080, + }, + { + ServicePort: 81, + TargetPort: 8180, + }, + { + ServicePort: 82, + TargetPort: 82, + }, + }, + }, + }, + Waypoint: &workloadapi.GatewayAddress{ + Destination: &workloadapi.GatewayAddress_Address{ + Address: &workloadapi.NetworkAddress{ + Network: "testnetwork", + Address: []byte{192, 168, 1, 10}, + }, + }, + }, + } + svc := &workloadapi.Service{ + Name: "svc", + Namespace: "ns", + Hostname: "hostname", + Ports: []*workloadapi.Port{ + { + ServicePort: 80, + TargetPort: 8080, + }, + { + ServicePort: 81, + TargetPort: 0, + }, + { + ServicePort: 82, + TargetPort: 0, + }, + }, + Waypoint: &workloadapi.GatewayAddress{ + Destination: &workloadapi.GatewayAddress_Address{ + Address: &workloadapi.NetworkAddress{ + Network: "testnetwork", + Address: []byte{192, 168, 1, 11}, + }, + }, + }} + fakeWorkloadCache := cache.NewWorkloadCache() + fakeServiceCache := cache.NewServiceCache() + fakeWorkloadCache.AddWorkload(w1) + fakeServiceCache.AddOrUpdateService(svc) + // Create a new instance of the Server struct + server := &Server{ + xdsClient: &controller.XdsClient{ + WorkloadController: &workload.Controller{ + Processor: &workload.Processor{ + WorkloadCache: fakeWorkloadCache, + ServiceCache: fakeServiceCache, + }, + }, + }, + } + + // Create a new HTTP request and response + req := httptest.NewRequest(http.MethodGet, "/configDumpWorkload", nil) + w := httptest.NewRecorder() + + // Call the configDumpWorkload function + server.configDumpWorkload(w, req) + + // Check the response status code + if w.Code != http.StatusOK { + t.Errorf("Expected status code %d, but got %d", http.StatusOK, w.Code) + } + + util.RefreshGoldenFile(t, w.Body.Bytes(), "./testdata/workload_configdump.json") + + util.CompareContent(t, w.Body.Bytes(), "./testdata/workload_configdump.json") +} diff --git a/pkg/status/testdata/workload_configdump.json b/pkg/status/testdata/workload_configdump.json new file mode 100644 index 000000000..a294c7a42 --- /dev/null +++ b/pkg/status/testdata/workload_configdump.json @@ -0,0 +1,54 @@ +{ + "Workloads": [ + { + "uid": "cluster0//Pod/ns/name", + "addresses": [ + "1.2.3.4" + ], + "waypoint": { + "destination": "testnetwork/192.168.1.10" + }, + "protocol": "NONE", + "name": "name", + "namespace": "ns", + "serviceAccount": "", + "workloadName": "name", + "workloadType": "POD", + "canonicalName": "foo", + "canonicalRevision": "latest", + "clusterId": "cluster0", + "locality": {}, + "node": "", + "network": "testnetwork", + "status": "HEALTHY", + "applicationTunnel": { + "protocol": "" + } + } + ], + "Services": [ + { + "name": "svc", + "namespace": "ns", + "hostname": "hostname", + "vips": [], + "ports": [ + { + "service_port": 80, + "target_port": 8080 + }, + { + "service_port": 81 + }, + { + "service_port": 82 + } + ], + "loadBalancer": null, + "waypoint": { + "destination": "testnetwork/192.168.1.11" + } + } + ], + "Policies": null +} \ No newline at end of file