Skip to content

Commit

Permalink
Merge pull request kubernetes#678 from vmarmol/collector
Browse files Browse the repository at this point in the history
Add Collector executor support
  • Loading branch information
rjnagal committed May 5, 2015
2 parents 97266fc + bce54ce commit 6bcb4aa
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 8 deletions.
83 changes: 83 additions & 0 deletions collector/collector_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"fmt"
"strings"
"time"
)

type collectorManager struct {
collectors []*collectorData
}

var _ CollectorManager = &collectorManager{}

type collectorData struct {
collector Collector
nextCollectionTime time.Time
}

// Returns a new CollectorManager that is thread-compatible.
func NewCollectorManager() (CollectorManager, error) {
return &collectorManager{
collectors: []*collectorData{},
}, nil
}

func (cm *collectorManager) RegisterCollector(collector Collector) error {
cm.collectors = append(cm.collectors, &collectorData{
collector: collector,
nextCollectionTime: time.Now(),
})
return nil
}

func (cm *collectorManager) Collect() (time.Time, error) {
var errors []error

// Collect from all collectors that are ready.
var next time.Time
for _, c := range cm.collectors {
if c.nextCollectionTime.Before(time.Now()) {
nextCollection, err := c.collector.Collect()
if err != nil {
errors = append(errors, err)
}
c.nextCollectionTime = nextCollection
}

// Keep track of the next collector that will be ready.
if next.IsZero() || next.After(c.nextCollectionTime) {
next = c.nextCollectionTime
}
}

return next, compileErrors(errors)
}

// Make an error slice into a single error.
func compileErrors(errors []error) error {
if len(errors) == 0 {
return nil
}

res := make([]string, len(errors))
for i := range errors {
res[i] = fmt.Sprintf("Error %d: %v", i, errors[i].Error())
}
return fmt.Errorf("%s", strings.Join(res, ","))
}
70 changes: 70 additions & 0 deletions collector/collector_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

type fakeCollector struct {
nextCollectionTime time.Time
err error
collectedFrom int
}

func (fc *fakeCollector) Collect() (time.Time, error) {
fc.collectedFrom++
return fc.nextCollectionTime, fc.err
}

func (fc *fakeCollector) Name() string {
return "fake-collector"
}

func TestCollect(t *testing.T) {
cm := &collectorManager{}

firstTime := time.Now().Add(-time.Hour)
secondTime := time.Now().Add(time.Hour)
f1 := &fakeCollector{
nextCollectionTime: firstTime,
}
f2 := &fakeCollector{
nextCollectionTime: secondTime,
}

assert := assert.New(t)
assert.NoError(cm.RegisterCollector(f1))
assert.NoError(cm.RegisterCollector(f2))

// First collection, everyone gets collected from.
nextTime, err := cm.Collect()
assert.Equal(firstTime, nextTime)
assert.NoError(err)
assert.Equal(1, f1.collectedFrom)
assert.Equal(1, f2.collectedFrom)

f1.nextCollectionTime = time.Now().Add(2 * time.Hour)

// Second collection, only the one that is ready gets collected from.
nextTime, err = cm.Collect()
assert.Equal(secondTime, nextTime)
assert.NoError(err)
assert.Equal(2, f1.collectedFrom)
assert.Equal(1, f2.collectedFrom)
}
31 changes: 31 additions & 0 deletions collector/fakes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"time"
)

type FakeCollectorManager struct {
}

func (fkm *FakeCollectorManager) RegisterCollector(collector Collector) error {
return nil
}

func (fkm *FakeCollectorManager) Collect() (time.Time, error) {
var zero time.Time
return zero, nil
}
45 changes: 45 additions & 0 deletions collector/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2015 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"time"
)

// TODO(vmarmol): Export to a custom metrics type when that is available.

// Metric collector.
type Collector interface {
// Collect metrics from this collector.
// Returns the next time this collector should be collected from.
// Next collection time is always returned, even when an error occurs.
// A collection time of zero means no more collection.
Collect() (time.Time, error)

// Name of this collector.
Name() string
}

// Manages and runs collectors.
type CollectorManager interface {
// Register a collector.
RegisterCollector(collector Collector) error

// Collect from collectors that are ready and return the next time
// at which a collector will be ready to collect from.
// Next collection time is always returned, even when an error occurs.
// A collection time of zero means no more collection.
Collect() (time.Time, error)
}
28 changes: 23 additions & 5 deletions manager/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/docker/docker/pkg/units"
"github.com/golang/glog"
"github.com/google/cadvisor/collector"
"github.com/google/cadvisor/container"
info "github.com/google/cadvisor/info/v1"
"github.com/google/cadvisor/info/v2"
Expand Down Expand Up @@ -63,6 +64,9 @@ type containerData struct {

// Tells the container to stop.
stop chan bool

// Runs custom metric collectors.
collectorManager collector.CollectorManager
}

