Skip to content

Commit

Permalink
add cf task metrics
Browse files Browse the repository at this point in the history
pre-aggregated to avoid exposing any transient, and therefore
high-cardinality, labels

oldest_created_at allows long-running tasks to be spotted while
still avoiding the high-cardinality labels that would be needed
to identify individual tasks

disabled by default for its initial introduction to avoid
nasty surprises
  • Loading branch information
risicle committed Jun 9, 2023
1 parent d60a84e commit 4bc6841
Show file tree
Hide file tree
Showing 11 changed files with 351 additions and 1 deletion.
5 changes: 5 additions & 0 deletions collectors/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func NewCollector(
res.collectors = append(res.collectors, collector)
}

if filter.Enabled(filters.Tasks) {
collector := NewTasksCollector(namespace, environment, deployment)
res.collectors = append(res.collectors, collector)
}

if filter.Enabled(filters.Events) {
collector := NewEventsCollector(namespace, environment, deployment)
res.collectors = append(res.collectors, collector)
Expand Down
263 changes: 263 additions & 0 deletions collectors/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
package collectors

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/bosh-prometheus/cf_exporter/models"
)

// TasksCollector holds the Prometheus metrics describing Cloud Foundry tasks,
// plus the bookkeeping metrics describing the tasks scrape itself.
type TasksCollector struct {
	// Values the collector was constructed with.
	namespace, environment, deployment string

	// Task gauges, each labelled by application_id and state.
	taskInfoMetric             *prometheus.GaugeVec
	tasksCountMetric           *prometheus.GaugeVec
	tasksMemoryMbSumMetric     *prometheus.GaugeVec
	tasksDiskQuotaMbSumMetric  *prometheus.GaugeVec
	tasksOldestCreatedAtMetric *prometheus.GaugeVec

	// Scrape counters.
	tasksScrapesTotalMetric, tasksScrapeErrorsTotalMetric prometheus.Counter

	// Gauges describing the most recent scrape.
	lastTasksScrapeErrorMetric           prometheus.Gauge
	lastTasksScrapeTimestampMetric       prometheus.Gauge
	lastTasksScrapeDurationSecondsMetric prometheus.Gauge
}

// NewTasksCollector creates a TasksCollector whose metrics all live under
// namespace and carry environment and deployment as constant labels.
//
// The task gauges use the "task" subsystem and are labelled by application_id
// and state; the scrape counters and last-scrape gauges carry no variable
// labels. None of the metrics is registered here.
func NewTasksCollector(
	namespace string,
	environment string,
	deployment string,
) *TasksCollector {
	// A fresh label map per metric, exactly as if each were written out inline.
	constLabels := func() prometheus.Labels {
		return prometheus.Labels{"environment": environment, "deployment": deployment}
	}

	// taskGauge builds one of the per-(application_id, state) task gauges.
	taskGauge := func(name, help string) *prometheus.GaugeVec {
		opts := prometheus.GaugeOpts{
			Namespace:   namespace,
			Subsystem:   "task",
			Name:        name,
			Help:        help,
			ConstLabels: constLabels(),
		}
		return prometheus.NewGaugeVec(opts, []string{"application_id", "state"})
	}

	// scrapeCounter builds a "<subsystem>_total" scrape counter.
	scrapeCounter := func(subsystem, help string) prometheus.Counter {
		return prometheus.NewCounter(prometheus.CounterOpts{
			Namespace:   namespace,
			Subsystem:   subsystem,
			Name:        "total",
			Help:        help,
			ConstLabels: constLabels(),
		})
	}

	// lastScrapeGauge builds a subsystem-less gauge describing the last scrape.
	lastScrapeGauge := func(name, help string) prometheus.Gauge {
		return prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace:   namespace,
			Subsystem:   "",
			Name:        name,
			Help:        help,
			ConstLabels: constLabels(),
		})
	}

	return &TasksCollector{
		namespace:   namespace,
		environment: environment,
		deployment:  deployment,

		taskInfoMetric: taskGauge(
			"info",
			"Labeled Cloud Foundry Task information with a constant '1' value.",
		),
		tasksCountMetric: taskGauge(
			"count",
			"Number of Cloud Foundry Tasks.",
		),
		tasksMemoryMbSumMetric: taskGauge(
			"memory_mb_sum",
			"Sum of Cloud Foundry Tasks Memory (Mb).",
		),
		tasksDiskQuotaMbSumMetric: taskGauge(
			"disk_quota_mb_sum",
			"Sum of Cloud Foundry Tasks Disk Quota (Mb).",
		),
		tasksOldestCreatedAtMetric: taskGauge(
			"oldest_created_at",
			"Number of seconds since 1970 of creation time of oldest Cloud Foundry task.",
		),

		tasksScrapesTotalMetric: scrapeCounter(
			"tasks_scrapes",
			"Total number of scrapes for Cloud Foundry Tasks.",
		),
		tasksScrapeErrorsTotalMetric: scrapeCounter(
			"tasks_scrape_errors",
			"Total number of scrape error of Cloud Foundry Tasks.",
		),

		lastTasksScrapeErrorMetric: lastScrapeGauge(
			"last_tasks_scrape_error",
			"Whether the last scrape of Tasks metrics from Cloud Foundry resulted in an error (1 for error, 0 for success).",
		),
		lastTasksScrapeTimestampMetric: lastScrapeGauge(
			"last_tasks_scrape_timestamp",
			"Number of seconds since 1970 since last scrape of Tasks metrics from Cloud Foundry.",
		),
		lastTasksScrapeDurationSecondsMetric: lastScrapeGauge(
			"last_tasks_scrape_duration_seconds",
			"Duration of the last scrape of Tasks metrics from Cloud Foundry.",
		),
	}
}

