Skip to content

Commit

Permalink
fully remove demultiplexer dependency from the checks command
Browse files Browse the repository at this point in the history
  • Loading branch information
GustavoCaso committed Mar 6, 2025
1 parent ae14a2c commit c4692f9
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 61 deletions.
81 changes: 58 additions & 23 deletions comp/api/api/apiimpl/internal/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,26 @@ import (

"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
"github.com/DataDog/datadog-agent/comp/api/api/types"
"github.com/DataDog/datadog-agent/comp/collector/collector"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/autodiscoveryimpl"
pkgcollector "github.com/DataDog/datadog-agent/pkg/collector"
"github.com/DataDog/datadog-agent/pkg/collector/check"
"github.com/DataDog/datadog-agent/pkg/collector/check/stats"
"github.com/DataDog/datadog-agent/pkg/util/option"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/metrics"
)

// SetupHandlers adds the specific handlers for /check endpoints
func SetupHandlers(
r *mux.Router,
collector option.Option[collector.Component],
autodiscovery autodiscovery.Component,
demultiplexer demultiplexer.Component,
) *mux.Router {
r.HandleFunc("/", listChecks).Methods("GET")
r.HandleFunc("/{name}", listCheck).Methods("GET", "DELETE")
r.HandleFunc("/{name}/reload", reloadCheck).Methods("POST")
r.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
runChecks(collector, autodiscovery, demultiplexer, w, r)
runChecks(autodiscovery, demultiplexer, w, r)
}).Methods("POST")

return r
Expand All @@ -61,21 +60,8 @@ func listCheck(w http.ResponseWriter, _ *http.Request) {
w.Write([]byte("Not yet implemented."))
}

type memoryProfileConfig struct {
dir string
frames string
gc string
combine string
sort string
limit string
diff string
filters string
unit string
verbose string
}

