Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cf task metrics #56

Merged
merged 2 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions collectors/collectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func NewCollector(
res.collectors = append(res.collectors, collector)
}

if filter.Enabled(filters.Tasks) {
collector := NewTasksCollector(namespace, environment, deployment)
res.collectors = append(res.collectors, collector)
}

if filter.Enabled(filters.Events) {
collector := NewEventsCollector(namespace, environment, deployment)
res.collectors = append(res.collectors, collector)
Expand Down
263 changes: 263 additions & 0 deletions collectors/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
package collectors

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/bosh-prometheus/cf_exporter/models"
)

type TasksCollector struct {
namespace string
environment string
deployment string
taskInfoMetric *prometheus.GaugeVec
tasksCountMetric *prometheus.GaugeVec
tasksMemoryMbSumMetric *prometheus.GaugeVec
tasksDiskQuotaMbSumMetric *prometheus.GaugeVec
tasksOldestCreatedAtMetric *prometheus.GaugeVec
tasksScrapesTotalMetric prometheus.Counter
tasksScrapeErrorsTotalMetric prometheus.Counter
lastTasksScrapeErrorMetric prometheus.Gauge
lastTasksScrapeTimestampMetric prometheus.Gauge
lastTasksScrapeDurationSecondsMetric prometheus.Gauge
}

func NewTasksCollector(
namespace string,
environment string,
deployment string,
) *TasksCollector {
taskInfoMetric := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "task",
Name: "info",
Help: "Labeled Cloud Foundry Task information with a constant '1' value.",
ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment},
},
[]string{"application_id", "state"},
)

tasksCountMetric := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "task",
Name: "count",
Help: "Number of Cloud Foundry Tasks.",
ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment},
},
[]string{"application_id", "state"},
)

tasksMemoryMbSumMetric := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "task",
Name: "memory_mb_sum",
Help: "Sum of Cloud Foundry Tasks Memory (Mb).",
ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment},
},
[]string{"application_id", "state"},
)

tasksDiskQuotaMbSumMetric := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "task",
Name: "disk_quota_mb_sum",
Help: "Sum of Cloud Foundry Tasks Disk Quota (Mb).",
ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment},
},
[]string{"application_id", "state"},
)

tasksOldestCreatedAtMetric := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "task",
Name: "oldest_created_at",
Help: "Number of seconds since 1970 of creation time of oldest Cloud Foundry task.",
ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment},
},
[]string{"application_id", "state"},
)

tasksScrapesTotalMetric := prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "tasks_scrapes",
Name: "total",
Help: "Total number of scrapes for Cloud Foundry Tasks.",
ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment},
},
)

tasksScrapeErrorsTotalMetric := prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "tasks_scrape_errors",
Name: "total",
Help: "Total number of scrape error of Cloud Foundry Tasks.",
ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment},
},
)

lastTasksScrapeErrorMetric := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "",
Name: "last_tasks_scrape_error",
Help: "Whether the last scrape of Tasks metrics from Cloud Foundry resulted in an error (1 for error, 0 for success).",
ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment},
},
)

lastTasksScrapeTimestampMetric := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "",
Name: "last_tasks_scrape_timestamp",
Help: "Number of seconds since 1970 since last scrape of Tasks metrics from Cloud Foundry.",
ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment},
},
)

lastTasksScrapeDurationSecondsMetric := prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "",
Name: "last_tasks_scrape_duration_seconds",
Help: "Duration of the last scrape of Tasks metrics from Cloud Foundry.",
ConstLabels: prometheus.Labels{"environment": environment, "deployment": deployment},
},
)

return &TasksCollector{
namespace: namespace,
environment: environment,
deployment: deployment,
taskInfoMetric: taskInfoMetric,
tasksCountMetric: tasksCountMetric,
tasksMemoryMbSumMetric: tasksMemoryMbSumMetric,
tasksDiskQuotaMbSumMetric: tasksDiskQuotaMbSumMetric,
tasksOldestCreatedAtMetric: tasksOldestCreatedAtMetric,
tasksScrapesTotalMetric: tasksScrapesTotalMetric,
tasksScrapeErrorsTotalMetric: tasksScrapeErrorsTotalMetric,
lastTasksScrapeErrorMetric: lastTasksScrapeErrorMetric,
lastTasksScrapeTimestampMetric: lastTasksScrapeTimestampMetric,
lastTasksScrapeDurationSecondsMetric: lastTasksScrapeDurationSecondsMetric,
}
}

