diff --git a/collector/collector_manager.go b/collector/collector_manager.go new file mode 100644 index 0000000000000..0641d199dfcec --- /dev/null +++ b/collector/collector_manager.go @@ -0,0 +1,83 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "fmt" + "strings" + "time" +) + +type collectorManager struct { + collectors []*collectorData +} + +var _ CollectorManager = &collectorManager{} + +type collectorData struct { + collector Collector + nextCollectionTime time.Time +} + +// Returns a new CollectorManager that is thread-compatible. +func NewCollectorManager() (CollectorManager, error) { + return &collectorManager{ + collectors: []*collectorData{}, + }, nil +} + +func (cm *collectorManager) RegisterCollector(collector Collector) error { + cm.collectors = append(cm.collectors, &collectorData{ + collector: collector, + nextCollectionTime: time.Now(), + }) + return nil +} + +func (cm *collectorManager) Collect() (time.Time, error) { + var errors []error + + // Collect from all collectors that are ready. + var next time.Time + for _, c := range cm.collectors { + if c.nextCollectionTime.Before(time.Now()) { + nextCollection, err := c.collector.Collect() + if err != nil { + errors = append(errors, err) + } + c.nextCollectionTime = nextCollection + } + + // Keep track of the next collector that will be ready. + if next.IsZero() || next.After(c.nextCollectionTime) { + next = c.nextCollectionTime + } + } + + return next, compileErrors(errors) +} + +// Make an error slice into a single error. +func compileErrors(errors []error) error { + if len(errors) == 0 { + return nil + } + + res := make([]string, len(errors)) + for i := range errors { + res[i] = fmt.Sprintf("Error %d: %v", i, errors[i].Error()) + } + return fmt.Errorf("%s", strings.Join(res, ",")) +} diff --git a/collector/collector_manager_test.go b/collector/collector_manager_test.go new file mode 100644 index 0000000000000..85d6bbc08eeb0 --- /dev/null +++ b/collector/collector_manager_test.go @@ -0,0 +1,70 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type fakeCollector struct { + nextCollectionTime time.Time + err error + collectedFrom int +} + +func (fc *fakeCollector) Collect() (time.Time, error) { + fc.collectedFrom++ + return fc.nextCollectionTime, fc.err +} + +func (fc *fakeCollector) Name() string { + return "fake-collector" +} + +func TestCollect(t *testing.T) { + cm := &collectorManager{} + + firstTime := time.Now().Add(-time.Hour) + secondTime := time.Now().Add(time.Hour) + f1 := &fakeCollector{ + nextCollectionTime: firstTime, + } + f2 := &fakeCollector{ + nextCollectionTime: secondTime, + } + + assert := assert.New(t) + assert.NoError(cm.RegisterCollector(f1)) + assert.NoError(cm.RegisterCollector(f2)) + + // First collection, everyone gets collected from. + nextTime, err := cm.Collect() + assert.Equal(firstTime, nextTime) + assert.NoError(err) + assert.Equal(1, f1.collectedFrom) + assert.Equal(1, f2.collectedFrom) + + f1.nextCollectionTime = time.Now().Add(2 * time.Hour) + + // Second collection, only the one that is ready gets collected from. + nextTime, err = cm.Collect() + assert.Equal(secondTime, nextTime) + assert.NoError(err) + assert.Equal(2, f1.collectedFrom) + assert.Equal(1, f2.collectedFrom) +} diff --git a/collector/fakes.go b/collector/fakes.go new file mode 100644 index 0000000000000..d36f113614da5 --- /dev/null +++ b/collector/fakes.go @@ -0,0 +1,31 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "time" +) + +type FakeCollectorManager struct { +} + +func (fkm *FakeCollectorManager) RegisterCollector(collector Collector) error { + return nil +} + +func (fkm *FakeCollectorManager) Collect() (time.Time, error) { + var zero time.Time + return zero, nil +} diff --git a/collector/types.go b/collector/types.go new file mode 100644 index 0000000000000..4967a6ce27fb2 --- /dev/null +++ b/collector/types.go @@ -0,0 +1,45 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "time" +) + +// TODO(vmarmol): Export to a custom metrics type when that is available. + +// Metric collector. +type Collector interface { + // Collect metrics from this collector. + // Returns the next time this collector should be collected from. + // Next collection time is always returned, even when an error occurs. + // A collection time of zero means no more collection. + Collect() (time.Time, error) + + // Name of this collector. + Name() string +} + +// Manages and runs collectors. +type CollectorManager interface { + // Register a collector. + RegisterCollector(collector Collector) error + + // Collect from collectors that are ready and return the next time + // at which a collector will be ready to collect from. + // Next collection time is always returned, even when an error occurs. + // A collection time of zero means no more collection. + Collect() (time.Time, error) +} diff --git a/manager/container.go b/manager/container.go index 1d77adcfa4d8e..ed62dbdca54b4 100644 --- a/manager/container.go +++ b/manager/container.go @@ -24,6 +24,7 @@ import ( "github.com/docker/docker/pkg/units" "github.com/golang/glog" + "github.com/google/cadvisor/collector" "github.com/google/cadvisor/container" info "github.com/google/cadvisor/info/v1" "github.com/google/cadvisor/info/v2" @@ -63,6 +64,9 @@ type containerData struct { // Tells the container to stop. stop chan bool + + // Runs custom metric collectors. + collectorManager collector.CollectorManager } func (c *containerData) Start() error { @@ -109,7 +113,7 @@ func (c *containerData) DerivedStats() (v2.DerivedStats, error) { return c.summaryReader.DerivedStats() } -func newContainerData(containerName string, memoryStorage *memory.InMemoryStorage, handler container.ContainerHandler, loadReader cpuload.CpuLoadReader, logUsage bool) (*containerData, error) { +func newContainerData(containerName string, memoryStorage *memory.InMemoryStorage, handler container.ContainerHandler, loadReader cpuload.CpuLoadReader, logUsage bool, collectorManager collector.CollectorManager) (*containerData, error) { if memoryStorage == nil { return nil, fmt.Errorf("nil memory storage") } @@ -129,6 +133,7 @@ func newContainerData(containerName string, memoryStorage *memory.InMemoryStorag logUsage: logUsage, loadAvg: -1.0, // negative value indicates uninitialized. stop: make(chan bool, 1), + collectorManager: collectorManager, } cont.info.ContainerReference = ref @@ -172,6 +177,7 @@ func (self *containerData) nextHousekeeping(lastHousekeeping time.Time) time.Tim return lastHousekeeping.Add(self.housekeepingInterval) } +// TODO(vmarmol): Implement stats collecting as a custom collector. func (c *containerData) housekeeping() { // Long housekeeping is either 100ms or half of the housekeeping interval. longHousekeeping := 100 * time.Millisecond @@ -226,12 +232,24 @@ func (c *containerData) housekeeping() { } } - // Schedule the next housekeeping. Sleep until that time. + // Run custom collectors. + nextCollectionTime, err := c.collectorManager.Collect() + if err != nil && c.allowErrorLogging() { + glog.Warningf("[%s] Collection failed: %v", c.info.Name, err) + } + + // Next housekeeping is the first of the stats or the custom collector's housekeeping. nextHousekeeping := c.nextHousekeeping(lastHousekeeping) - if time.Now().Before(nextHousekeeping) { - time.Sleep(nextHousekeeping.Sub(time.Now())) + next := nextHousekeeping + if !nextCollectionTime.IsZero() && nextCollectionTime.Before(nextHousekeeping) { + next = nextCollectionTime + } + + // Schedule the next housekeeping. Sleep until that time. + if time.Now().Before(next) { + time.Sleep(next.Sub(time.Now())) } - lastHousekeeping = nextHousekeeping + lastHousekeeping = next } } diff --git a/manager/container_test.go b/manager/container_test.go index bd73a1199c940..bb5bbb9f16cc7 100644 --- a/manager/container_test.go +++ b/manager/container_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/google/cadvisor/collector" "github.com/google/cadvisor/container" info "github.com/google/cadvisor/info/v1" itest "github.com/google/cadvisor/info/v1/test" @@ -40,7 +41,7 @@ func setupContainerData(t *testing.T, spec info.ContainerSpec) (*containerData, nil, ) memoryStorage := memory.New(60, nil) - ret, err := newContainerData(containerName, memoryStorage, mockHandler, nil, false) + ret, err := newContainerData(containerName, memoryStorage, mockHandler, nil, false, &collector.FakeCollectorManager{}) if err != nil { t.Fatal(err) } diff --git a/manager/manager.go b/manager/manager.go index 4de8fa2b14ef9..f7c8e92bb1c9c 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -27,6 +27,7 @@ import ( "github.com/docker/libcontainer/cgroups" "github.com/golang/glog" + "github.com/google/cadvisor/collector" "github.com/google/cadvisor/container" "github.com/google/cadvisor/container/docker" "github.com/google/cadvisor/container/raw" @@ -650,8 +651,13 @@ func (m *manager) createContainer(containerName string) error { glog.V(4).Infof("ignoring container %q", containerName) return nil } + // TODO(vmarmol): Register collectors. + collectorManager, err := collector.NewCollectorManager() + if err != nil { + return err + } logUsage := *logCadvisorUsage && containerName == m.cadvisorContainer - cont, err := newContainerData(containerName, m.memoryStorage, handler, m.loadReader, logUsage) + cont, err := newContainerData(containerName, m.memoryStorage, handler, m.loadReader, logUsage, collectorManager) if err != nil { return err } diff --git a/manager/manager_test.go b/manager/manager_test.go index 382f71cac8076..89cfe7aa6795a 100644 --- a/manager/manager_test.go +++ b/manager/manager_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/google/cadvisor/collector" "github.com/google/cadvisor/container" "github.com/google/cadvisor/container/docker" info "github.com/google/cadvisor/info/v1" @@ -52,7 +53,7 @@ func createManagerAndAddContainers( spec, nil, ).Once() - cont, err := newContainerData(name, memoryStorage, mockHandler, nil, false) + cont, err := newContainerData(name, memoryStorage, mockHandler, nil, false, &collector.FakeCollectorManager{}) if err != nil { t.Fatal(err) }