diff --git a/atc/metric/emitter/prometheus.go b/atc/metric/emitter/prometheus.go index eea2e8e8c..40639fdb7 100644 --- a/atc/metric/emitter/prometheus.go +++ b/atc/metric/emitter/prometheus.go @@ -27,7 +27,8 @@ type PrometheusEmitter struct { concurrentRequestsLimitHit *prometheus.CounterVec concurrentRequests *prometheus.GaugeVec - tasksWaiting prometheus.Gauge + tasksWaiting *prometheus.GaugeVec + tasksWaitingDuration *prometheus.HistogramVec buildDurationsVec *prometheus.HistogramVec buildsAborted prometheus.Counter @@ -171,14 +172,23 @@ func (config *PrometheusConfig) NewEmitter() (metric.Emitter, error) { }, []string{"action"}) prometheus.MustRegister(concurrentRequests) - tasksWaiting := prometheus.NewGauge(prometheus.GaugeOpts{ + tasksWaiting := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "concourse", Subsystem: "tasks", Name: "waiting", Help: "Number of Concourse tasks currently waiting.", - }) + }, []string{"teamId", "workerTags", "platform"}) prometheus.MustRegister(tasksWaiting) + tasksWaitingDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "concourse", + Subsystem: "tasks", + Name: "wait_duration", + Help: "Elapsed time waiting for execution", + Buckets: []float64{1, 15, 30, 60, 120, 180, 240, 300, 600, 1200}, + }, []string{"teamId", "workerTags", "platform"}) + prometheus.MustRegister(tasksWaitingDuration) + buildsFinished := prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "concourse", Subsystem: "builds", @@ -398,7 +408,8 @@ func (config *PrometheusConfig) NewEmitter() (metric.Emitter, error) { concurrentRequestsLimitHit: concurrentRequestsLimitHit, concurrentRequests: concurrentRequests, - tasksWaiting: tasksWaiting, + tasksWaiting: tasksWaiting, + tasksWaitingDuration: tasksWaitingDuration, buildDurationsVec: buildDurationsVec, buildsAborted: buildsAborted, @@ -463,9 +474,22 @@ func (emitter *PrometheusEmitter) Emit(logger lager.Logger, event metric.Event) case "concurrent requests limit hit": 
emitter.concurrentRequestsLimitHit.WithLabelValues(event.Attributes["action"]).Add(event.Value) case "concurrent requests": - emitter.concurrentRequests.WithLabelValues(event.Attributes["action"]).Set(event.Value) + emitter.concurrentRequests. + WithLabelValues(event.Attributes["action"]).Set(event.Value) case "tasks waiting": - emitter.tasksWaiting.Set(event.Value) + emitter.tasksWaiting. + WithLabelValues( + event.Attributes["teamId"], + event.Attributes["workerTags"], + event.Attributes["platform"], + ).Set(event.Value) + case "tasks waiting duration": + emitter.tasksWaitingDuration. + WithLabelValues( + event.Attributes["teamId"], + event.Attributes["workerTags"], + event.Attributes["platform"], + ).Observe(event.Value) case "build finished": emitter.buildFinishedMetrics(logger, event) case "worker containers": diff --git a/atc/metric/emitter/prometheus_test.go b/atc/metric/emitter/prometheus_test.go index 3a06d675a..ff082d416 100644 --- a/atc/metric/emitter/prometheus_test.go +++ b/atc/metric/emitter/prometheus_test.go @@ -187,7 +187,6 @@ var _ = Describe("PrometheusEmitter", func() { } }) - JustBeforeEach(func() { prometheusEmitter, err = prometheusConfig.NewEmitter() }) @@ -196,6 +195,11 @@ var _ = Describe("PrometheusEmitter", func() { prometheusEmitter.Emit(logger, metric.Event{ Name: "tasks waiting", Value: 4, + Attributes: map[string]string{ + "teamId": "42", + "workerTags": "tester", + "platform": "darwin", + }, }) res, _ := http.Get(fmt.Sprintf("http://%s:%s/metrics", prometheusConfig.BindIP, prometheusConfig.BindPort)) @@ -203,7 +207,7 @@ var _ = Describe("PrometheusEmitter", func() { body, _ := ioutil.ReadAll(res.Body) Expect(res.StatusCode).To(Equal(http.StatusOK)) - Expect(string(body)).To(ContainSubstring("concourse_tasks_waiting 4")) + Expect(string(body)).To(ContainSubstring("concourse_tasks_waiting{platform=\"darwin\",teamId=\"42\",workerTags=\"tester\"} 4")) Expect(err).To(BeNil()) }) }) diff --git a/atc/metric/metrics.go 
b/atc/metric/metrics.go index 70647d99a..69b8580d1 100644 --- a/atc/metric/metrics.go +++ b/atc/metric/metrics.go @@ -30,8 +30,6 @@ var JobsScheduling = &Gauge{} var BuildsStarted = &Counter{} var BuildsRunning = &Gauge{} -var TasksWaiting = &Gauge{} - var ChecksFinishedWithError = &Counter{} var ChecksFinishedWithSuccess = &Counter{} var ChecksQueueSize = &Gauge{} @@ -41,6 +39,34 @@ var ChecksEnqueued = &Counter{} var ConcurrentRequests = map[string]*Gauge{} var ConcurrentRequestsLimitHit = map[string]*Counter{} +type TasksWaitingLabels struct { + TeamId string + WorkerTags string + Platform string +} + +var TasksWaiting = map[TasksWaitingLabels]*Gauge{} + +type TasksWaitingDuration struct { + Labels TasksWaitingLabels + Duration time.Duration +} + +func (event TasksWaitingDuration) Emit(logger lager.Logger) { + emit( + logger.Session("tasks-waiting-duration"), + Event{ + Name: "tasks waiting duration", + Value: event.Duration.Seconds(), + Attributes: map[string]string{ + "teamId": event.Labels.TeamId, + "workerTags": event.Labels.WorkerTags, + "platform": event.Labels.Platform, + }, + }, + ) +} + type BuildCollectorDuration struct { Duration time.Duration } diff --git a/atc/metric/periodic.go b/atc/metric/periodic.go index 32f5f69d6..2463b8b02 100644 --- a/atc/metric/periodic.go +++ b/atc/metric/periodic.go @@ -165,13 +165,20 @@ func tick(logger lager.Logger) { ) } - emit( - logger.Session("tasks-waiting"), - Event{ - Name: "tasks waiting", - Value: TasksWaiting.Max(), - }, - ) + for labels, gauge := range TasksWaiting { + emit( + logger.Session("tasks-waiting"), + Event{ + Name: "tasks waiting", + Value: gauge.Max(), + Attributes: map[string]string{ + "teamId": labels.TeamId, + "workerTags": labels.WorkerTags, + "platform": labels.Platform, + }, + }, + ) + } emit( logger.Session("checks-finished-with-error"), diff --git a/atc/metric/periodic_test.go b/atc/metric/periodic_test.go index 7c7f71185..97a2a1827 100644 --- a/atc/metric/periodic_test.go +++ 
b/atc/metric/periodic_test.go @@ -142,10 +142,17 @@ var _ = Describe("Periodic emission of metrics", func() { }) Context("limit-active-tasks metrics", func() { + labels := metric.TasksWaitingLabels{ + TeamId: "42", + WorkerTags: "tester", + Platform: "darwin", + } + BeforeEach(func() { gauge := &metric.Gauge{} gauge.Set(123) - metric.TasksWaiting = gauge + + metric.TasksWaiting[labels] = gauge }) It("emits", func() { Eventually(emitter.EmitCallCount).Should(BeNumerically(">=", 1)) @@ -155,6 +162,11 @@ var _ = Describe("Periodic emission of metrics", func() { MatchFields(IgnoreExtras, Fields{ "Name": Equal("tasks waiting"), "Value": Equal(float64(123)), + "Attributes": Equal(map[string]string{ + "teamId": labels.TeamId, + "workerTags": labels.WorkerTags, + "platform": labels.Platform, + }), }), ), ), diff --git a/atc/worker/choose_task_worker_test.go b/atc/worker/choose_task_worker_test.go index 138d423e2..53d1a35e5 100644 --- a/atc/worker/choose_task_worker_test.go +++ b/atc/worker/choose_task_worker_test.go @@ -2,11 +2,14 @@ package worker_test import ( "bytes" - "code.cloudfoundry.org/garden" "context" - "github.com/concourse/concourse/atc/metric" + "strconv" + "strings" "time" + "code.cloudfoundry.org/garden" + "github.com/concourse/concourse/atc/metric" + "code.cloudfoundry.org/lager/lagertest" "github.com/concourse/concourse/atc" @@ -166,8 +169,15 @@ var _ = Describe("RunTaskStep", func() { }) It("task waiting metrics is gauged", func() { - Eventually(metric.TasksWaiting.Max(), 2*time.Second).Should(Equal(float64(1))) - Eventually(metric.TasksWaiting.Max(), 2*time.Second).Should(Equal(float64(0))) + labels := metric.TasksWaitingLabels{ + TeamId: strconv.Itoa(fakeWorkerSpec.TeamID), + WorkerTags: strings.Join(fakeContainerSpec.Tags, "_"), + Platform: fakeWorkerSpec.Platform, + } + // Verify that when one task is waiting the gauge is increased... + Eventually(metric.TasksWaiting[labels].Max(), 2*time.Second).Should(Equal(float64(1))) + // and then decreased. 
+ Eventually(metric.TasksWaiting[labels].Max(), 2*time.Second).Should(Equal(float64(0))) }) It("writes status to output writer", func() { diff --git a/atc/worker/client.go b/atc/worker/client.go index f0a7a0946..1fe33f0ec 100644 --- a/atc/worker/client.go +++ b/atc/worker/client.go @@ -7,6 +7,7 @@ import ( "io" "path" "strconv" + "strings" "time" "code.cloudfoundry.org/garden" @@ -586,6 +587,12 @@ func (client *client) chooseTaskWorker( workerStatusPublishTicker := time.NewTicker(client.workerStatusPublishInterval) defer workerStatusPublishTicker.Stop() + tasksWaitingLabels := metric.TasksWaitingLabels{ + TeamId: strconv.Itoa(workerSpec.TeamID), + WorkerTags: strings.Join(containerSpec.Tags, "_"), + Platform: workerSpec.Platform, + } + for { if chosenWorker, err = client.pool.FindOrChooseWorkerForContainer( ctx, @@ -631,6 +638,10 @@ func (client *client) chooseTaskWorker( if elapsed > 0 { message := fmt.Sprintf("Found a free worker after waiting %s.\n", elapsed.Round(1*time.Second)) writeOutputMessage(logger, outputWriter, message) + metric.TasksWaitingDuration{ + Labels: tasksWaitingLabels, + Duration: elapsed, + }.Emit(logger) } return chosenWorker, err @@ -643,8 +654,12 @@ func (client *client) chooseTaskWorker( // Increase task waiting only once if elapsed == 0 { - metric.TasksWaiting.Inc() - defer metric.TasksWaiting.Dec() + _, ok := metric.TasksWaiting[tasksWaitingLabels] + if !ok { + metric.TasksWaiting[tasksWaitingLabels] = &metric.Gauge{} + } + metric.TasksWaiting[tasksWaitingLabels].Inc() + defer metric.TasksWaiting[tasksWaitingLabels].Dec() } elapsed = waitForWorker(logger, diff --git a/release-notes/latest.md b/release-notes/latest.md index 6d9baa0a0..541c3ae5e 100644 --- a/release-notes/latest.md +++ b/release-notes/latest.md @@ -25,4 +25,11 @@ #### :link: feature -* Refactor TSA to use Concourse's gclient which has a configurable timeout Issue: #5146 PR: #5845 \ No newline at end of file +* Refactor TSA to use Concourse's gclient which has a 
configurable timeout Issue: #5146 PR: #5845 + +#### :link: feature + +* Enhance the `tasks_waiting` metric to export labels in Prometheus for the platform, worker tags, and team of the tasks awaiting execution. + + A new histogram metric called `tasks_wait_duration` (exported with `_bucket`, `_sum`, and `_count` series) is also added to express, via histogram buckets, the distribution of time spent by tasks awaiting execution. PR: #5981 + ![Example graph for the task wait time histograms.](https://user-images.githubusercontent.com/40891147/89990749-189d2600-dc83-11ea-8fde-ae579fdb0a0a.png)