Skip to content

Commit

Permalink
fix(metastore): empty object corrections (#16147)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Feb 10, 2025
1 parent f084f02 commit d9892b4
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 69 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -409,4 +409,4 @@ replace github.com/grafana/loki/pkg/push => ./pkg/push
// leodido fork his project to continue support
replace github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0

replace github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6
replace github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250210100727-533688b5600d
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,8 @@ github.com/grafana/jsonparser v0.0.0-20241004153430-023329977675 h1:U94jQ2TQr1m3
github.com/grafana/jsonparser v0.0.0-20241004153430-023329977675/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6 h1:SlGPi1Sg15c/OzhGMAd7/EOnYJ03ZX6Wuql8lQ2pRU4=
github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM=
github.com/grafana/objstore v0.0.0-20250210100727-533688b5600d h1:prt2nn03NfxwgXWZNmC8a7jahg/R6mtyGmfKY3sbd6E=
github.com/grafana/objstore v0.0.0-20250210100727-533688b5600d/go.mod h1:Quz9HUDjGidU0RQpoytzK4KqJ7kwzP+DMAm4K57/usM=
github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg=
github.com/grafana/pyroscope-go/godeltaprof v0.1.8/go.mod h1:2+l7K7twW49Ct4wFluZD3tZ6e0SjanjcUUBPVD/UuGU=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248=
Expand Down
29 changes: 18 additions & 11 deletions pkg/dataobj/metastore/metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"

Expand Down Expand Up @@ -101,9 +102,14 @@ func (m *Manager) UpdateMetastore(ctx context.Context, dataobjPath string, flush
for m.backoff.Ongoing() {
err = m.bucket.GetAndReplace(ctx, metastorePath, func(existing io.Reader) (io.Reader, error) {
m.buf.Reset()
_, err := io.Copy(m.buf, existing)
if err != nil {
return nil, err
if existing != nil {
level.Debug(m.logger).Log("msg", "found existing metastore, updating", "path", metastorePath)
_, err := io.Copy(m.buf, existing)
if err != nil {
return nil, errors.Wrap(err, "copying to local buffer")
}
} else {
level.Debug(m.logger).Log("msg", "no existing metastore found, creating new one", "path", metastorePath)
}

m.metastoreBuilder.Reset()
Expand All @@ -112,36 +118,37 @@ func (m *Manager) UpdateMetastore(ctx context.Context, dataobjPath string, flush
replayDuration := prometheus.NewTimer(m.metrics.metastoreReplayTime)
object := dataobj.FromReaderAt(bytes.NewReader(m.buf.Bytes()), int64(m.buf.Len()))
if err := m.readFromExisting(ctx, object); err != nil {
return nil, err
return nil, errors.Wrap(err, "reading existing metastore version")
}
replayDuration.ObserveDuration()
}

encodingDuration := prometheus.NewTimer(m.metrics.metastoreEncodingTime)

ls := fmt.Sprintf("{__start__=\"%d\", __end__=\"%d\", __path__=\"%s\"}", minTimestamp.UnixNano(), maxTimestamp.UnixNano(), dataobjPath)
err = m.metastoreBuilder.Append(logproto.Stream{
err := m.metastoreBuilder.Append(logproto.Stream{
Labels: ls,
Entries: []logproto.Entry{{Line: ""}},
})
if err != nil {
return nil, err
return nil, errors.Wrap(err, "appending internal metadata stream")
}

m.buf.Reset()
_, err = m.metastoreBuilder.Flush(m.buf)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "flushing metastore builder")
}
encodingDuration.ObserveDuration()
return m.buf, nil
})
if err == nil {
level.Info(m.logger).Log("msg", "successfully merged & updated metastore", "metastore", metastorePath)
m.metrics.incMetastoreWrites(statusSuccess)
break
}
level.Error(m.logger).Log("msg", "failed to get and replace metastore object", "err", err, "metastore", metastorePath)
m.metrics.incMetastoreWriteFailures()
m.metrics.incMetastoreWrites(statusFailure)
m.backoff.Wait()
}
// Reset at the end too so we don't leave our memory hanging around between calls.
Expand All @@ -155,7 +162,7 @@ func (m *Manager) readFromExisting(ctx context.Context, object *dataobj.Object)
// Fetch sections
si, err := object.Metadata(ctx)
if err != nil {
return err
return errors.Wrap(err, "resolving object metadata")
}

// Read streams from existing metastore object and write them to the builder for the new object
Expand All @@ -164,15 +171,15 @@ func (m *Manager) readFromExisting(ctx context.Context, object *dataobj.Object)
streamsReader := dataobj.NewStreamsReader(object, i)
for n, err := streamsReader.Read(ctx, streams); n > 0; n, err = streamsReader.Read(ctx, streams) {
if err != nil && err != io.EOF {
return err
return errors.Wrap(err, "reading streams")
}
for _, stream := range streams[:n] {
err = m.metastoreBuilder.Append(logproto.Stream{
Labels: stream.Labels.String(),
Entries: []logproto.Entry{{Line: ""}},
})
if err != nil {
return err
return errors.Wrap(err, "appending streams")
}
}
}
Expand Down
19 changes: 13 additions & 6 deletions pkg/dataobj/metastore/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

type status string

const (
statusSuccess status = "success"
statusFailure status = "failure"
)

type metastoreMetrics struct {
metastoreProcessingTime prometheus.Histogram
metastoreReplayTime prometheus.Histogram
metastoreEncodingTime prometheus.Histogram
metastoreWriteFailures prometheus.Counter
metastoreWriteFailures *prometheus.CounterVec
}

func newMetastoreMetrics() *metastoreMetrics {
Expand Down Expand Up @@ -39,10 +46,10 @@ func newMetastoreMetrics() *metastoreMetrics {
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
metastoreWriteFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: "loki_dataobj_consumer_metastore_write_failures_total",
metastoreWriteFailures: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "loki_dataobj_consumer_metastore_writes_total",
Help: "Total number of metastore write failures",
}),
}, []string{"status"}),
}