func (c TasksCollector) Collect(objs *models.CFObjects, ch chan<- prometheus.Metric) {
errorMetric := float64(0)
if objs.Error != nil {
errorMetric = float64(1)
c.tasksScrapeErrorsTotalMetric.Inc()
} else {
c.reportTasksMetrics(objs, ch)
}

c.tasksScrapeErrorsTotalMetric.Collect(ch)
c.tasksScrapesTotalMetric.Inc()
c.tasksScrapesTotalMetric.Collect(ch)

c.lastTasksScrapeErrorMetric.Set(errorMetric)
c.lastTasksScrapeErrorMetric.Collect(ch)

c.lastTasksScrapeTimestampMetric.Set(float64(time.Now().Unix()))
c.lastTasksScrapeTimestampMetric.Collect(ch)
c.lastTasksScrapeDurationSecondsMetric.Set(objs.Took)

c.lastTasksScrapeDurationSecondsMetric.Collect(ch)
}

func (c TasksCollector) Describe(ch chan<- *prometheus.Desc) {
c.taskInfoMetric.Describe(ch)
c.tasksCountMetric.Describe(ch)
c.tasksMemoryMbSumMetric.Describe(ch)
c.tasksDiskQuotaMbSumMetric.Describe(ch)
c.tasksOldestCreatedAtMetric.Describe(ch)
c.tasksScrapesTotalMetric.Describe(ch)
c.tasksScrapeErrorsTotalMetric.Describe(ch)
c.lastTasksScrapeErrorMetric.Describe(ch)
c.lastTasksScrapeTimestampMetric.Describe(ch)
c.lastTasksScrapeDurationSecondsMetric.Describe(ch)
}

func (c TasksCollector) reportTasksMetrics(objs *models.CFObjects, ch chan<- prometheus.Metric) error {
c.taskInfoMetric.Reset()
c.tasksCountMetric.Reset()
c.tasksMemoryMbSumMetric.Reset()
c.tasksDiskQuotaMbSumMetric.Reset()
c.tasksOldestCreatedAtMetric.Reset()

type keyType struct {
application_id string
state string
}
groupedTasks := map[keyType][]*models.Task{}
for _, task := range objs.Tasks {
application_id := "unavailable"
if app, ok := task.Relationships["app"]; ok && app.GUID != "" {
application_id = app.GUID
}
key := keyType{application_id, string(task.State)}

existingValue, ok := groupedTasks[key]
if !ok {
existingValue = []*models.Task{}
}
groupedTasks[key] = append(existingValue, &task)
}

for key, tasks := range groupedTasks {
c.taskInfoMetric.WithLabelValues(
key.application_id,
key.state,
).Set(float64(1))

c.tasksCountMetric.WithLabelValues(
key.application_id,
key.state,
).Set(float64(len(tasks)))

memorySum := int64(0)
for _, task := range tasks {
memorySum += int64(task.MemoryInMb)
}
c.tasksMemoryMbSumMetric.WithLabelValues(
key.application_id,
key.state,
).Set(float64(memorySum))

diskSum := int64(0)
for _, task := range tasks {
diskSum += int64(task.DiskInMb)
}
c.tasksDiskQuotaMbSumMetric.WithLabelValues(
key.application_id,
key.state,
).Set(float64(diskSum))

createdAtOldest := time.Now()
for _, task := range tasks {
if task.CreatedAt.Before(createdAtOldest) {
createdAtOldest = task.CreatedAt
}
}
c.tasksOldestCreatedAtMetric.WithLabelValues(
key.application_id,
key.state,
).Set(float64(createdAtOldest.Unix()))
}

c.taskInfoMetric.Collect(ch)
c.tasksCountMetric.Collect(ch)
c.tasksMemoryMbSumMetric.Collect(ch)
c.tasksDiskQuotaMbSumMetric.Collect(ch)
c.tasksOldestCreatedAtMetric.Collect(ch)

return nil
}
5 changes: 5 additions & 0 deletions fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ var (
Key: ccv3.OrderBy,
Values: []string{"-created_at"},
}
TaskActiveStates = ccv3.Query{
Key: ccv3.StatesFilter,
Values: []string{"PENDING", "RUNNING", "CANCELING"},
}
)