// Collect sends the tasks metrics for the scrape result objs to ch.
//
// When objs.Error is set, the scrape-error counter is incremented and no task
// gauges are emitted; otherwise the task gauges are rebuilt from objs and
// emitted first. In both cases the scrape counters and the last-scrape gauges
// (error flag, timestamp, duration taken from objs.Took) are updated and sent.
func (c TasksCollector) Collect(objs *models.CFObjects, ch chan<- prometheus.Metric) {
	var lastScrapeError float64
	switch {
	case objs.Error != nil:
		lastScrapeError = 1
		c.tasksScrapeErrorsTotalMetric.Inc()
	default:
		// The returned error is discarded, as it always has been.
		_ = c.reportTasksMetrics(objs, ch)
	}
	c.tasksScrapeErrorsTotalMetric.Collect(ch)

	c.tasksScrapesTotalMetric.Inc()
	c.tasksScrapesTotalMetric.Collect(ch)

	c.lastTasksScrapeErrorMetric.Set(lastScrapeError)
	c.lastTasksScrapeErrorMetric.Collect(ch)

	scrapedAt := time.Now().Unix()
	c.lastTasksScrapeTimestampMetric.Set(float64(scrapedAt))
	c.lastTasksScrapeTimestampMetric.Collect(ch)

	c.lastTasksScrapeDurationSecondsMetric.Set(objs.Took)
	c.lastTasksScrapeDurationSecondsMetric.Collect(ch)
}

// Describe sends the descriptors of every metric held by the collector to ch,
// task gauges first, then the scrape counters and last-scrape gauges.
func (c TasksCollector) Describe(ch chan<- *prometheus.Desc) {
	metrics := []prometheus.Collector{
		c.taskInfoMetric,
		c.tasksCountMetric,
		c.tasksMemoryMbSumMetric,
		c.tasksDiskQuotaMbSumMetric,
		c.tasksOldestCreatedAtMetric,
		c.tasksScrapesTotalMetric,
		c.tasksScrapeErrorsTotalMetric,
		c.lastTasksScrapeErrorMetric,
		c.lastTasksScrapeTimestampMetric,
		c.lastTasksScrapeDurationSecondsMetric,
	}
	for _, metric := range metrics {
		metric.Describe(ch)
	}
}

