Add taskrun/pipelinerun gauge metrics for wait times around resolving respective task/pipelines

This commit adds new experimental gauge metrics that count the number of TaskRuns waiting for resolution of any Tasks they reference,
the number of PipelineRuns waiting on Pipeline resolution, and the number of PipelineRuns waiting on Task resolution
for their underlying TaskRuns.
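
For context, the new gauges follow the same OpenCensus pattern the diffs below implement: define a dimensionless `Float64` measure, register a view with a `LastValue` aggregation, and periodically record the current count. The following is a minimal, self-contained sketch of that pattern; the metric name and recorded value are illustrative only, not the controller's actual identifiers.

```go
package main

import (
	"context"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

// Illustrative measure only; the controller's real measures live in
// pkg/pipelinerunmetrics and pkg/taskrunmetrics.
var waitingCount = stats.Float64(
	"example_waiting_on_resolution_count",
	"Number of runs currently waiting on resolution (example).",
	stats.UnitDimensionless)

func main() {
	// A LastValue view turns the measure into a gauge: each Record call
	// replaces the previously exported value.
	v := &view.View{
		Description: waitingCount.Description(),
		Measure:     waitingCount,
		Aggregation: view.LastValue(),
	}
	if err := view.Register(v); err != nil {
		log.Fatalf("registering view: %v", err)
	}
	defer view.Unregister(v)

	// In the controller this happens on a reporting loop, after counting
	// the runs that are still waiting on resolution.
	stats.Record(context.Background(), waitingCount.M(3))
}
```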
gabemontero committed Sep 6, 2023
1 parent 55e0450 commit 423b81d
Showing 5 changed files with 173 additions and 23 deletions.
27 changes: 15 additions & 12 deletions docs/metrics.md
@@ -11,19 +11,22 @@ The following pipeline metrics are available at `controller-service` on port `90

We expose several kinds of exporters, including Prometheus, Google Stackdriver, and many others. You can set them up using [observability configuration](../config/config-observability.yaml).

| Name | Type | Labels/Tags | Status |
| ---------- | ----------- | ----------- | ----------- |
| `tekton_pipelines_controller_pipelinerun_duration_seconds_[bucket, sum, count]` | Histogram/LastValue(Gauge) | `*pipeline`=&lt;pipeline_name&gt; <br> `*pipelinerun`=&lt;pipelinerun_name&gt; <br> `status`=&lt;status&gt; <br> `namespace`=&lt;pipelinerun-namespace&gt; | experimental |
| Name | Type | Labels/Tags | Status |
|-----------------------------------------------------------------------------------------| ----------- | ----------- | ----------- |
| `tekton_pipelines_controller_pipelinerun_duration_seconds_[bucket, sum, count]` | Histogram/LastValue(Gauge) | `*pipeline`=&lt;pipeline_name&gt; <br> `*pipelinerun`=&lt;pipelinerun_name&gt; <br> `status`=&lt;status&gt; <br> `namespace`=&lt;pipelinerun-namespace&gt; | experimental |
| `tekton_pipelines_controller_pipelinerun_taskrun_duration_seconds_[bucket, sum, count]` | Histogram/LastValue(Gauge) | `*pipeline`=&lt;pipeline_name&gt; <br> `*pipelinerun`=&lt;pipelinerun_name&gt; <br> `status`=&lt;status&gt; <br> `*task`=&lt;task_name&gt; <br> `*taskrun`=&lt;taskrun_name&gt;<br> `namespace`=&lt;pipelineruns-taskruns-namespace&gt;| experimental |
| `tekton_pipelines_controller_pipelinerun_count` | Counter | `status`=&lt;status&gt; | experimental |
| `tekton_pipelines_controller_running_pipelineruns_count` | Gauge | | experimental |
| `tekton_pipelines_controller_taskrun_duration_seconds_[bucket, sum, count]` | Histogram/LastValue(Gauge) | `status`=&lt;status&gt; <br> `*task`=&lt;task_name&gt; <br> `*taskrun`=&lt;taskrun_name&gt;<br> `namespace`=&lt;pipelineruns-taskruns-namespace&gt; | experimental |
| `tekton_pipelines_controller_taskrun_count` | Counter | `status`=&lt;status&gt; | experimental |
| `tekton_pipelines_controller_running_taskruns_count` | Gauge | | experimental |
| `tekton_pipelines_controller_running_taskruns_throttled_by_quota_count` | Gauge | | experimental |
| `tekton_pipelines_controller_running_taskruns_throttled_by_node_count` | Gauge | | experimental |
| `tekton_pipelines_controller_taskruns_pod_latency_milliseconds` | Gauge | `namespace`=&lt;taskruns-namespace&gt; <br> `pod`= &lt; taskrun_pod_name&gt; <br> `*task`=&lt;task_name&gt; <br> `*taskrun`=&lt;taskrun_name&gt;<br> | experimental |
| `tekton_pipelines_controller_client_latency_[bucket, sum, count]` | Histogram | | experimental |
| `tekton_pipelines_controller_pipelinerun_count` | Counter | `status`=&lt;status&gt; | experimental |
| `tekton_pipelines_controller_running_pipelineruns_count` | Gauge | | experimental |
| `tekton_pipelines_controller_taskrun_duration_seconds_[bucket, sum, count]` | Histogram/LastValue(Gauge) | `status`=&lt;status&gt; <br> `*task`=&lt;task_name&gt; <br> `*taskrun`=&lt;taskrun_name&gt;<br> `namespace`=&lt;pipelineruns-taskruns-namespace&gt; | experimental |
| `tekton_pipelines_controller_taskrun_count` | Counter | `status`=&lt;status&gt; | experimental |
| `tekton_pipelines_controller_running_taskruns_count` | Gauge | | experimental |
| `tekton_pipelines_controller_running_taskruns_throttled_by_quota_count` | Gauge | | experimental |
| `tekton_pipelines_controller_running_taskruns_throttled_by_node_count` | Gauge | | experimental |
| `tekton_pipelines_controller_running_taskruns_waiting_on_task_resolution_count` | Gauge | | experimental |
| `tekton_pipelines_controller_running_pipelineruns_waiting_on_pipeline_resolution_count` | Gauge | | experimental |
| `tekton_pipelines_controller_running_pipelineruns_waiting_on_task_resolution_count` | Gauge | | experimental |
| `tekton_pipelines_controller_taskruns_pod_latency_milliseconds` | Gauge | `namespace`=&lt;taskruns-namespace&gt; <br> `pod`= &lt; taskrun_pod_name&gt; <br> `*task`=&lt;task_name&gt; <br> `*taskrun`=&lt;taskrun_name&gt;<br> | experimental |
| `tekton_pipelines_controller_client_latency_[bucket, sum, count]` | Histogram | | experimental |

The Labels/Tags marked with "*" are optional, and there is a choice between Histogram and LastValue(Gauge) for pipelinerun and taskrun duration metrics.

38 changes: 37 additions & 1 deletion pkg/pipelinerunmetrics/metrics.go
@@ -59,6 +59,16 @@ var (
"Number of pipelineruns executing currently",
stats.UnitDimensionless)
runningPRsCountView *view.View

runningPRsWaitingOnPipelineResolutionCount = stats.Float64("running_pipelineruns_waiting_on_pipeline_resolution_count",
"Number of pipelineruns executing currently that are waiting on resolution requests for their pipeline references.",
stats.UnitDimensionless)
runningPRsWaitingOnPipelineResolutionCountView *view.View

runningPRsWaitingOnTaskResolutionCount = stats.Float64("running_pipelineruns_waiting_on_task_resolution_count",
"Number of pipelineruns executing currently that are waiting on resolution requests for the task references of their taskrun children.",
stats.UnitDimensionless)
runningPRsWaitingOnTaskResolutionCountView *view.View
)

const (
@@ -160,16 +170,28 @@ func viewRegister(cfg *config.Metrics) error {
Measure: runningPRsCount,
Aggregation: view.LastValue(),
}
runningPRsWaitingOnPipelineResolutionCountView = &view.View{
Description: runningPRsWaitingOnPipelineResolutionCount.Description(),
Measure: runningPRsWaitingOnPipelineResolutionCount,
Aggregation: view.LastValue(),
}
runningPRsWaitingOnTaskResolutionCountView = &view.View{
Description: runningPRsWaitingOnTaskResolutionCount.Description(),
Measure: runningPRsWaitingOnTaskResolutionCount,
Aggregation: view.LastValue(),
}

return view.Register(
prDurationView,
prCountView,
runningPRsCountView,
runningPRsWaitingOnPipelineResolutionCountView,
runningPRsWaitingOnTaskResolutionCountView,
)
}

func viewUnregister() {
view.Unregister(prDurationView, prCountView, runningPRsCountView)
view.Unregister(prDurationView, prCountView, runningPRsCountView, runningPRsWaitingOnPipelineResolutionCountView, runningPRsWaitingOnTaskResolutionCountView)
}

// MetricsOnStore returns a function that checks if metrics are configured for a config.Store, and registers it if so
@@ -272,9 +294,21 @@ func (r *Recorder) RunningPipelineRuns(lister listers.PipelineRunLister) error {
}

var runningPRs int
var trsWaitResolvingTaskRef int
var prsWaitResolvingPipelineRef int

for _, pr := range prs {
if !pr.IsDone() {
runningPRs++
succeedCondition := pr.Status.GetCondition(apis.ConditionSucceeded)
if succeedCondition != nil && succeedCondition.Status == corev1.ConditionUnknown {
switch succeedCondition.Reason {
case v1.TaskRunReasonResolvingTaskRef:
trsWaitResolvingTaskRef++
case "ResolvingPipelineRef": // unfortunately importing pipelinerun.ReasonResolvingPipelineRef results in a circular dependency
prsWaitResolvingPipelineRef++
}
}
}
}

@@ -283,6 +317,8 @@ func (r *Recorder) RunningPipelineRuns(lister listers.PipelineRunLister) error {
return err
}
metrics.Record(ctx, runningPRsCount.M(float64(runningPRs)))
metrics.Record(ctx, runningPRsWaitingOnPipelineResolutionCount.M(float64(prsWaitResolvingPipelineRef)))
metrics.Record(ctx, runningPRsWaitingOnTaskResolutionCount.M(float64(trsWaitResolvingTaskRef)))

return nil
}
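
The commit itself does not show how `RunningPipelineRuns` gets invoked; in the existing controller the recorder is driven on a reporting interval, analogous to `ReportRunningTaskRuns` shown further down for taskruns. Below is a rough, illustrative sketch only, not code from this commit: it assumes the package's existing `listers` import plus the standard `log` and `time` packages, and uses a hard-coded ticker where the real controller uses an injected clock and its configured reporting period.

```go
// reportRunningPipelineRuns is an illustrative driver loop: surface the
// pipelinerun gauges every 30 seconds until the context is cancelled.
func reportRunningPipelineRuns(ctx context.Context, r *Recorder, lister listers.PipelineRunLister) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := r.RunningPipelineRuns(lister); err != nil {
				// The real controller logs through its injected logger.
				log.Printf("failed to report pipelinerun metrics: %v", err)
			}
		}
	}
}
```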
81 changes: 80 additions & 1 deletion pkg/pipelinerunmetrics/metrics_test.go
@@ -375,8 +375,87 @@ func TestRecordRunningPipelineRunsCount(t *testing.T) {
metricstest.CheckLastValueData(t, "running_pipelineruns_count", map[string]string{}, 1)
}

func TestRecordRunningPipelineRunsResolutionWaitCounts(t *testing.T) {
for _, tc := range []struct {
status corev1.ConditionStatus
reason string
prWaitCount float64
trWaitCount float64
}{
{
status: corev1.ConditionTrue,
reason: "",
},
{
status: corev1.ConditionTrue,
reason: "ResolvingPipelineRef", // unfortunately importing pipelinerun.ReasonResolvingPipelineRef results in a circular dependency
},
{
status: corev1.ConditionTrue,
reason: v1.TaskRunReasonResolvingTaskRef,
},
{
status: corev1.ConditionFalse,
reason: "",
},
{
status: corev1.ConditionFalse,
reason: "ResolvingPipelineRef", // unfortunately importing pipelinerun.ReasonResolvingPipelineRef results in a circular dependency
},
{
status: corev1.ConditionFalse,
reason: v1.TaskRunReasonResolvingTaskRef,
},
{
status: corev1.ConditionUnknown,
reason: "",
},
{
status: corev1.ConditionUnknown,
reason: "ResolvingPipelineRef", // unfortunately importing pipelinerun.ReasonResolvingPipelineRef results in a circular dependency
prWaitCount: 1,
},
{
status: corev1.ConditionUnknown,
reason: v1.TaskRunReasonResolvingTaskRef,
trWaitCount: 1,
},
} {
unregisterMetrics()
tr := &v1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{Name: names.SimpleNameGenerator.RestrictLengthWithRandomSuffix("taskrun-")},
Status: v1.PipelineRunStatus{
Status: duckv1.Status{
Conditions: duckv1.Conditions{{
Type: apis.ConditionSucceeded,
Status: tc.status,
Reason: tc.reason,
}},
},
},
}
ctx, _ := ttesting.SetupFakeContext(t)
informer := fakepipelineruninformer.Get(ctx)
if err := informer.Informer().GetIndexer().Add(tr); err != nil {
t.Fatalf("Adding PipelineRun to informer: %v", err)
}

ctx = getConfigContext()
metrics, err := NewRecorder(ctx)
if err != nil {
t.Fatalf("NewRecorder: %v", err)
}

if err := metrics.RunningPipelineRuns(informer.Lister()); err != nil {
t.Errorf("RunningPipelineRuns: %v", err)
}
metricstest.CheckLastValueData(t, "running_pipelineruns_waiting_on_pipeline_resolution_count", map[string]string{}, tc.prWaitCount)
metricstest.CheckLastValueData(t, "running_pipelineruns_waiting_on_task_resolution_count", map[string]string{}, tc.trWaitCount)
}
}

func unregisterMetrics() {
metricstest.Unregister("pipelinerun_duration_seconds", "pipelinerun_count", "running_pipelineruns_count")
metricstest.Unregister("pipelinerun_duration_seconds", "pipelinerun_count", "running_pipelineruns_waiting_on_pipeline_resolution_count", "running_pipelineruns_waiting_on_task_resolution_count", "running_pipelineruns_count")

// Allow the recorder singleton to be recreated.
once = sync.Once{}
33 changes: 25 additions & 8 deletions pkg/taskrunmetrics/metrics.go
@@ -52,13 +52,14 @@ var (
statusTag = tag.MustNewKey("status")
podTag = tag.MustNewKey("pod")

trDurationView *view.View
prTRDurationView *view.View
trCountView *view.View
runningTRsCountView *view.View
runningTRsThrottledByQuotaCountView *view.View
runningTRsThrottledByNodeCountView *view.View
podLatencyView *view.View
trDurationView *view.View
prTRDurationView *view.View
trCountView *view.View
runningTRsCountView *view.View
runningTRsThrottledByQuotaCountView *view.View
runningTRsThrottledByNodeCountView *view.View
runningTRsWaitingOnBundleResolutionCountView *view.View
podLatencyView *view.View

trDuration = stats.Float64(
"taskrun_duration_seconds",
@@ -86,6 +87,10 @@ var (
"Number of taskruns executing currently, but whose underlying Pods or Containers are suspended by k8s because of Node level constraints. Such suspensions can occur as part of initial scheduling of the Pod, or scheduling of any of the subsequent Container(s) in the Pod after the first Container is started",
stats.UnitDimensionless)

runningTRsWaitingOnTaskResolutionCount = stats.Float64("running_taskruns_waiting_on_task_resolution_count",
"Number of taskruns executing currently that are waiting on resolution requests for their task references.",
stats.UnitDimensionless)

podLatency = stats.Float64("taskruns_pod_latency_milliseconds",
"scheduling latency for the taskruns pods",
stats.UnitMilliseconds)
@@ -219,6 +224,11 @@ func viewRegister(cfg *config.Metrics) error {
Measure: runningTRsThrottledByNodeCount,
Aggregation: view.LastValue(),
}
runningTRsWaitingOnBundleResolutionCountView = &view.View{
Description: runningTRsWaitingOnTaskResolutionCount.Description(),
Measure: runningTRsWaitingOnTaskResolutionCount,
Aggregation: view.LastValue(),
}
podLatencyView = &view.View{
Description: podLatency.Description(),
Measure: podLatency,
@@ -232,6 +242,7 @@ func viewRegister(cfg *config.Metrics) error {
runningTRsCountView,
runningTRsThrottledByQuotaCountView,
runningTRsThrottledByNodeCountView,
runningTRsWaitingOnBundleResolutionCountView,
podLatencyView,
)
}
@@ -244,6 +255,7 @@ func viewUnregister() {
runningTRsCountView,
runningTRsThrottledByQuotaCountView,
runningTRsThrottledByNodeCountView,
runningTRsWaitingOnBundleResolutionCountView,
podLatencyView,
)
}
@@ -358,6 +370,7 @@ func (r *Recorder) RunningTaskRuns(ctx context.Context, lister listers.TaskRunLi
var runningTrs int
var trsThrottledByQuota int
var trsThrottledByNode int
var trsWaitResolvingTaskRef int
for _, pr := range trs {
if pr.IsDone() {
continue
@@ -370,6 +383,8 @@ func (r *Recorder) RunningTaskRuns(ctx context.Context, lister listers.TaskRunLi
trsThrottledByQuota++
case pod.ReasonExceededNodeResources:
trsThrottledByNode++
case v1.TaskRunReasonResolvingTaskRef:
trsWaitResolvingTaskRef++
}
}
}
@@ -381,6 +396,7 @@ func (r *Recorder) RunningTaskRuns(ctx context.Context, lister listers.TaskRunLi
metrics.Record(ctx, runningTRsCount.M(float64(runningTrs)))
metrics.Record(ctx, runningTRsThrottledByNodeCount.M(float64(trsThrottledByNode)))
metrics.Record(ctx, runningTRsThrottledByQuotaCount.M(float64(trsThrottledByQuota)))
metrics.Record(ctx, runningTRsWaitingOnTaskResolutionCount.M(float64(trsWaitResolvingTaskRef)))

return nil
}
@@ -400,7 +416,8 @@ func (r *Recorder) ReportRunningTaskRuns(ctx context.Context, lister listers.Tas
return

case <-delay.C:
// Every 30s surface a metric for the number of running tasks, as well as those running tasks that are currently throttled by k8s.
// Every 30s surface a metric for the number of running tasks, as well as those running tasks that are currently throttled by k8s,
// and those running tasks waiting on task reference resolution
if err := r.RunningTaskRuns(ctx, lister); err != nil {
logger.Warnf("Failed to log the metrics : %v", err)
}
17 changes: 16 additions & 1 deletion pkg/taskrunmetrics/metrics_test.go
@@ -422,6 +422,7 @@ func TestRecordRunningTaskRunsThrottledCounts(t *testing.T) {
reason string
nodeCount float64
quotaCount float64
waitCount float64
}{
{
status: corev1.ConditionTrue,
@@ -435,6 +436,10 @@ func TestRecordRunningTaskRunsThrottledCounts(t *testing.T) {
status: corev1.ConditionTrue,
reason: pod.ReasonExceededNodeResources,
},
{
status: corev1.ConditionTrue,
reason: v1.TaskRunReasonResolvingTaskRef,
},
{
status: corev1.ConditionFalse,
reason: "",
@@ -447,6 +452,10 @@ func TestRecordRunningTaskRunsThrottledCounts(t *testing.T) {
status: corev1.ConditionFalse,
reason: pod.ReasonExceededNodeResources,
},
{
status: corev1.ConditionFalse,
reason: v1.TaskRunReasonResolvingTaskRef,
},
{
status: corev1.ConditionUnknown,
reason: "",
@@ -461,6 +470,11 @@
reason: pod.ReasonExceededNodeResources,
nodeCount: 1,
},
{
status: corev1.ConditionUnknown,
reason: v1.TaskRunReasonResolvingTaskRef,
waitCount: 1,
},
} {
unregisterMetrics()
tr := &v1.TaskRun{
@@ -492,6 +506,7 @@ }
}
metricstest.CheckLastValueData(t, "running_taskruns_throttled_by_quota_count", map[string]string{}, tc.quotaCount)
metricstest.CheckLastValueData(t, "running_taskruns_throttled_by_node_count", map[string]string{}, tc.nodeCount)
metricstest.CheckLastValueData(t, "running_taskruns_waiting_on_task_resolution_count", map[string]string{}, tc.waitCount)
}
}

@@ -610,7 +625,7 @@ func TestTaskRunIsOfPipelinerun(t *testing.T) {
}

func unregisterMetrics() {
metricstest.Unregister("taskrun_duration_seconds", "pipelinerun_taskrun_duration_seconds", "taskrun_count", "running_taskruns_count", "running_taskruns_throttled_by_quota_count", "running_taskruns_throttled_by_node_count", "taskruns_pod_latency_milliseconds")
metricstest.Unregister("taskrun_duration_seconds", "pipelinerun_taskrun_duration_seconds", "taskrun_count", "running_taskruns_count", "running_taskruns_throttled_by_quota_count", "running_taskruns_throttled_by_node_count", "running_taskruns_waiting_on_task_resolution_count", "taskruns_pod_latency_milliseconds")

// Allow the recorder singleton to be recreated.
once = sync.Once{}
