Skip to content

Commit

Permalink
Merge pull request #918 from dgageot/improve-logs
Browse files Browse the repository at this point in the history
Improve logs
  • Loading branch information
dgageot authored Aug 24, 2018
2 parents 80d365c + 984a0ef commit 6ad057a
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 44 deletions.
92 changes: 48 additions & 44 deletions pkg/skaffold/kubernetes/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,50 +62,47 @@ func NewLogAggregator(out io.Writer, podSelector PodSelector, colorPicker ColorP
}

func (a *LogAggregator) Start(ctx context.Context) error {
a.startTime = time.Now()

kubeclient, err := Client()
if err != nil {
return errors.Wrap(err, "getting k8s client")
}
client := kubeclient.CoreV1()

var forever int64 = 3600 * 24 * 365 * 100
watcher, err := client.Pods("").Watch(meta_v1.ListOptions{
IncludeUninitialized: true,
TimeoutSeconds: &forever,
})

if err != nil {
return errors.Wrap(err, "initializing pod watcher")
}

go func() {
retryLoop:
defer watcher.Stop()

for {
a.startTime = time.Now()
select {
case <-ctx.Done():
return
case evt, ok := <-watcher.ResultChan():
if !ok {
return
}

watcher, err := client.Pods("").Watch(meta_v1.ListOptions{
IncludeUninitialized: true,
})
if evt.Type != watch.Added && evt.Type != watch.Modified {
continue
}

if err != nil {
logrus.Errorf("initializing pod watcher %s", err)
return
}
pod, ok := evt.Object.(*v1.Pod)
if !ok {
continue
}

eventLoop:
for {
select {
case <-ctx.Done():
watcher.Stop()
return
case evt, ok := <-watcher.ResultChan():
if !ok {
// expected: server connection timeout
continue retryLoop
}

if evt.Type != watch.Added && evt.Type != watch.Modified {
continue eventLoop
}

pod, ok := evt.Object.(*v1.Pod)
if !ok {
continue eventLoop
}

if a.podSelector.Select(pod) {
go a.streamLogs(ctx, pod)
}
if a.podSelector.Select(pod) {
go a.streamLogs(ctx, pod)
}
}
}
Expand All @@ -114,6 +111,16 @@ func (a *LogAggregator) Start(ctx context.Context) error {
return nil
}

func sinceSeconds(d time.Duration) int64 {
since := int64((d + 999*time.Millisecond).Truncate(1 * time.Second).Seconds())
if since != 0 {
return since
}

// 0 means all the logs. So we ask for the logs since 1s.
return 1
}

func (a *LogAggregator) streamLogs(ctx context.Context, pod *v1.Pod) {
for _, container := range pod.Status.ContainerStatuses {
containerID := container.ContainerID
Expand All @@ -128,18 +135,15 @@ func (a *LogAggregator) streamLogs(ctx context.Context, pod *v1.Pod) {

logrus.Infof("Stream logs from pod: %s container: %s", pod.Name, container.Name)

tr, tw := io.Pipe()
go func() {
sinceSeconds := int64(time.Since(a.startTime).Seconds() + 0.5)
// 0s means all the logs
if sinceSeconds == 0 {
sinceSeconds = 1
}
// In theory, it's more precise to use --since-time='' but there can be a time
// difference between the user's machine and the server.
// So we use --since=Xs and round up to the nearest second to not lose any log.
sinceSeconds := fmt.Sprintf("--since=%ds", sinceSeconds(time.Since(a.startTime)))

cmd := exec.CommandContext(ctx, "kubectl", "logs", fmt.Sprintf("--since=%ds", sinceSeconds), "-f", pod.Name, "-c", container.Name, "--namespace", pod.Namespace)
cmd.Stdout = tw
cmd.Run()
}()
tr, tw := io.Pipe()
cmd := exec.CommandContext(ctx, "kubectl", "logs", sinceSeconds, "-f", pod.Name, "-c", container.Name, "--namespace", pod.Namespace)
cmd.Stdout = tw
go cmd.Run()

color := a.colorPicker.Pick(pod)
prefix := prefix(pod, container)
Expand Down
51 changes: 51 additions & 0 deletions pkg/skaffold/kubernetes/log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2018 The Skaffold Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubernetes

import (
"testing"
"time"
)

func TestSinceSeconds(t *testing.T) {
var tests = []struct {
description string
duration time.Duration
expected int64
}{
{"0s", 0, 1},
{"1ms", 1 * time.Millisecond, 1},
{"500ms", 500 * time.Millisecond, 1},
{"999ms", 999 * time.Millisecond, 1},
{"1s", 1 * time.Second, 1},
{"1.1s", 1100 * time.Millisecond, 2},
{"1.5s", 1500 * time.Millisecond, 2},
{"1.9s", 1500 * time.Millisecond, 2},
{"2s", 2 * time.Second, 2},
{"10s", 10 * time.Second, 10},
{"60s", 60 * time.Second, 60},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
since := sinceSeconds(test.duration)

if since != test.expected {
t.Errorf("Expected %d. Got %d", test.expected, since)
}
})
}
}

0 comments on commit 6ad057a

Please sign in to comment.