Fix deadlock in disconnecting querier (#5063)
* Fix deadlock in disconnecting querier

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update pkg/querier/worker/worker_test.go

Co-authored-by: Trevor Whitney <trevorjwhitney@gmail.com>

* Update pkg/querier/worker/worker_test.go

Co-authored-by: Trevor Whitney <trevorjwhitney@gmail.com>

* Uses in-memory GRPC server for tests (see the sketch below)

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Trevor Whitney <trevorjwhitney@gmail.com>
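
The "in-memory GRPC server" bullet refers to pkg/querier/worker/worker_test.go, which is not among the diffs shown below. As a rough sketch of the technique (not code from this commit — the helper name startBufconnServer is hypothetical), gRPC's test/bufconn package provides an in-memory listener so tests never open a real socket:

package worker_test

import (
	"context"
	"net"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/test/bufconn"
)

// startBufconnServer is a hypothetical helper (not from this commit): it
// serves gRPC over an in-memory pipe, avoiding real sockets and the
// firewall prompts they can trigger on macOS.
func startBufconnServer(t *testing.T, register func(*grpc.Server)) *grpc.ClientConn {
	t.Helper()

	lis := bufconn.Listen(1 << 20) // 1 MiB connection buffer
	srv := grpc.NewServer()
	register(srv)

	go srv.Serve(lis) //nolint:errcheck
	t.Cleanup(srv.GracefulStop)

	// Dial through the in-memory listener instead of the network.
	conn, err := grpc.DialContext(context.Background(), "bufconn",
		grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
			return lis.DialContext(ctx)
		}),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() { _ = conn.Close() })
	return conn
}

Note that the frontend tests in this commit still bind real listeners, using localhost:0 to pick a free port; only the worker tests moved to the in-memory transport.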
cyriltovena and trevorwhitney authored Jan 10, 2022
1 parent a7795e5 commit 6ef934b
Showing 12 changed files with 1,647 additions and 35 deletions.
30 changes: 30 additions & 0 deletions pkg/lokifrontend/frontend/transport/handler_test.go
@@ -0,0 +1,30 @@
package transport

import (
	"context"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/pkg/errors"
	"github.com/stretchr/testify/require"
	"github.com/weaveworks/common/httpgrpc"
)

func TestWriteError(t *testing.T) {
	for _, test := range []struct {
		status int
		err    error
	}{
		{http.StatusInternalServerError, errors.New("unknown")},
		{http.StatusGatewayTimeout, context.DeadlineExceeded},
		{StatusClientClosedRequest, context.Canceled},
		{http.StatusBadRequest, httpgrpc.Errorf(http.StatusBadRequest, "")},
	} {
		t.Run(test.err.Error(), func(t *testing.T) {
			w := httptest.NewRecorder()
			writeError(w, test.err)
			require.Equal(t, test.status, w.Result().StatusCode)
		})
	}
}
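
The test above pins down writeError's status mapping; writeError itself is not part of this diff. A plausible, purely illustrative sketch consistent with the table — context cancellation maps to the non-standard 499 "client closed request" status, deadline expiry to 504, and everything else falls through to weaveworks/common's httpgrpc error writer:

// Illustrative sketch only — the real writeError lives elsewhere in
// pkg/lokifrontend/frontend/transport and may differ.
package transport

import (
	"context"
	"net/http"

	"github.com/weaveworks/common/httpgrpc"
	"github.com/weaveworks/common/httpgrpc/server"
)

// 499 is the nginx convention for "client closed request".
const StatusClientClosedRequest = 499

func writeError(w http.ResponseWriter, err error) {
	switch err {
	case context.Canceled:
		err = httpgrpc.Errorf(StatusClientClosedRequest, "%s", err.Error())
	case context.DeadlineExceeded:
		err = httpgrpc.Errorf(http.StatusGatewayTimeout, "%s", err.Error())
	}
	// server.WriteError unwraps httpgrpc errors into the HTTP response;
	// anything else becomes a 500.
	server.WriteError(w, err)
}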
13 changes: 1 addition & 12 deletions pkg/lokifrontend/frontend/v1/frontend.go
@@ -25,9 +25,7 @@ import (
	lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc"
)

-var (
-	errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")
-)
+var errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests")

// Config for a Frontend.
type Config struct {
@@ -198,14 +196,6 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
	f.requestQueue.RegisterQuerierConnection(querierID)
	defer f.requestQueue.UnregisterQuerierConnection(querierID)

-	// If the downstream request(from querier -> frontend) is cancelled,
-	// we need to ping the condition variable to unblock getNextRequestForQuerier.
-	// Ideally we'd have ctx aware condition variables...
-	go func() {
-		<-server.Context().Done()
-		f.requestQueue.QuerierDisconnecting()
-	}()
-
	lastUserIndex := queue.FirstUser()

	for {
@@ -302,7 +292,6 @@ func getQuerierID(server frontendv1pb.Frontend_ProcessServer) (string, error) {
			Url: "/invalid_request_sent_by_frontend",
		},
	})
-
	if err != nil {
		return "", err
	}
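
The hunk above deletes the per-connection goroutine that pinged the queue's condition variable when a querier's stream context ended — per the commit title, that goroutine was implicated in the deadlock. The removed comment laments that Go's sync.Cond is not context-aware. As a standalone illustration of that general wakeup pattern only — this assumes nothing about Loki's actual RequestQueue internals — a minimal sketch:

package main

import (
	"context"
	"sync"
)

// toyQueue shows the pattern the removed comment describes: sync.Cond has
// no context support, so a helper goroutine broadcasts on cancellation,
// letting blocked waiters wake up and re-check ctx.Err().
type toyQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []string
}

func newToyQueue() *toyQueue {
	q := &toyQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// getNext blocks until an item arrives or ctx is cancelled.
func (q *toyQueue) getNext(ctx context.Context) (string, error) {
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			q.mu.Lock()
			q.cond.Broadcast() // ping the condition variable to unblock waiters
			q.mu.Unlock()
		case <-done: // getNext returned normally; stop watching
		}
	}()

	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		if err := ctx.Err(); err != nil {
			return "", err
		}
		q.cond.Wait()
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, nil
}

The key detail is that Broadcast runs while holding the same mutex the waiters hold, so a waiter cannot miss the wakeup in the window between checking ctx.Err() and calling Wait().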
300 changes: 300 additions & 0 deletions pkg/lokifrontend/frontend/v1/frontend_test.go
@@ -0,0 +1,300 @@
package v1

import (
	"context"
	"fmt"
	"io/ioutil"
	"net"
	"net/http"
	"strings"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/gorilla/mux"
	"github.com/grafana/dskit/flagext"
	"github.com/grafana/dskit/services"
	otgrpc "github.com/opentracing-contrib/go-grpc"
	"github.com/opentracing-contrib/go-stdlib/nethttp"
	"github.com/opentracing/opentracing-go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/uber/jaeger-client-go"
	"github.com/uber/jaeger-client-go/config"
	httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
	"github.com/weaveworks/common/middleware"
	"github.com/weaveworks/common/user"
	"go.uber.org/atomic"
	"google.golang.org/grpc"

	"github.com/grafana/loki/pkg/lokifrontend/frontend/transport"
	"github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb"
	querier_worker "github.com/grafana/loki/pkg/querier/worker"
	"github.com/grafana/loki/pkg/scheduler/queue"
)

const (
	query        = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&step=120"
	responseBody = `{"status":"success","data":{"resultType":"Matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}`
)

func TestFrontend(t *testing.T) {
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, err := w.Write([]byte("Hello World"))
		require.NoError(t, err)
	})
	test := func(addr string, _ *Frontend) {
		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil)
		require.NoError(t, err)
		err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req)
		require.NoError(t, err)

		resp, err := http.DefaultClient.Do(req)
		require.NoError(t, err)
		require.Equal(t, 200, resp.StatusCode)

		body, err := ioutil.ReadAll(resp.Body)
		require.NoError(t, err)

		assert.Equal(t, "Hello World", string(body))
	}

	testFrontend(t, defaultFrontendConfig(), handler, test, false, nil)
	testFrontend(t, defaultFrontendConfig(), handler, test, true, nil)
}

func TestFrontendPropagateTrace(t *testing.T) {
	closer, err := config.Configuration{}.InitGlobalTracer("test")
	require.NoError(t, err)
	defer closer.Close()

	observedTraceID := make(chan string, 2)

	handler := middleware.Tracer{}.Wrap(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		sp := opentracing.SpanFromContext(r.Context())
		defer sp.Finish()

		traceID := fmt.Sprintf("%v", sp.Context().(jaeger.SpanContext).TraceID())
		observedTraceID <- traceID

		_, err = w.Write([]byte(responseBody))
		require.NoError(t, err)
	}))

	test := func(addr string, _ *Frontend) {
		sp, ctx := opentracing.StartSpanFromContext(context.Background(), "client")
		defer sp.Finish()
		traceID := fmt.Sprintf("%v", sp.Context().(jaeger.SpanContext).TraceID())

		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", addr, query), nil)
		require.NoError(t, err)
		req = req.WithContext(ctx)
		err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(ctx, "1"), req)
		require.NoError(t, err)

		req, tr := nethttp.TraceRequest(opentracing.GlobalTracer(), req)
		defer tr.Finish()

		client := http.Client{
			Transport: &nethttp.Transport{},
		}
		resp, err := client.Do(req)
		require.NoError(t, err)
		require.Equal(t, 200, resp.StatusCode)

		defer resp.Body.Close()
		_, err = ioutil.ReadAll(resp.Body)
		require.NoError(t, err)

		// Query should do one call.
		assert.Equal(t, traceID, <-observedTraceID)
	}
	testFrontend(t, defaultFrontendConfig(), handler, test, false, nil)
	testFrontend(t, defaultFrontendConfig(), handler, test, true, nil)
}

func TestFrontendCheckReady(t *testing.T) {
	for _, tt := range []struct {
		name             string
		connectedClients int
		msg              string
		readyForRequests bool
	}{
		{"connected clients are ready", 3, "", true},
		{"no url, no clients is not ready", 0, "not ready: number of queriers connected to query-frontend is 0", false},
	} {
		t.Run(tt.name, func(t *testing.T) {
			f := &Frontend{
				log: log.NewNopLogger(),
				requestQueue: queue.NewRequestQueue(5, 0,
					prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
					prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
				),
			}
			for i := 0; i < tt.connectedClients; i++ {
				f.requestQueue.RegisterQuerierConnection("test")
			}
			err := f.CheckReady(context.Background())
			errMsg := ""

			if err != nil {
				errMsg = err.Error()
			}

			require.Equal(t, tt.msg, errMsg)
		})
	}
}

// TestFrontendCancel ensures that when client requests are cancelled,
// the underlying query is correctly cancelled _and not retried_.
func TestFrontendCancel(t *testing.T) {
	var tries atomic.Int32
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		<-r.Context().Done()
		tries.Inc()
	})
	test := func(addr string, _ *Frontend) {
		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil)
		require.NoError(t, err)
		err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req)
		require.NoError(t, err)

		ctx, cancel := context.WithCancel(context.Background())
		req = req.WithContext(ctx)

		go func() {
			time.Sleep(100 * time.Millisecond)
			cancel()
		}()

		_, err = http.DefaultClient.Do(req)
		require.Error(t, err)

		time.Sleep(100 * time.Millisecond)
		assert.Equal(t, int32(1), tries.Load())
	}
	testFrontend(t, defaultFrontendConfig(), handler, test, false, nil)
	tries.Store(0)
	testFrontend(t, defaultFrontendConfig(), handler, test, true, nil)
}

func TestFrontendMetricsCleanup(t *testing.T) {
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, err := w.Write([]byte("Hello World"))
		require.NoError(t, err)
	})

	for _, matchMaxConcurrency := range []bool{false, true} {
		reg := prometheus.NewPedanticRegistry()

		test := func(addr string, fr *Frontend) {
			req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil)
			require.NoError(t, err)
			err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req)
			require.NoError(t, err)

			resp, err := http.DefaultClient.Do(req)
			require.NoError(t, err)
			require.Equal(t, 200, resp.StatusCode)
			defer resp.Body.Close()

			body, err := ioutil.ReadAll(resp.Body)
			require.NoError(t, err)

			assert.Equal(t, "Hello World", string(body))

			require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_query_frontend_queue_length Number of queries in the queue.
# TYPE cortex_query_frontend_queue_length gauge
cortex_query_frontend_queue_length{user="1"} 0
`), "cortex_query_frontend_queue_length"))

			fr.cleanupInactiveUserMetrics("1")

			require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_query_frontend_queue_length Number of queries in the queue.
# TYPE cortex_query_frontend_queue_length gauge
`), "cortex_query_frontend_queue_length"))
		}

		testFrontend(t, defaultFrontendConfig(), handler, test, matchMaxConcurrency, reg)
	}
}

func testFrontend(t *testing.T, config Config, handler http.Handler, test func(addr string, frontend *Frontend), matchMaxConcurrency bool, reg prometheus.Registerer) {
	logger := log.NewNopLogger()

	var workerConfig querier_worker.Config
	flagext.DefaultValues(&workerConfig)
	workerConfig.Parallelism = 1
	workerConfig.MatchMaxConcurrency = matchMaxConcurrency
	workerConfig.MaxConcurrentRequests = 1

	// localhost:0 prevents firewall warnings on Mac OS X.
	grpcListen, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)
	workerConfig.FrontendAddress = grpcListen.Addr().String()

	httpListen, err := net.Listen("tcp", "localhost:0")
	require.NoError(t, err)

	v1, err := New(config, limits{}, logger, reg)
	require.NoError(t, err)
	require.NotNil(t, v1)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1))
	defer func() {
		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), v1))
	}()

	grpcServer := grpc.NewServer(
		grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(opentracing.GlobalTracer())),
	)
	defer grpcServer.GracefulStop()

	frontendv1pb.RegisterFrontendServer(grpcServer, v1)

	// Default HTTP handler config.
	handlerCfg := transport.HandlerConfig{}
	flagext.DefaultValues(&handlerCfg)

	rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1)
	r := mux.NewRouter()
	r.PathPrefix("/").Handler(middleware.Merge(
		middleware.AuthenticateUser,
		middleware.Tracer{},
	).Wrap(transport.NewHandler(handlerCfg, rt, logger, nil)))

	httpServer := http.Server{
		Handler: r,
	}
	defer httpServer.Shutdown(context.Background()) //nolint:errcheck

	go httpServer.Serve(httpListen) //nolint:errcheck
	go grpcServer.Serve(grpcListen) //nolint:errcheck

	var worker services.Service
	worker, err = querier_worker.NewQuerierWorker(workerConfig, nil, httpgrpc_server.NewServer(handler), logger, nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))

	test(httpListen.Addr().String(), v1)

	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), worker))
}

func defaultFrontendConfig() Config {
	config := Config{}
	flagext.DefaultValues(&config)
	return config
}

type limits struct {
	queriers int
}

func (l limits) MaxQueriersPerUser(_ string) int {
	return l.queriers
}