// reportTasksMetrics rebuilds the task gauges from objs.Tasks and sends them
// to ch.
//
// Tasks are grouped by (application_id, state), where application_id is the
// GUID of the task's "app" relationship, or "unavailable" when that
// relationship is missing or has an empty GUID. For each group it reports a
// constant-1 info gauge, the task count, the summed memory and disk sizes in
// Mb, and the Unix timestamp of the oldest creation time.
//
// The gauges are reset first, so groups absent from this scrape disappear.
// The function always returns nil.
func (c TasksCollector) reportTasksMetrics(objs *models.CFObjects, ch chan<- prometheus.Metric) error {
	c.taskInfoMetric.Reset()
	c.tasksCountMetric.Reset()
	c.tasksMemoryMbSumMetric.Reset()
	c.tasksDiskQuotaMbSumMetric.Reset()
	c.tasksOldestCreatedAtMetric.Reset()

	type groupKey struct {
		applicationID string
		state         string
	}
	// groupTotals holds the running aggregates for one group. Aggregating
	// while iterating avoids keeping pointers to the range variable, which
	// before Go 1.22 is a single variable shared by all iterations: storing
	// its address made every stored pointer refer to the last task visited.
	type groupTotals struct {
		count           int
		memoryMbSum     int64
		diskMbSum       int64
		oldestCreatedAt time.Time
	}

	// The oldest creation time of each group is seeded with the current
	// time, so a creation time in the future is never reported.
	now := time.Now()

	groups := map[groupKey]*groupTotals{}
	for _, task := range objs.Tasks {
		applicationID := "unavailable"
		if app, ok := task.Relationships["app"]; ok && app.GUID != "" {
			applicationID = app.GUID
		}
		key := groupKey{applicationID, string(task.State)}

		totals, ok := groups[key]
		if !ok {
			totals = &groupTotals{oldestCreatedAt: now}
			groups[key] = totals
		}

		totals.count++
		totals.memoryMbSum += int64(task.MemoryInMb)
		totals.diskMbSum += int64(task.DiskInMb)
		if task.CreatedAt.Before(totals.oldestCreatedAt) {
			totals.oldestCreatedAt = task.CreatedAt
		}
	}

	for key, totals := range groups {
		c.taskInfoMetric.WithLabelValues(
			key.applicationID,
			key.state,
		).Set(float64(1))

		c.tasksCountMetric.WithLabelValues(
			key.applicationID,
			key.state,
		).Set(float64(totals.count))

		c.tasksMemoryMbSumMetric.WithLabelValues(
			key.applicationID,
			key.state,
		).Set(float64(totals.memoryMbSum))

		c.tasksDiskQuotaMbSumMetric.WithLabelValues(
			key.applicationID,
			key.state,
		).Set(float64(totals.diskMbSum))

		c.tasksOldestCreatedAtMetric.WithLabelValues(
			key.applicationID,
			key.state,
		).Set(float64(totals.oldestCreatedAt.Unix()))
	}

	c.taskInfoMetric.Collect(ch)
	c.tasksCountMetric.Collect(ch)
	c.tasksMemoryMbSumMetric.Collect(ch)
	c.tasksDiskQuotaMbSumMetric.Collect(ch)
	c.tasksOldestCreatedAtMetric.Collect(ch)

	return nil
}
5 changes: 5 additions & 0 deletions fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ var (
Key: ccv3.OrderBy,
Values: []string{"-created_at"},
}
TaskActiveStates = ccv3.Query{
Key: ccv3.StatesFilter,
Values: []string{"PENDING", "RUNNING", "CANCELING"},
}
)