return metrics
Expand Down Expand Up @@ -79,8 +86,8 @@ func (p *metastoreMetrics) unregister(reg prometheus.Registerer) {
}
}

func (p *metastoreMetrics) incMetastoreWriteFailures() {
p.metastoreWriteFailures.Inc()
func (p *metastoreMetrics) incMetastoreWrites(status status) {
p.metastoreWriteFailures.WithLabelValues(string(status)).Inc()
}

func (p *metastoreMetrics) observeMetastoreReplay(recordTimestamp time.Time) {
Expand Down
4 changes: 0 additions & 4 deletions vendor/github.com/thanos-io/objstore/inmem.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions vendor/github.com/thanos-io/objstore/objstore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 13 additions & 17 deletions vendor/github.com/thanos-io/objstore/providers/gcs/gcs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 20 additions & 11 deletions vendor/github.com/thanos-io/objstore/providers/s3/s3.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1633,7 +1633,7 @@ github.com/stretchr/testify/assert/yaml
github.com/stretchr/testify/mock
github.com/stretchr/testify/require
github.com/stretchr/testify/suite
# github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a => github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6
# github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a => github.com/grafana/objstore v0.0.0-20250210100727-533688b5600d
## explicit; go 1.22
github.com/thanos-io/objstore
github.com/thanos-io/objstore/clientutil
Expand Down Expand Up @@ -2566,4 +2566,4 @@ sigs.k8s.io/yaml/goyaml.v2
# github.com/grafana/regexp => github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
# github.com/grafana/loki/pkg/push => ./pkg/push
# github.com/influxdata/go-syslog/v3 => github.com/leodido/go-syslog/v4 v4.2.0
# github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250203161329-90e33e9afde6
# github.com/thanos-io/objstore => github.com/grafana/objstore v0.0.0-20250210100727-533688b5600d

0 comments on commit d9892b4

Please sign in to comment.