diff --git a/integration/bloom_building_test.go b/integration/bloom_building_test.go index 0a96ee5702ace..46e8570c47717 100644 --- a/integration/bloom_building_test.go +++ b/integration/bloom_building_test.go @@ -170,7 +170,6 @@ func checkForTimestampMetric(t *testing.T, cliPlanner *client.Client, metricName func createBloomStore(t *testing.T, sharedPath string) *bloomshipper.BloomStore { logger := log.NewNopLogger() - //logger := log.NewLogfmtLogger(os.Stdout) schemaCfg := config.SchemaConfig{ Configs: []config.PeriodConfig{ diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 629ac6d645cf4..4830eb482cc79 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -1,9 +1,9 @@ package builder import ( - "bytes" "context" "fmt" + "os" "sync" "time" @@ -368,7 +368,7 @@ func (b *Builder) processTask( seriesItrWithCounter, b.chunkLoader, blocksIter, - b.rwFn, + b.writerReaderFunc, nil, // TODO(salvacorts): Pass reporter or remove when we address tracking b.bloomStore.BloomMetrics(), logger, @@ -384,6 +384,9 @@ func (b *Builder) processTask( built, err := bloomshipper.BlockFrom(tenant, task.Table.Addr(), blk) if err != nil { level.Error(logger).Log("msg", "failed to build block", "err", err) + if err = blk.Reader().Cleanup(); err != nil { + level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err) + } return nil, fmt.Errorf("failed to build block: %w", err) } @@ -394,10 +397,17 @@ func (b *Builder) processTask( built, ); err != nil { level.Error(logger).Log("msg", "failed to write block", "err", err) + if err = blk.Reader().Cleanup(); err != nil { + level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err) + } return nil, fmt.Errorf("failed to write block: %w", err) } b.metrics.blocksCreated.Inc() + if err := blk.Reader().Cleanup(); err != nil { + level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err) + } + totalGapKeyspace := gap.Bounds.Max - gap.Bounds.Min progress := built.Bounds.Max - gap.Bounds.Min pct := float64(progress) / float64(totalGapKeyspace) * 100 @@ -489,9 +499,10 @@ func (b *Builder) loadWorkForGap( return seriesItr, blocksIter, nil } -// TODO(owen-d): pool, evaluate if memory-only is the best choice -func (b *Builder) rwFn() (v1.BlockWriter, v1.BlockReader) { - indexBuf := bytes.NewBuffer(nil) - bloomsBuf := bytes.NewBuffer(nil) - return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf) +func (b *Builder) writerReaderFunc() (v1.BlockWriter, v1.BlockReader) { + dir, err := os.MkdirTemp(b.cfg.WorkingDir, "bloom-block-") + if err != nil { + panic(err) + } + return v1.NewDirectoryBlockWriter(dir), v1.NewDirectoryBlockReader(dir) } diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go index d0c553104b09e..deeeb951465ab 100644 --- a/pkg/bloombuild/builder/config.go +++ b/pkg/bloombuild/builder/config.go @@ -13,6 +13,7 @@ type Config struct { GrpcConfig grpcclient.Config `yaml:"grpc_config"` PlannerAddress string `yaml:"planner_address"` BackoffConfig backoff.Config `yaml:"backoff_config"` + WorkingDir string `yaml:"working_directory" doc:"hidden"` } // RegisterFlagsWithPrefix registers flags for the bloom-planner configuration. @@ -20,6 +21,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner") cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f) cfg.BackoffConfig.RegisterFlagsWithPrefix(prefix+".backoff", f) + f.StringVar(&cfg.WorkingDir, prefix+".working-directory", "", "Working directory to which blocks are temporarily written to. Empty string defaults to the operating system's temp directory.") } func (cfg *Config) Validate() error { diff --git a/pkg/bloombuild/builder/spec.go b/pkg/bloombuild/builder/spec.go index abb6cef1447f2..82457cf92b84a 100644 --- a/pkg/bloombuild/builder/spec.go +++ b/pkg/bloombuild/builder/spec.go @@ -53,8 +53,8 @@ type SimpleBloomGenerator struct { metrics *v1.Metrics logger log.Logger - readWriterFn func() (v1.BlockWriter, v1.BlockReader) - reporter func(model.Fingerprint) + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader) + reporter func(model.Fingerprint) tokenizer *v1.BloomTokenizer } @@ -69,7 +69,7 @@ func NewSimpleBloomGenerator( store iter.Iterator[*v1.Series], chunkLoader ChunkLoader, blocksIter iter.ResetIterator[*v1.SeriesWithBlooms], - readWriterFn func() (v1.BlockWriter, v1.BlockReader), + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader), reporter func(model.Fingerprint), metrics *v1.Metrics, logger log.Logger, @@ -85,9 +85,9 @@ func NewSimpleBloomGenerator( "component", "bloom_generator", "org_id", userID, ), - readWriterFn: readWriterFn, - metrics: metrics, - reporter: reporter, + writerReaderFunc: writerReaderFunc, + metrics: metrics, + reporter: reporter, tokenizer: v1.NewBloomTokenizer( opts.Schema.NGramLen(), @@ -161,19 +161,19 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt ) } - return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter) + return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter) } // LazyBlockBuilderIterator is a lazy iterator over blocks that builds // each block by adding series to them until they are full. type LazyBlockBuilderIterator struct { - ctx context.Context - opts v1.BlockOptions - metrics *v1.Metrics - populate v1.BloomPopulatorFunc - readWriterFn func() (v1.BlockWriter, v1.BlockReader) - series iter.PeekIterator[*v1.Series] - blocks iter.ResetIterator[*v1.SeriesWithBlooms] + ctx context.Context + opts v1.BlockOptions + metrics *v1.Metrics + populate v1.BloomPopulatorFunc + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader) + series iter.PeekIterator[*v1.Series] + blocks iter.ResetIterator[*v1.SeriesWithBlooms] bytesAdded int curr *v1.Block @@ -185,18 +185,18 @@ func NewLazyBlockBuilderIterator( opts v1.BlockOptions, metrics *v1.Metrics, populate v1.BloomPopulatorFunc, - readWriterFn func() (v1.BlockWriter, v1.BlockReader), + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader), series iter.PeekIterator[*v1.Series], blocks iter.ResetIterator[*v1.SeriesWithBlooms], ) *LazyBlockBuilderIterator { return &LazyBlockBuilderIterator{ - ctx: ctx, - opts: opts, - metrics: metrics, - populate: populate, - readWriterFn: readWriterFn, - series: series, - blocks: blocks, + ctx: ctx, + opts: opts, + metrics: metrics, + populate: populate, + writerReaderFunc: writerReaderFunc, + series: series, + blocks: blocks, } } @@ -221,9 +221,10 @@ func (b *LazyBlockBuilderIterator) Next() bool { } mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics) - writer, reader := b.readWriterFn() + writer, reader := b.writerReaderFunc() blockBuilder, err := v1.NewBlockBuilder(b.opts, writer) if err != nil { + _ = writer.Cleanup() b.err = errors.Wrap(err, "failed to create bloom block builder") return false } @@ -231,6 +232,7 @@ func (b *LazyBlockBuilderIterator) Next() bool { b.bytesAdded += sourceBytes if err != nil { + _ = writer.Cleanup() b.err = errors.Wrap(err, "failed to build bloom block") return false } diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index fffd67f7f2f42..b852896bfd27c 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -1,10 +1,10 @@ package bloomcompactor import ( - "bytes" "context" "fmt" "math" + "os" "sort" "sync" @@ -49,11 +49,12 @@ func NewSimpleBloomController( } } -// TODO(owen-d): pool, evaluate if memory-only is the best choice -func (s *SimpleBloomController) rwFn() (v1.BlockWriter, v1.BlockReader) { - indexBuf := bytes.NewBuffer(nil) - bloomsBuf := bytes.NewBuffer(nil) - return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf) +func (s *SimpleBloomController) writerReaderFunc() (v1.BlockWriter, v1.BlockReader) { + dir, err := os.MkdirTemp("", "bloom-block-") + if err != nil { + panic(err) + } + return v1.NewDirectoryBlockWriter(dir), v1.NewDirectoryBlockReader(dir) } /* @@ -409,7 +410,7 @@ func (s *SimpleBloomController) buildGaps( seriesItrWithCounter, s.chunkLoader, blocksIter, - s.rwFn, + s.writerReaderFunc, reporter, s.metrics, logger, diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go index 61cd8f1d06a44..696f192970b68 100644 --- a/pkg/bloomcompactor/spec.go +++ b/pkg/bloomcompactor/spec.go @@ -53,8 +53,8 @@ type SimpleBloomGenerator struct { metrics *Metrics logger log.Logger - readWriterFn func() (v1.BlockWriter, v1.BlockReader) - reporter func(model.Fingerprint) + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader) + reporter func(model.Fingerprint) tokenizer *v1.BloomTokenizer } @@ -69,7 +69,7 @@ func NewSimpleBloomGenerator( store iter.Iterator[*v1.Series], chunkLoader ChunkLoader, blocksIter iter.ResetIterator[*v1.SeriesWithBlooms], - readWriterFn func() (v1.BlockWriter, v1.BlockReader), + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader), reporter func(model.Fingerprint), metrics *Metrics, logger log.Logger, @@ -85,9 +85,9 @@ func NewSimpleBloomGenerator( "component", "bloom_generator", "org_id", userID, ), - readWriterFn: readWriterFn, - metrics: metrics, - reporter: reporter, + writerReaderFunc: writerReaderFunc, + metrics: metrics, + reporter: reporter, tokenizer: v1.NewBloomTokenizer( opts.Schema.NGramLen(), @@ -161,19 +161,19 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt ) } - return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter) + return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter) } // LazyBlockBuilderIterator is a lazy iterator over blocks that builds // each block by adding series to them until they are full. type LazyBlockBuilderIterator struct { - ctx context.Context - opts v1.BlockOptions - metrics *Metrics - populate v1.BloomPopulatorFunc - readWriterFn func() (v1.BlockWriter, v1.BlockReader) - series iter.PeekIterator[*v1.Series] - blocks iter.ResetIterator[*v1.SeriesWithBlooms] + ctx context.Context + opts v1.BlockOptions + metrics *Metrics + populate v1.BloomPopulatorFunc + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader) + series iter.PeekIterator[*v1.Series] + blocks iter.ResetIterator[*v1.SeriesWithBlooms] bytesAdded int curr *v1.Block @@ -185,18 +185,18 @@ func NewLazyBlockBuilderIterator( opts v1.BlockOptions, metrics *Metrics, populate v1.BloomPopulatorFunc, - readWriterFn func() (v1.BlockWriter, v1.BlockReader), + writerReaderFunc func() (v1.BlockWriter, v1.BlockReader), series iter.PeekIterator[*v1.Series], blocks iter.ResetIterator[*v1.SeriesWithBlooms], ) *LazyBlockBuilderIterator { return &LazyBlockBuilderIterator{ - ctx: ctx, - opts: opts, - metrics: metrics, - populate: populate, - readWriterFn: readWriterFn, - series: series, - blocks: blocks, + ctx: ctx, + opts: opts, + metrics: metrics, + populate: populate, + writerReaderFunc: writerReaderFunc, + series: series, + blocks: blocks, } } @@ -221,7 +221,7 @@ func (b *LazyBlockBuilderIterator) Next() bool { } mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics) - writer, reader := b.readWriterFn() + writer, reader := b.writerReaderFunc() blockBuilder, err := v1.NewBlockBuilder(b.opts, writer) if err != nil { b.err = errors.Wrap(err, "failed to create bloom block builder") diff --git a/pkg/storage/bloom/v1/block_writer.go b/pkg/storage/bloom/v1/block_writer.go index 70ed868235a75..a50c2f81e4b8a 100644 --- a/pkg/storage/bloom/v1/block_writer.go +++ b/pkg/storage/bloom/v1/block_writer.go @@ -8,6 +8,8 @@ import ( "github.com/pkg/errors" + "github.com/grafana/dskit/multierror" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/util" ) @@ -22,6 +24,7 @@ type BlockWriter interface { Blooms() (io.WriteCloser, error) Size() (int, error) // byte size of accumualted index & blooms Full(maxSize uint64) (full bool, size int, err error) + Cleanup() error } // in memory impl @@ -39,6 +42,7 @@ func NewMemoryBlockWriter(index, blooms *bytes.Buffer) MemoryBlockWriter { func (b MemoryBlockWriter) Index() (io.WriteCloser, error) { return NewNoopCloser(b.index), nil } + func (b MemoryBlockWriter) Blooms() (io.WriteCloser, error) { return NewNoopCloser(b.blooms), nil } @@ -60,6 +64,12 @@ func (b MemoryBlockWriter) Full(maxSize uint64) (full bool, size int, err error) return uint64(size) >= maxSize, size, nil } +func (b MemoryBlockWriter) Cleanup() error { + b.index.Reset() + b.blooms.Reset() + return nil +} + // Directory based impl type DirectoryBlockWriter struct { dir string @@ -139,3 +149,12 @@ func (b *DirectoryBlockWriter) Full(maxSize uint64) (full bool, size int, err er return uint64(size) >= maxSize, size, nil } + +func (b *DirectoryBlockWriter) Cleanup() error { + b.initialized = false + err := multierror.New() + err.Add(os.Remove(b.index.Name())) + err.Add(os.Remove(b.blooms.Name())) + err.Add(os.RemoveAll(b.dir)) + return err.Err() +} diff --git a/pkg/storage/bloom/v1/reader.go b/pkg/storage/bloom/v1/reader.go index d402ee1fd9717..d589aa19c4927 100644 --- a/pkg/storage/bloom/v1/reader.go +++ b/pkg/storage/bloom/v1/reader.go @@ -8,6 +8,8 @@ import ( "github.com/pkg/errors" + "github.com/grafana/dskit/multierror" + iter "github.com/grafana/loki/v3/pkg/iter/v2" ) @@ -15,6 +17,7 @@ type BlockReader interface { Index() (io.ReadSeeker, error) Blooms() (io.ReadSeeker, error) TarEntries() (iter.Iterator[TarEntry], error) + Cleanup() error } // In memory reader @@ -61,6 +64,12 @@ func (r *ByteReader) TarEntries() (iter.Iterator[TarEntry], error) { return iter.NewSliceIter[TarEntry](entries), err } +func (r *ByteReader) Cleanup() error { + r.index.Reset() + r.blooms.Reset() + return nil +} + // File reader type DirectoryBlockReader struct { dir string @@ -113,17 +122,28 @@ func (r *DirectoryBlockReader) Blooms() (io.ReadSeeker, error) { } func (r *DirectoryBlockReader) TarEntries() (iter.Iterator[TarEntry], error) { + var err error if !r.initialized { - if err := r.Init(); err != nil { + if err = r.Init(); err != nil { return nil, err } } + _, err = r.index.Seek(0, io.SeekStart) + if err != nil { + return nil, errors.Wrap(err, "error seeking series file") + } + idxInfo, err := r.index.Stat() if err != nil { return nil, errors.Wrap(err, "error stat'ing series file") } + _, err = r.blooms.Seek(0, io.SeekStart) + if err != nil { + return nil, errors.Wrap(err, "error seeking bloom file") + } + bloomInfo, err := r.blooms.Stat() if err != nil { return nil, errors.Wrap(err, "error stat'ing bloom file") @@ -144,3 +164,12 @@ func (r *DirectoryBlockReader) TarEntries() (iter.Iterator[TarEntry], error) { return iter.NewSliceIter[TarEntry](entries), nil } + +func (r *DirectoryBlockReader) Cleanup() error { + r.initialized = false + err := multierror.New() + err.Add(os.Remove(r.index.Name())) + err.Add(os.Remove(r.blooms.Name())) + err.Add(os.RemoveAll(r.dir)) + return err.Err() +}