type CFConfig struct {
Expand Down Expand Up @@ -67,6 +71,7 @@ func (c *Fetcher) workInit() {
c.worker.PushIf("security_groups", c.fetchSecurityGroups, filters.SecurityGroups)
c.worker.PushIf("stacks", c.fetchStacks, filters.Stacks)
c.worker.PushIf("buildpacks", c.fetchBuildpacks, filters.Buildpacks)
c.worker.PushIf("tasks", c.fetchTasks, filters.Tasks)
c.worker.PushIf("service_brokers", c.fetchServiceBrokers, filters.Services)
c.worker.PushIf("service_offerings", c.fetchServiceOfferings, filters.Services)
c.worker.PushIf("service_instances", c.fetchServiceInstances, filters.ServiceInstances)
Expand Down
8 changes: 8 additions & 0 deletions fetcher/fetcher_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ func (c *Fetcher) fetchBuildpacks(session *SessionExt, entry *models.CFObjects)
return err
}

// fetchTasks retrieves tasks through session.GetTasks and, on success, passes
// them to loadIndex together with entry.Tasks and a key function returning
// each task's GUID. A fetch error is returned unchanged and leaves
// entry.Tasks untouched.
func (c *Fetcher) fetchTasks(session *SessionExt, entry *models.CFObjects) error {
	tasks, err := session.GetTasks()
	if err != nil {
		return err
	}

	guidOf := func(task models.Task) string { return task.GUID }
	loadIndex(entry.Tasks, tasks, guidOf)
	return nil
}

func (c *Fetcher) fetchServiceBrokers(session *SessionExt, entry *models.CFObjects) error {
servicebrokers, _, err := session.V3().GetServiceBrokers(LargeQuery)
if err == nil {
Expand Down
11 changes: 11 additions & 0 deletions fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var _ = Describe("Fetcher", func() {
"security_groups",
"stacks",
"buildpacks",
"tasks",
"service_brokers",
"service_offerings",
"service_instances",
Expand Down Expand Up @@ -120,6 +121,16 @@ var _ = Describe("Fetcher", func() {
})
})

// With only the tasks filter active, the planned jobs are expected to be
// exactly "info" and "tasks".
When("tasks filter is set", func() {
	BeforeEach(func() {
		active = []string{filters.Tasks}
		expected = []string{"info", "tasks"}
	})
	It("plans only specific jobs", func() {
		Ω(jobs).Should(ConsistOf(expected))
	})
})

When("isolationsegments filter is set", func() {
BeforeEach(func() {
active = []string{filters.IsolationSegments}
Expand Down
14 changes: 14 additions & 0 deletions fetcher/sessionext.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ func (s SessionExt) GetApplications() ([]models.Application, error) {
return res, err
}

// GetTasks issues a V3 "GetTasks" list request with the LargeQuery and
// TaskActiveStates query parameters and returns the decoded tasks.
//
// The returned slice is never nil. On error it holds whatever items were
// appended before the request failed, alongside the error.
func (s SessionExt) GetTasks() ([]models.Task, error) {
	tasks := make([]models.Task, 0)

	// Called for each decoded item; items arrive as models.Task values,
	// matching ResponseBody below.
	appendTask := func(item interface{}) error {
		tasks = append(tasks, item.(models.Task))
		return nil
	}

	params := ccv3.RequestParams{
		RequestName:  "GetTasks",
		Query:        []ccv3.Query{LargeQuery, TaskActiveStates},
		ResponseBody: models.Task{},
		AppendToList: appendTask,
	}
	_, _, err := s.V3().MakeListRequest(params)
	return tasks, err
}

func (s SessionExt) GetOrganizationQuotas() ([]models.Quota, error) {
res := []models.Quota{}
_, _, err := s.V3().MakeListRequest(ccv3.RequestParams{
Expand Down
27 changes: 27 additions & 0 deletions fetcher/sessionext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,33 @@ var _ = Describe("Extensions", func() {
})
})

// GetTasks must query /v3/tasks for the active states only and return the
// tasks from the response in order.
Context("fetching tasks", func() {
	It("no error occurs", func() {
		pendingTask := models.Task{
			GUID:  "guid1",
			State: constant.TaskPending,
		}
		cancelingTask := models.Task{
			GUID:  "guid2",
			State: constant.TaskCanceling,
		}
		handler := ghttp.CombineHandlers(
			ghttp.VerifyRequest("GET", "/v3/tasks", "per_page=5000&states=PENDING,RUNNING,CANCELING"),
			ghttp.RespondWith(http.StatusOK, serializeList(pendingTask, cancelingTask)),
		)
		server.AppendHandlers(handler)

		objs, err := target.GetTasks()

		Ω(err).ShouldNot(HaveOccurred())
		Ω(objs).Should(HaveLen(2))

		first, second := objs[0], objs[1]
		Ω(first.GUID).Should(Equal("guid1"))
		Ω(first.State).Should(Equal(constant.TaskPending))
		Ω(second.GUID).Should(Equal("guid2"))
		Ω(second.State).Should(Equal(constant.TaskCanceling))
	})
})

Context("fetching org quotas", func() {
It("no error occurs", func() {
server.AppendHandlers(
Expand Down
Loading

0 comments on commit 4bc6841

Please sign in to comment.