Skip to content

Commit

Permalink
rule: native histogram support (#6390)
Browse files Browse the repository at this point in the history
* Added native histogram support for ruler

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Formatted imports

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Fixed imports

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Formated imports

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Fixed native histogram tests

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

Fixed receiver type

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Fix for rebase

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Added docs for query endpoints differences

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Fixed comments and naming

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* made HTTPConfig optional

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* made HTTPConfig optional

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Reverted and added check

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Fixes from comments

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* renamed queryconfig to clientconfig

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* common prepareEndpointSet

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* fixed lint

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Fixed sidecar

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

* Fixed tests

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>

---------

Signed-off-by: Sebastian Rabenhorst <sebastian.rabenhorst@shopify.com>
  • Loading branch information
rabenhorst authored Dec 20, 2023
1 parent 480c8d9 commit a900cb5
Show file tree
Hide file tree
Showing 26 changed files with 840 additions and 410 deletions.
147 changes: 91 additions & 56 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,56 +502,29 @@ func runQuery(
}

var (
endpoints = query.NewEndpointSet(
time.Now,
endpoints = prepareEndpointSet(
g,
logger,
reg,
func() (specs []*query.GRPCEndpointSpec) {
// Add strict & static nodes.
for _, addr := range strictStores {
specs = append(specs, query.NewGRPCEndpointSpec(addr, true))
}

for _, addr := range strictEndpoints {
specs = append(specs, query.NewGRPCEndpointSpec(addr, true))
}

for _, dnsProvider := range []*dns.Provider{
dnsStoreProvider,
dnsRuleProvider,
dnsExemplarProvider,
dnsMetadataProvider,
dnsTargetProvider,
dnsEndpointProvider,
} {
var tmpSpecs []*query.GRPCEndpointSpec

for _, addr := range dnsProvider.Addresses() {
tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false))
}
tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs)
specs = append(specs, tmpSpecs...)
}

for _, eg := range endpointGroupAddrs {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

for _, eg := range strictEndpointGroups {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

return specs
[]*dns.Provider{
dnsStoreProvider,
dnsRuleProvider,
dnsExemplarProvider,
dnsMetadataProvider,
dnsTargetProvider,
dnsEndpointProvider,
},
duplicatedStores,
strictStores,
strictEndpoints,
endpointGroupAddrs,
strictEndpointGroups,
dialOpts,
unhealthyStoreTimeout,
endpointInfoTimeout,
queryConnMetricLabels...,
)

proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy), options...)
rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients)
targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients)
Expand All @@ -566,20 +539,6 @@ func runQuery(
)
)

// Periodically update the store set with the addresses we see in our cluster.
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
endpoints.Update(ctx)
return nil
})
}, func(error) {
cancel()
endpoints.Close()
})
}

// Run File Service Discovery and update the store set when the files are modified.
if fileSD != nil {
var fileSDUpdates chan []*targetgroup.Group
Expand Down Expand Up @@ -861,6 +820,82 @@ func removeDuplicateEndpointSpecs(logger log.Logger, duplicatedStores prometheus
return deduplicated
}

func prepareEndpointSet(
g *run.Group,
logger log.Logger,
reg *prometheus.Registry,
dnsProviders []*dns.Provider,
duplicatedStores prometheus.Counter,
strictStores []string,
strictEndpoints []string,
endpointGroupAddrs []string,
strictEndpointGroups []string,
dialOpts []grpc.DialOption,
unhealthyStoreTimeout time.Duration,
endpointInfoTimeout time.Duration,
queryConnMetricLabels ...string,
) *query.EndpointSet {
endpointSet := query.NewEndpointSet(
time.Now,
logger,
reg,
func() (specs []*query.GRPCEndpointSpec) {
// Add strict & static nodes.
for _, addr := range strictStores {
specs = append(specs, query.NewGRPCEndpointSpec(addr, true))
}

for _, addr := range strictEndpoints {
specs = append(specs, query.NewGRPCEndpointSpec(addr, true))
}

for _, dnsProvider := range dnsProviders {
var tmpSpecs []*query.GRPCEndpointSpec

for _, addr := range dnsProvider.Addresses() {
tmpSpecs = append(tmpSpecs, query.NewGRPCEndpointSpec(addr, false))
}
tmpSpecs = removeDuplicateEndpointSpecs(logger, duplicatedStores, tmpSpecs)
specs = append(specs, tmpSpecs...)
}

for _, eg := range endpointGroupAddrs {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, false, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

for _, eg := range strictEndpointGroups {
addr := fmt.Sprintf("dns:///%s", eg)
spec := query.NewGRPCEndpointSpec(addr, true, extgrpc.EndpointGroupGRPCOpts()...)
specs = append(specs, spec)
}

return specs
},
dialOpts,
unhealthyStoreTimeout,
endpointInfoTimeout,
queryConnMetricLabels...,
)

// Periodically update the store set with the addresses we see in our cluster.
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
endpointSet.Update(ctx)
return nil
})
}, func(error) {
cancel()
endpointSet.Close()
})
}

return endpointSet
}

// LookbackDeltaFactory creates from 1 to 3 lookback deltas depending on
// dynamicLookbackDelta and eo.LookbackDelta and returns a function
// that returns appropriate lookback delta for given maxSourceResolutionMillis.
Expand Down
Loading

0 comments on commit a900cb5

Please sign in to comment.