diff --git a/pkg/skaffold/kubernetes/log.go b/pkg/skaffold/kubernetes/log.go index 5c0440ab89f..1df0ed4a977 100644 --- a/pkg/skaffold/kubernetes/log.go +++ b/pkg/skaffold/kubernetes/log.go @@ -62,50 +62,47 @@ func NewLogAggregator(out io.Writer, podSelector PodSelector, colorPicker ColorP } func (a *LogAggregator) Start(ctx context.Context) error { + a.startTime = time.Now() + kubeclient, err := Client() if err != nil { return errors.Wrap(err, "getting k8s client") } client := kubeclient.CoreV1() + var forever int64 = 3600 * 24 * 365 * 100 + watcher, err := client.Pods("").Watch(meta_v1.ListOptions{ + IncludeUninitialized: true, + TimeoutSeconds: &forever, + }) + + if err != nil { + return errors.Wrap(err, "initializing pod watcher") + } + go func() { - retryLoop: + defer watcher.Stop() + for { - a.startTime = time.Now() + select { + case <-ctx.Done(): + return + case evt, ok := <-watcher.ResultChan(): + if !ok { + return + } - watcher, err := client.Pods("").Watch(meta_v1.ListOptions{ - IncludeUninitialized: true, - }) + if evt.Type != watch.Added && evt.Type != watch.Modified { + continue + } - if err != nil { - logrus.Errorf("initializing pod watcher %s", err) - return - } + pod, ok := evt.Object.(*v1.Pod) + if !ok { + continue + } - eventLoop: - for { - select { - case <-ctx.Done(): - watcher.Stop() - return - case evt, ok := <-watcher.ResultChan(): - if !ok { - // expected: server connection timeout - continue retryLoop - } - - if evt.Type != watch.Added && evt.Type != watch.Modified { - continue eventLoop - } - - pod, ok := evt.Object.(*v1.Pod) - if !ok { - continue eventLoop - } - - if a.podSelector.Select(pod) { - go a.streamLogs(ctx, pod) - } + if a.podSelector.Select(pod) { + go a.streamLogs(ctx, pod) } } } @@ -114,6 +111,16 @@ func (a *LogAggregator) Start(ctx context.Context) error { return nil } +func sinceSeconds(d time.Duration) int64 { + since := int64((d + 999*time.Millisecond).Truncate(1 * time.Second).Seconds()) + if since != 0 { + return since + } + + // 0 means all the logs. So we ask for the logs since 1s. + return 1 +} + func (a *LogAggregator) streamLogs(ctx context.Context, pod *v1.Pod) { for _, container := range pod.Status.ContainerStatuses { containerID := container.ContainerID @@ -128,18 +135,15 @@ func (a *LogAggregator) streamLogs(ctx context.Context, pod *v1.Pod) { logrus.Infof("Stream logs from pod: %s container: %s", pod.Name, container.Name) - tr, tw := io.Pipe() - go func() { - sinceSeconds := int64(time.Since(a.startTime).Seconds() + 0.5) - // 0s means all the logs - if sinceSeconds == 0 { - sinceSeconds = 1 - } + // In theory, it's more precise to use --since-time='' but there can be a time + // difference between the user's machine and the server. + // So we use --since=Xs and round up to the nearest second to not lose any log. + sinceSeconds := fmt.Sprintf("--since=%ds", sinceSeconds(time.Since(a.startTime))) - cmd := exec.CommandContext(ctx, "kubectl", "logs", fmt.Sprintf("--since=%ds", sinceSeconds), "-f", pod.Name, "-c", container.Name, "--namespace", pod.Namespace) - cmd.Stdout = tw - cmd.Run() - }() + tr, tw := io.Pipe() + cmd := exec.CommandContext(ctx, "kubectl", "logs", sinceSeconds, "-f", pod.Name, "-c", container.Name, "--namespace", pod.Namespace) + cmd.Stdout = tw + go cmd.Run() color := a.colorPicker.Pick(pod) prefix := prefix(pod, container) diff --git a/pkg/skaffold/kubernetes/log_test.go b/pkg/skaffold/kubernetes/log_test.go new file mode 100644 index 00000000000..273de1669a0 --- /dev/null +++ b/pkg/skaffold/kubernetes/log_test.go @@ -0,0 +1,51 @@ +/* +Copyright 2018 The Skaffold Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubernetes + +import ( + "testing" + "time" +) + +func TestSinceSeconds(t *testing.T) { + var tests = []struct { + description string + duration time.Duration + expected int64 + }{ + {"0s", 0, 1}, + {"1ms", 1 * time.Millisecond, 1}, + {"500ms", 500 * time.Millisecond, 1}, + {"999ms", 999 * time.Millisecond, 1}, + {"1s", 1 * time.Second, 1}, + {"1.1s", 1100 * time.Millisecond, 2}, + {"1.5s", 1500 * time.Millisecond, 2}, + {"1.9s", 1500 * time.Millisecond, 2}, + {"2s", 2 * time.Second, 2}, + {"10s", 10 * time.Second, 10}, + {"60s", 60 * time.Second, 60}, + } + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + since := sinceSeconds(test.duration) + + if since != test.expected { + t.Errorf("Expected %d. Got %d", test.expected, since) + } + }) + } +}