// TODO (component): Use collector once it implement GetLoaderErrors
func runChecks(_ option.Option[collector.Component], autodiscovery autodiscovery.Component, _ demultiplexer.Component, w http.ResponseWriter, r *http.Request) {
func runChecks(autodiscovery autodiscovery.Component, demultiplexer demultiplexer.Component, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var checkRequest types.CheckRequest

Expand Down Expand Up @@ -179,23 +165,54 @@ func runChecks(_ option.Option[collector.Component], autodiscovery autodiscovery
var instancesData []*stats.Stats
result := types.CheckResponse{}
metadata := make(map[string]map[string]interface{})
// instanceMetadata := make(map[string]interface{})

for _, c := range cs {
s := runCheck(c, times, pause)

time.Sleep(time.Duration(delay) * time.Millisecond)

instancesData = append(instancesData, s)
metadata[string(c.ID())] = check.GetMetadata(c, false)
// instanceMetadata[c.String()] = c.GetInstanceMetadata()
}

agg := demultiplexer.Aggregator()
series, sketches := agg.GetSeriesAndSketches(time.Now())
serviceChecks := agg.GetServiceChecks()
events := agg.GetEvents()
eventsPlatformEvents := agg.GetEventPlatformEvents()
aggregatorData := types.AggregatorData{}

if len(series) != 0 {
aggregatorData.Series = series
}

if len(sketches) != 0 {
s := make([]*metrics.SketchSeries, 0, len(sketches))
for _, sketch := range sketches {
s = append(s, sketch)
}
aggregatorData.SketchSeries = s
}

if len(serviceChecks) != 0 {
aggregatorData.ServiceCheck = serviceChecks
}

if len(events) != 0 {
aggregatorData.Events = events
}

if len(eventsPlatformEvents) != 0 {
aggregatorData.EventPlatformEvents = toEpEvents(eventsPlatformEvents)
}

result.Results = instancesData
result.Metadata = metadata
// result.InstanceMetadata = instanceMetadata
result.AggregatorData = aggregatorData

instancesJSON, _ := json.Marshal(result)
checkResult, _ := json.Marshal(result)

w.Write(instancesJSON)
w.Write(checkResult)
}

func fetchCheckNameError(w http.ResponseWriter, checkName string) {
Expand Down Expand Up @@ -320,3 +337,21 @@ func runCheck(c check.Check, times int, pause int) *stats.Stats {

return s
}

// toEpEvents transforms the raw event platform messages to EventPlatformEvent which are better for json formatting
func toEpEvents(events map[string][]*message.Message) map[string][]types.EventPlatformEvent {
result := make(map[string][]types.EventPlatformEvent)
for eventType, messages := range events {
var events []types.EventPlatformEvent
for _, m := range messages {
e := types.EventPlatformEvent{EventType: eventType, RawEvent: string(m.GetContent())}
err := json.Unmarshal([]byte(e.RawEvent), &e.UnmarshalledEvent)
if err == nil {
e.RawEvent = ""
}
events = append(events, e)
}
result[eventType] = events
}
return result
}
2 changes: 1 addition & 1 deletion comp/api/api/apiimpl/server_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (server *apiServer) startCMDServer(
server.endpointProviders,
server.taggerComp,
)))
cmdMux.Handle("/check/", http.StripPrefix("/check", check.SetupHandlers(checkMux, server.collector, server.autoConfig, server.demultiplexer)))
cmdMux.Handle("/check/", http.StripPrefix("/check", check.SetupHandlers(checkMux, server.autoConfig, server.demultiplexer)))
cmdMux.Handle("/", gwmux)

// Add some observability in the API server
Expand Down
32 changes: 27 additions & 5 deletions comp/api/api/types/checkRequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,36 @@
// Package types provides types for the API package
package types

import "github.com/DataDog/datadog-agent/pkg/collector/check/stats"
import (
"github.com/DataDog/datadog-agent/pkg/collector/check/stats"
"github.com/DataDog/datadog-agent/pkg/metrics"
"github.com/DataDog/datadog-agent/pkg/metrics/event"
"github.com/DataDog/datadog-agent/pkg/metrics/servicecheck"
)

// EventPlatformEvent represents an event from the event platform
type EventPlatformEvent struct {
RawEvent string `json:",omitempty"`
EventType string
UnmarshalledEvent map[string]interface{} `json:",omitempty"`
}

// AggregatorData represents the data from the aggregator
type AggregatorData struct {
Series metrics.Series `json:"series"`
SketchSeries []*metrics.SketchSeries `json:"sketch_series"`
ServiceCheck servicecheck.ServiceChecks `json:"service_check"`
Events event.Events `json:"events"`
EventPlatformEvents map[string][]EventPlatformEvent `json:"event_platform_events"`
}

// CheckResponse represents the response of a check
type CheckResponse struct {
Results []*stats.Stats `json:"results"`
Errors []string `json:"errors"`
Warnings []string `json:"warnings"`
Metadata map[string]map[string]interface{} `json:"metadata"`
Results []*stats.Stats `json:"results"`
Errors []string `json:"errors"`
Warnings []string `json:"warnings"`
Metadata map[string]map[string]interface{} `json:"metadata"`
AggregatorData AggregatorData `json:"aggregator_data"`
}

// MemoryProfileConfig represents the configuration for memory profiling
Expand Down
130 changes: 98 additions & 32 deletions pkg/cli/subcommands/check/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,24 @@ import (

"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/olekukonko/tablewriter"
"github.com/spf13/cobra"
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer"
"github.com/DataDog/datadog-agent/comp/aggregator/demultiplexer/demultiplexerimpl"
apiTypes "github.com/DataDog/datadog-agent/comp/api/api/types"
"github.com/DataDog/datadog-agent/comp/api/authtoken"
authtokenimpl "github.com/DataDog/datadog-agent/comp/api/authtoken/createandfetchimpl"
"github.com/DataDog/datadog-agent/comp/core"
"github.com/DataDog/datadog-agent/comp/core/config"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/secrets"
"github.com/DataDog/datadog-agent/comp/core/sysprobeconfig/sysprobeconfigimpl"
"github.com/DataDog/datadog-agent/comp/forwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl"
orchestratorForwarderImpl "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorimpl"
haagentfx "github.com/DataDog/datadog-agent/comp/haagent/fx"
"github.com/DataDog/datadog-agent/comp/metadata/inventorychecks/inventorychecksimpl"
logscompression "github.com/DataDog/datadog-agent/comp/serializer/logscompression/fx"
metricscompression "github.com/DataDog/datadog-agent/comp/serializer/metricscompression/fx"
"github.com/DataDog/datadog-agent/pkg/aggregator"
"github.com/DataDog/datadog-agent/pkg/api/util"
"github.com/DataDog/datadog-agent/pkg/cli/standalone"
"github.com/DataDog/datadog-agent/pkg/collector/check/stats"
"github.com/DataDog/datadog-agent/pkg/config/model"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/serializer"
"github.com/DataDog/datadog-agent/pkg/util/defaultpaths"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/scrubber"
Expand Down Expand Up @@ -123,9 +113,6 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command {
cliParams.cmd = cmd
cliParams.args = args

eventplatforParams := eventplatformimpl.NewDefaultParams()
eventplatforParams.UseNoopEventPlatformForwarder = true

disableCmdPort()
return fxutil.OneShot(run,
fx.Supply(cliParams),
Expand All @@ -137,17 +124,6 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command {
core.Bundle(),
authtokenimpl.Module(),
fx.Supply(context.Background()),
forwarder.Bundle(defaultforwarder.NewParams(defaultforwarder.WithNoopForwarder())),
logscompression.Module(),
metricscompression.Module(),
fx.Provide(func() serializer.MetricSerializer { return nil }),
// Initializing the aggregator with a flush interval of 0 (to disable the flush goroutines)
demultiplexerimpl.Module(demultiplexerimpl.NewDefaultParams(demultiplexerimpl.WithFlushInterval(0))),
orchestratorForwarderImpl.Module(orchestratorForwarderImpl.NewNoopParams()),
eventplatformimpl.Module(eventplatforParams),
eventplatformreceiverimpl.Module(),
haagentfx.Module(),

getPlatformModules(),
)
},
Expand Down Expand Up @@ -190,7 +166,7 @@ func MakeCommand(globalParamsGetter func() GlobalParams) *cobra.Command {
func run(
config config.Component,
cliParams *cliParams,
demultiplexer demultiplexer.Component,
_ authtoken.Component,
) error {
previousIntegrationTracing := false
previousIntegrationTracingExhaustive := false
Expand Down Expand Up @@ -307,7 +283,6 @@ func run(

var checkFileOutput bytes.Buffer
var instancesData []interface{}
printer := aggregator.AgentDemultiplexerPrinter{DemultiplexerWithAggregator: demultiplexer}

for _, c := range result.Results {
inventoryData := map[string]interface{}{}
Expand All @@ -319,11 +294,9 @@ func run(
}

if cliParams.formatJSON {
aggregatorData := printer.GetMetricsDataForPrint()

// There is only one checkID per run so we'll just access that
instanceData := map[string]interface{}{
"aggregator": aggregatorData,
"aggregator": result.AggregatorData,
"runner": c,
"inventory": inventoryData,
}
Expand Down Expand Up @@ -382,7 +355,7 @@ func run(
return fmt.Errorf("no diff data found in %s", profileDataDir)
}
} else {
printer.PrintMetrics(&checkFileOutput, cliParams.formatTable)
printMetrics(result.AggregatorData, &checkFileOutput, cliParams.formatTable)

p := func(data string) {
fmt.Println(data)
Expand Down Expand Up @@ -656,6 +629,99 @@ func createHiddenBooleanFlag(cmd *cobra.Command, p *bool, name string, value boo
cmd.Flags().MarkHidden(name) //nolint:errcheck
}

func printMetrics(aggregatorData apiTypes.AggregatorData, checkFileOutput *bytes.Buffer, formatTable bool) {
if len(aggregatorData.Series) != 0 {
fmt.Fprintf(color.Output, "=== %s ===\n", color.BlueString("Series"))

if formatTable {
headers, data := aggregatorData.Series.MarshalStrings()
var buffer bytes.Buffer

// plain table with no borders
table := tablewriter.NewWriter(&buffer)
table.SetHeader(headers)
table.SetAutoWrapText(false)
table.SetAutoFormatHeaders(true)
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
table.SetAlignment(tablewriter.ALIGN_LEFT)
table.SetCenterSeparator("")
table.SetColumnSeparator("")
table.SetRowSeparator("")
table.SetHeaderLine(false)
table.SetBorder(false)
table.SetTablePadding("\t")

table.AppendBulk(data)
table.Render()
fmt.Println(buffer.String())
checkFileOutput.WriteString(buffer.String() + "\n")
} else {
j, _ := json.MarshalIndent(aggregatorData.Series, "", " ")
fmt.Println(string(j))
checkFileOutput.WriteString(string(j) + "\n")
}
}
if len(aggregatorData.SketchSeries) != 0 {
fmt.Fprintf(color.Output, "=== %s ===\n", color.BlueString("Sketches"))
j, _ := json.MarshalIndent(aggregatorData.SketchSeries, "", " ")
fmt.Println(string(j))
checkFileOutput.WriteString(string(j) + "\n")
}

if len(aggregatorData.ServiceCheck) != 0 {
fmt.Fprintf(color.Output, "=== %s ===\n", color.BlueString("Service Checks"))

if formatTable {
headers, data := aggregatorData.ServiceCheck.MarshalStrings()
var buffer bytes.Buffer

// plain table with no borders
table := tablewriter.NewWriter(&buffer)
table.SetHeader(headers)
table.SetAutoWrapText(false)
table.SetAutoFormatHeaders(true)
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
table.SetAlignment(tablewriter.ALIGN_LEFT)
table.SetCenterSeparator("")
table.SetColumnSeparator("")
table.SetRowSeparator("")
table.SetHeaderLine(false)
table.SetBorder(false)
table.SetTablePadding("\t")

table.AppendBulk(data)
table.Render()
fmt.Println(buffer.String())
checkFileOutput.WriteString(buffer.String() + "\n")
} else {
j, _ := json.MarshalIndent(aggregatorData.ServiceCheck, "", " ")
fmt.Println(string(j))
checkFileOutput.WriteString(string(j) + "\n")
}
}

if len(aggregatorData.Events) != 0 {
fmt.Fprintf(color.Output, "=== %s ===\n", color.BlueString("Events"))
checkFileOutput.WriteString("=== Events ===\n")
j, _ := json.MarshalIndent(aggregatorData.Events, "", " ")
fmt.Println(string(j))
checkFileOutput.WriteString(string(j) + "\n")
}

for k, v := range aggregatorData.EventPlatformEvents {
if len(v) > 0 {
if translated, ok := stats.EventPlatformNameTranslations[k]; ok {
k = translated
}
fmt.Fprintf(color.Output, "=== %s ===\n", color.BlueString(k))
checkFileOutput.WriteString(fmt.Sprintf("=== %s ===\n", k))
j, _ := json.MarshalIndent(v, "", " ")
fmt.Println(string(j))
checkFileOutput.WriteString(string(j) + "\n")
}
}
}

// disableCmdPort overrrides the `cmd_port` configuration so that when the
// server starts up, it does not do so on the same port as a running agent.
//
Expand Down

0 comments on commit c4692f9

Please sign in to comment.