func (c *containerData) Start() error {
Expand Down Expand Up @@ -109,7 +113,7 @@ func (c *containerData) DerivedStats() (v2.DerivedStats, error) {
return c.summaryReader.DerivedStats()
}

func newContainerData(containerName string, memoryStorage *memory.InMemoryStorage, handler container.ContainerHandler, loadReader cpuload.CpuLoadReader, logUsage bool) (*containerData, error) {
func newContainerData(containerName string, memoryStorage *memory.InMemoryStorage, handler container.ContainerHandler, loadReader cpuload.CpuLoadReader, logUsage bool, collectorManager collector.CollectorManager) (*containerData, error) {
if memoryStorage == nil {
return nil, fmt.Errorf("nil memory storage")
}
Expand All @@ -129,6 +133,7 @@ func newContainerData(containerName string, memoryStorage *memory.InMemoryStorag
logUsage: logUsage,
loadAvg: -1.0, // negative value indicates uninitialized.
stop: make(chan bool, 1),
collectorManager: collectorManager,
}
cont.info.ContainerReference = ref

Expand Down Expand Up @@ -172,6 +177,7 @@ func (self *containerData) nextHousekeeping(lastHousekeeping time.Time) time.Tim
return lastHousekeeping.Add(self.housekeepingInterval)
}

// TODO(vmarmol): Implement stats collecting as a custom collector.
func (c *containerData) housekeeping() {
// Long housekeeping is either 100ms or half of the housekeeping interval.
longHousekeeping := 100 * time.Millisecond
Expand Down Expand Up @@ -226,12 +232,24 @@ func (c *containerData) housekeeping() {
}
}

// Schedule the next housekeeping. Sleep until that time.
// Run custom collectors.
nextCollectionTime, err := c.collectorManager.Collect()
if err != nil && c.allowErrorLogging() {
glog.Warningf("[%s] Collection failed: %v", c.info.Name, err)
}

// Next housekeeping is the first of the stats or the custom collector's housekeeping.
nextHousekeeping := c.nextHousekeeping(lastHousekeeping)
if time.Now().Before(nextHousekeeping) {
time.Sleep(nextHousekeeping.Sub(time.Now()))
next := nextHousekeeping
if !nextCollectionTime.IsZero() && nextCollectionTime.Before(nextHousekeeping) {
next = nextCollectionTime
}

// Schedule the next housekeeping. Sleep until that time.
if time.Now().Before(next) {
time.Sleep(next.Sub(time.Now()))
}
lastHousekeeping = nextHousekeeping
lastHousekeeping = next
}
}

Expand Down
3 changes: 2 additions & 1 deletion manager/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/google/cadvisor/collector"
"github.com/google/cadvisor/container"
info "github.com/google/cadvisor/info/v1"
itest "github.com/google/cadvisor/info/v1/test"
Expand All @@ -40,7 +41,7 @@ func setupContainerData(t *testing.T, spec info.ContainerSpec) (*containerData,
nil,
)
memoryStorage := memory.New(60, nil)
ret, err := newContainerData(containerName, memoryStorage, mockHandler, nil, false)
ret, err := newContainerData(containerName, memoryStorage, mockHandler, nil, false, &collector.FakeCollectorManager{})
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 7 additions & 1 deletion manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/docker/libcontainer/cgroups"
"github.com/golang/glog"
"github.com/google/cadvisor/collector"
"github.com/google/cadvisor/container"
"github.com/google/cadvisor/container/docker"
"github.com/google/cadvisor/container/raw"
Expand Down Expand Up @@ -650,8 +651,13 @@ func (m *manager) createContainer(containerName string) error {
glog.V(4).Infof("ignoring container %q", containerName)
return nil
}
// TODO(vmarmol): Register collectors.
collectorManager, err := collector.NewCollectorManager()
if err != nil {
return err
}
logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer
cont, err := newContainerData(containerName, m.memoryStorage, handler, m.loadReader, logUsage)
cont, err := newContainerData(containerName, m.memoryStorage, handler, m.loadReader, logUsage, collectorManager)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/google/cadvisor/collector"
"github.com/google/cadvisor/container"
"github.com/google/cadvisor/container/docker"
info "github.com/google/cadvisor/info/v1"
Expand Down Expand Up @@ -52,7 +53,7 @@ func createManagerAndAddContainers(
spec,
nil,
).Once()
cont, err := newContainerData(name, memoryStorage, mockHandler, nil, false)
cont, err := newContainerData(name, memoryStorage, mockHandler, nil, false, &collector.FakeCollectorManager{})
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 6bcb4aa

Please sign in to comment.