-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
flush boltdb to object store #1837
flush boltdb to object store #1837
Conversation
08985e8
to
ae64e5f
Compare
Have you considered naming the "archiver" as "shipper", keeping the same naming of Thanos / Cortex blocks storage? |
pkg/loki/modules.go
Outdated
@@ -277,6 +292,17 @@ func (t *Loki) stopTableManager() error { | |||
} | |||
|
|||
func (t *Loki) initStore() (err error) { | |||
if ActiveIndexType(t.cfg.SchemaConfig) == "boltdb" && t.cfg.StorageConfig.BoltDBArchiverConfig.Enable { | |||
t.cfg.StorageConfig.BoltDBArchiverConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID | |||
if t.cfg.Target == Ingester { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this going to work in a single binary mode ? I think it would overrides one of them no ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Active boltdb files are kept in a separate directory anyways. This is just an optimization to avoid downloading files unnecessarily.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should use a switch
here and include "all"
target just for clarity of what is going on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My concern is we don't know really which one is picked for all
probably none so default to ArchiverModeReadWrite
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it would be ArchiverModeReadWrite
pkg/loki/modules.go
Outdated
@@ -473,3 +499,16 @@ var modules = map[moduleName]module{ | |||
deps: []moduleName{Querier, Ingester, Distributor, TableManager}, | |||
}, | |||
} | |||
|
|||
// ActiveIndexType type returns index type which would be applicable to metrics that would be pushed starting now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// ActiveIndexType type returns index type which would be applicable to metrics that would be pushed starting now | |
// ActiveIndexType type returns index type which would be applicable to logs that would be pushed starting now |
pkg/loki/modules.go
Outdated
|
||
// ActiveIndexType type returns index type which would be applicable to metrics that would be pushed starting now | ||
// Note: Another periodic config can be applicable in future which can change index type | ||
func ActiveIndexType(cfg chunk.SchemaConfig) string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know you want to add tests later, and you should write a test for that function, doesn't look like this would change and it seems easy to write.
pkg/storage/store.go
Outdated
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` | ||
storage.Config `yaml:",inline"` | ||
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` | ||
BoltDBArchiverConfig local.ArchiverConfig `yaml:"bolt_db_archiver_config"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BoltDBArchiverConfig local.ArchiverConfig `yaml:"bolt_db_archiver_config"` | |
BoltDBArchiverConfig local.ArchiverConfig `yaml:"boltdb_archiver_config"` |
pkg/storage/store.go
Outdated
@@ -4,6 +4,10 @@ import ( | |||
"context" | |||
"flag" | |||
|
|||
"github.com/grafana/loki/pkg/storage/stores" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something is wrong with the formatting of those 2 new packages. They should be with everything loki related.
If you're using vs-code I think there is a way to configure it so that it does it automatically. https://gist.github.com/cyriltovena/d52bf9ae05c371ea0c8018d48d15d6bf might help you.
pkg/loki/modules.go
Outdated
// We want ingester to also query the store when using boltdb | ||
if ActiveIndexType(t.cfg.SchemaConfig) == "boltdb" { | ||
t.cfg.Ingester.QueryStore = true | ||
// When using archiver, limit max look back for query to MaxChunkAge + upload interval by archiver + 15 mins to query only data whose index is not pushed yet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain the why we need a look back parameter ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In microservices mode it would query for data which ingester itself has flushed while in single binary mode it would query all the data for whole query duration. This would mean there would be duplicates. I just set lookback period just enough to not miss any data.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there are 2 use cases here around a filesystem and we should figure out what we want to support:
- A shared filesystem (like SAN or NFS)
- A non-shared filesystem (each instance running on it's own machine for example)
In the second use case each ingester would always be responsible for querying all of the data, but in the first case this would be treated just like s3 or gcs... i'm almost wondering if we should just define them as different store types to be able to use in the logic here for setting max QueryStoreMaxLookBack?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other concern is when we are using a shared object store, how much duplicate data are we going to get from each ingester, and is there a way to limit them to only using the index files they generate when querying the store?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So there are 2 things here wrt storage, chunks and index files. I would say it is safe to assume chunks would always be in a shared filesystem otherwise queriers would not be able to fetch them. For index, it should not matter to differentiate because we anyways keep syncing the files and we just need to query ingesters for live data that they have not uploaded or just uploaded, which we already are doing here. I might be wrong here but does it make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to support the case where chunks/index are not on a shared filesystem, I think we can do this easy enough if the ingester doesn't force this limited lookback but instead can do a lookback of all the data, WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is doable but I think it would be better to have a config instead of inferring whether the storage is shared or not. We can expose store look back config in ingesters which would default to getting set like how we are doing now.
pkg/storage/store.go
Outdated
return err | ||
} | ||
|
||
storage.RegisterIndexClient("boltdb", func() (client chunk.IndexClient, e error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can remove those named returns as they are not used.
pkg/storage/stores/local/archiver.go
Outdated
// ArchiverModeWriteOnly is to allow only write operations | ||
ArchiverModeWriteOnly | ||
|
||
ArchiverFileUploadInterval = 15 * time.Minute |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should have a comment for public const.
pkg/storage/stores/local/archiver.go
Outdated
f.BoolVar(&cfg.Enable, "boltdb.archiver.enable", false, "Enable archival of boltdb files to a store") | ||
f.StringVar(&cfg.StoreConfig.Store, "boltdb.archiver.store", "filesystem", "Store for keeping boltdb files") | ||
f.StringVar(&cfg.CacheLocation, "boltdb.archiver.cache-location", "", "Cache location for restoring boltDB files for queries") | ||
f.DurationVar(&cfg.CacheTTL, "boltdb.archiver.cache-ttl", 24*time.Hour, "TTL for boltDB files restored in cache for queries") | ||
f.DurationVar(&cfg.ResyncInterval, "boltdb.archiver.resync-interval", 5*time.Minute, "Resync downloaded files with the store") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your only documentation is those flags and I think they need a bit more details. Unless you plan to add a full page of documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I will have to add full page documentation about running it because we can't put too much in help text of flags.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a good call !
pkg/ingester/ingester.go
Outdated
return err | ||
} | ||
|
||
err = sendBatches(queryServer.Context(), itr, queryServer, req.Limit) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some concerns here.
1 - You're sending data directly and stats won't reflect that change, you need some refactoring to support sending with stats.
2 - Even if you do support stats, I'm not sure you can send the data from the store and the ingester via GRPC sequentially. I could be wrong but I think you're losing ordering by not considering and deduping line prior sending them. In over words there could be overlap of data.
I'm not 100% but you'll need to build tests for this. if you keep it that way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was under the impression that whatever duplicates would be there querier would dedup it. I will investigate more. I feel here order of entries is more important.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Querier dedupe per batches:
// NewQueryClientIterator returns an iterator over a QueryClient.
func NewQueryClientIterator(client logproto.Querier_QueryClient, direction logproto.Direction) EntryIterator {
return &queryClientIterator{
client: client,
direction: direction,
}
}
func (i *queryClientIterator) Next() bool {
for i.curr == nil || !i.curr.Next() {
batch, err := i.client.Recv()
if err == io.EOF {
return false
} else if err != nil {
i.err = err
return false
}
i.curr = NewQueryResponseIterator(i.client.Context(), batch, i.direction)
}
return true
}
Imagine the second batch contains half of the data of the first batch. You'll end up not deduping this data. You can use that iterator in a test to prove it's fine.
} | ||
|
||
archiver, err := NewArchiver(archiverCfg, archiveStoreClient, func(name string, operation int) (db *bbolt.DB, e error) { | ||
return boltDBIndexClient.GetDB(name, operation) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good candidate for an interface. I think it make more sense then a function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I am not sure why we need an interface here? We would only ever have 1 implementation here and it looks ugly to pass boltDBIndexClient
to Archiver
which itself embeds the Archiver
. Maybe I am wrong but it would be good to hear your thoughts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type BoltDBGetter interface {
GetLocal(name string, operation int) (*bbolt.DB,error)
}
It will easier for you to work with. I don't have strong opinions on that one. I just like to use interface for decoupling. This is just a nit.
pkg/storage/stores/local/archiver.go
Outdated
|
||
// uploadFile uploads one of the files locally written by ingesters to archive. | ||
func (a *Archiver) uploadFile(ctx context.Context, period string) error { | ||
if a.cfg.Mode == ArchiverModeReadWrite { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a lot going in this file, I think you can break it into two file at least, one for uploading, one for downloading.
pkg/storage/stores/local/archiver.go
Outdated
return err | ||
} | ||
|
||
db, err := a.localBoltdbGetter(period, local.DBOperationRead) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think ingester locally should also use the localdb for the store when you query the store for extra data that might not have been delivered yet to others. It looks like currently if we are a readwriter (single binary) archiver ingester will also return data from other ingester, this is not necessary and will generates a lot of duplicates.
see #1837 (comment) WDYT
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't think of cleaner ways to limit ingesters from querying data which only it has generated in single binary mode. Do you have any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitively excited about the possibilities. I think you should create a cluster and run this in our environment there's a lot of questions around maintenance and upgrade path, I think you can speed things up by running your own cluster with canaries.
Yes, we will run it when it gets in decent shape. Thanks for all the reviews! |
c359313
to
4e660a9
Compare
pkg/ingester/ingester.go
Outdated
return err | ||
} | ||
|
||
itr.Push(storeItr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Push advance the iterator.
I would create the heap at the endl have an array of var iters []iter.EntryIterator
at the beginning. If the array has more than one item use that item otherwise use a NewHeapIterator.
pkg/ingester/instance.go
Outdated
defer helpers.LogError("closing iterator", iter.Close) | ||
|
||
return sendBatches(ctx, iter, queryServer, req.Limit) | ||
return iter.NewHeapIterator(ctx, iters, req.Direction), nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid multiple HeapIterator nesting, which can slow down execution, you could return an array of iterators here and apprends the store later if needed.
7c3ae3c
to
8d09c40
Compare
795555c
to
4ea0cb4
Compare
Hey @periklis, I really appreciate your kind words and showing interest in this. While this is not yet battle-tested, query correctness and not losing any index is utmost important to us as well. We have been running this since last 2 weeks in an internal cluster beside another cluster without boltdb shipper. We are ensuring both of them get the same logs and I am going to write a tool which would query both the clusters, compare the results and push some metrics based on the results. I will let you know when it is ready so that you can also give it a try if you want. Please feel free to reach out to me for any help/issues. We can always connect in #loki-dev grafana slack channel. |
584f3be
to
c29759f
Compare
Sorry I missed commenting on your question. See the attached image which shows files for the ongoing week. We are running 5 stateful ingesters so there is 1 boltdb file per ingester. We add a timestamp at the end of the filename to avoid someone not using k8s overwriting previous files after restarts and some other issues they can run into. When tables rotate again on Thursday we will have another folder with name If you are interested in reading the contents of the boltdb files then I think it would not be straight forward since index entries depend on schema version that you are using and you will have to import some of the code from loki repo to save some efforts. |
cmd/loki/loki-local-config.yaml
Outdated
@@ -18,11 +18,11 @@ ingester: | |||
schema_config: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's revert that file. You can add a new file next time and add it to your global git gitignore.
pkg/ingester/ingester.go
Outdated
} | ||
|
||
if start.Before(end) { | ||
storeRequest := recreateRequestWithTime(req, start, end) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was not clear, but my idea was there's a lot of if
here and I don't think you have a test covering each of those.
I think you should create a function like this:
// buildStoreRequest returns a store request from a ingester request, return nit if no request should be made based on configuration. The request may be truncated due to QueryStoreMaxLookBackPeriod, explain QueryStoreMaxLookBackPeriod purpose here
func buildStoreRequest(config Config,req *logproto.QueryRequest) (*logql.SelectParams,error)
This has the advantages of making it easier to test. Now you can write tests with a combination of requests and configs and set some expectations has to when a request will be fired and how/which.
Now usage wise it would look like this:
if req, err:= buildStoreRequest(req,i.cfg);err !=nil && req != nil {
storeItr, err := i.store.LazyQuery(ctx,req)
if err != nil {
return err
}
itrs = append(itrs, storeItr)
}
@@ -35,3 +38,30 @@ func TestUniqueDeps(t *testing.T) { | |||
expected := []moduleName{Server, Overrides, Distributor, Ingester} | |||
assert.Equal(t, expected, uniqueDeps(input)) | |||
} | |||
|
|||
func TestActiveIndexType(t *testing.T) { | |||
var cfg chunk.SchemaConfig |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can transform those into table tests with name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I am not sure I get it. Can you please elaborate more so that we don't have to come back again on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohh I didn't know the term. Thanks for the link!
// Note: We are assuming that user would never store chunks in table based store otherwise NewObjectClient would return an error. | ||
|
||
// ToDo: Try passing on ObjectType from Cortex to the callback for creating custom index client. | ||
boltdbShipperEncounter := 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we do need that to be in cortex. I'm fine with having this hack in the meantime. How do we know that the callback is called in the same order of the configs ?
We need some tests too.
@@ -56,6 +62,16 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf | |||
}, nil | |||
} | |||
|
|||
// NewTableClient creates a TableClient for managing tables for index/chunk store. | |||
// ToDo: Add support in Cortex for registering custom table client like index client. | |||
func NewTableClient(name string, cfg Config) (chunk.TableClient, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is supported now I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not supported yet. We have updated cortex version in vendor to v1.0.0
while my changes are added after that. I will have to update it again to latest master. I think we should do a follow-up commit for that.
pkg/storage/stores/factory.go
Outdated
) | ||
|
||
// NewObjectClient makes a new ObjectClient of the desired type. | ||
func NewObjectClient(storeType string, cfg storage.Config) (chunk.ObjectClient, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't you use the factory from cortex ? https://github.com/cortexproject/cortex/blob/8f59b141ac55e8884436047cddb93498234a11e4/pkg/chunk/storage/factory.go#L280 ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This too needs updating cortex to latest master. I will open a PR to update it.
pkg/storage/stores/local/uploads.go
Outdated
return err | ||
} | ||
|
||
filePath := path.Join(snapshotPath, fmt.Sprintf("%s.%d", s.uploader, time.Now().Unix())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tempfile might be a better name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
files are stored in folder per periodic table and are named after ingester flushed every 15 mins to make index available to other services files are also flushed before ingester stops to avoid any data loss new stores can be implemented easily ingester to also query store when using boltdb
…from periodic config, other refactorings
0a9aa10
to
753f57c
Compare
@sandeepsukhani thank for this work, may I know if there are any plans to support indexing for azure tables similar to dynamodb , we have azure backend and that's primarily the requirement we are looking for too. Thank you |
What this PR does / why we need it:
This can be useful for running Loki using just boltdb and any of the supported object stores.
Some details about implementation:
Checklist