type CFConfig struct {
Expand Down Expand Up @@ -67,6 +71,7 @@ func (c *Fetcher) workInit() {
c.worker.PushIf("security_groups", c.fetchSecurityGroups, filters.SecurityGroups)
c.worker.PushIf("stacks", c.fetchStacks, filters.Stacks)
c.worker.PushIf("buildpacks", c.fetchBuildpacks, filters.Buildpacks)
c.worker.PushIf("tasks", c.fetchTasks, filters.Tasks)
c.worker.PushIf("service_brokers", c.fetchServiceBrokers, filters.Services)
c.worker.PushIf("service_offerings", c.fetchServiceOfferings, filters.Services)
c.worker.PushIf("service_instances", c.fetchServiceInstances, filters.ServiceInstances)
Expand Down
8 changes: 8 additions & 0 deletions fetcher/fetcher_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ func (c *Fetcher) fetchBuildpacks(session *SessionExt, entry *models.CFObjects)
return err
}

func (c *Fetcher) fetchTasks(session *SessionExt, entry *models.CFObjects) error {
tasks, err := session.GetTasks()
if err == nil {
loadIndex(entry.Tasks, tasks, func(r models.Task) string { return r.GUID })
}
return err
}

func (c *Fetcher) fetchServiceBrokers(session *SessionExt, entry *models.CFObjects) error {
servicebrokers, _, err := session.V3().GetServiceBrokers(LargeQuery)
if err == nil {
Expand Down
11 changes: 11 additions & 0 deletions fetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var _ = Describe("Fetcher", func() {
"security_groups",
"stacks",
"buildpacks",
"tasks",
"service_brokers",
"service_offerings",
"service_instances",
Expand Down Expand Up @@ -120,6 +121,16 @@ var _ = Describe("Fetcher", func() {
})
})

When("tasks filter is set", func() {
BeforeEach(func() {
active = []string{ filters.Tasks }
expected = []string{ "info", "tasks" }
})
It("plans only specific jobs", func() {
Ω(jobs).Should(ConsistOf(expected))
})
})

When("isolationsegments filter is set", func() {
BeforeEach(func() {
active = []string{filters.IsolationSegments}
Expand Down
14 changes: 14 additions & 0 deletions fetcher/sessionext.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,20 @@ func (s SessionExt) GetApplications() ([]models.Application, error) {
return res, err
}

func (s SessionExt) GetTasks() ([]models.Task, error) {
res := []models.Task{}
_, _, err := s.V3().MakeListRequest(ccv3.RequestParams{
RequestName: "GetTasks",
Query: []ccv3.Query{LargeQuery, TaskActiveStates},
ResponseBody: models.Task{},
AppendToList: func(item interface{}) error {
res = append(res, item.(models.Task))
return nil
},
})
return res, err
}

func (s SessionExt) GetOrganizationQuotas() ([]models.Quota, error) {
res := []models.Quota{}
_, _, err := s.V3().MakeListRequest(ccv3.RequestParams{
Expand Down
27 changes: 27 additions & 0 deletions fetcher/sessionext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,33 @@ var _ = Describe("Extensions", func() {
})
})

Context("fetching tasks", func() {
It("no error occurs", func() {
server.AppendHandlers(
ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/v3/tasks", "per_page=5000&states=PENDING,RUNNING,CANCELING"),
ghttp.RespondWith(http.StatusOK, serializeList(
models.Task{
GUID: "guid1",
State: constant.TaskPending,
},
models.Task{
GUID: "guid2",
State: constant.TaskCanceling,
},
)),
),
)
objs, err := target.GetTasks()
Ω(err).ShouldNot(HaveOccurred())
Ω(objs).Should(HaveLen(2))
Ω(objs[0].GUID).Should(Equal("guid1"))
Ω(objs[0].State).Should(Equal(constant.TaskPending))
Ω(objs[1].GUID).Should(Equal("guid2"))
Ω(objs[1].State).Should(Equal(constant.TaskCanceling))
})
})

Context("fetching org quotas", func() {
It("no error occurs", func() {
server.AppendHandlers(
Expand Down
Loading