Skip to content

Commit

Permalink
feedback: Add end-to-end write and read tests for multiple organizations
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartcarnie committed Feb 20, 2019
1 parent 9328eb7 commit 7f8e2c6
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 5 deletions.
5 changes: 5 additions & 0 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,3 +626,8 @@ func (m *Launcher) TaskStore() taskbackend.Store {
func (m *Launcher) TaskScheduler() taskbackend.Scheduler {
return m.scheduler
}

// KeyValueService returns the internal key-value service.
func (m *Launcher) KeyValueService() *kv.Service {
return m.kvService
}
35 changes: 30 additions & 5 deletions cmd/influxd/launcher/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,23 +231,36 @@ func (l *Launcher) ShutdownOrFail(tb testing.TB, ctx context.Context) {

// SetupOrFail creates a new user, bucket, org, and auth token. Fail on error.
func (l *Launcher) SetupOrFail(tb testing.TB) {
svc := &http.SetupService{Addr: l.URL()}
results, err := svc.Generate(ctx, &platform.OnboardingRequest{
results := l.OnBoardOrFail(tb, &platform.OnboardingRequest{
User: "USER",
Password: "PASSWORD",
Org: "ORG",
Bucket: "BUCKET",
})
if err != nil {
tb.Fatal(err)
}

l.User = results.User
l.Org = results.Org
l.Bucket = results.Bucket
l.Auth = results.Auth
}

// OnBoardOrFail attempts an on-boarding request or fails on error.
// The on-boarding status is also reset to allow multiple user/org/buckets to be created.
func (l *Launcher) OnBoardOrFail(tb testing.TB, req *platform.OnboardingRequest) *platform.OnboardingResults {
tb.Helper()
res, err := l.KeyValueService().Generate(context.Background(), req)
if err != nil {
tb.Fatal(err)
}

err = l.KeyValueService().PutOnboardingStatus(context.Background(), false)
if err != nil {
tb.Fatal(err)
}

return res
}

func (l *Launcher) FluxService() *http.FluxService {
return &http.FluxService{Addr: l.URL(), Token: l.Auth.Token}
}
Expand All @@ -274,3 +287,15 @@ func (l *Launcher) MustNewHTTPRequest(method, rawurl, body string) *nethttp.Requ
req.Header.Set("Authorization", "Token "+l.Auth.Token)
return req
}

// MustNewHTTPRequest returns a new nethttp.Request with base URL and auth attached. Fail on error.
func (l *Launcher) NewHTTPRequestOrFail(tb testing.TB, method, rawurl, token string, body string) *nethttp.Request {
tb.Helper()
req, err := nethttp.NewRequest(method, l.URL()+rawurl, strings.NewReader(body))
if err != nil {
tb.Fatal(err)
}

req.Header.Set("Authorization", "Token "+token)
return req
}
89 changes: 89 additions & 0 deletions cmd/influxd/launcher/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package launcher_test

import (
"bytes"
"fmt"
"io/ioutil"
nethttp "net/http"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/http"
)

func TestStorage_WriteAndQuery(t *testing.T) {
l := RunLauncherOrFail(t, ctx)

org1 := l.OnBoardOrFail(t, &influxdb.OnboardingRequest{
User: "USER-1",
Password: "PASSWORD-1",
Org: "ORG-01",
Bucket: "BUCKET",
})
org2 := l.OnBoardOrFail(t, &influxdb.OnboardingRequest{
User: "USER-2",
Password: "PASSWORD-1",
Org: "ORG-02",
Bucket: "BUCKET",
})

defer l.ShutdownOrFail(t, ctx)

// Execute single write against the server.
l.WriteOrFail(t, org1, `m,k=v1 f=100i 946684800000000000`)
l.WriteOrFail(t, org2, `m,k=v2 f=200i 946684800000000000`)

qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`

exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
`,result,table,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,100,f,m,v1` + "\r\n\r\n"
if got := l.FluxQueryOrFail(t, org1.Org, org1.Auth.Token, qs); !cmp.Equal(got, exp) {
t.Errorf("unexpected query results -got/+exp\n%s", cmp.Diff(got, exp))
}

exp = `,result,table,_start,_stop,_time,_value,_field,_measurement,k` + "\r\n" +
`,result,table,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,200,f,m,v2` + "\r\n\r\n"
if got := l.FluxQueryOrFail(t, org2.Org, org2.Auth.Token, qs); !cmp.Equal(got, exp) {
t.Errorf("unexpected query results -got/+exp\n%s", cmp.Diff(got, exp))
}
}

// WriteOrFail attempts a write to the organization and bucket identified by to or fails if there is an error.
func (l *Launcher) WriteOrFail(tb testing.TB, to *influxdb.OnboardingResults, data string) {
tb.Helper()
resp, err := nethttp.DefaultClient.Do(l.NewHTTPRequestOrFail(tb, "POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", to.Org.ID, to.Bucket.ID), to.Auth.Token, data))
if err != nil {
tb.Fatal(err)
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
tb.Fatal(err)
}

if err := resp.Body.Close(); err != nil {
tb.Fatal(err)
}

if resp.StatusCode != nethttp.StatusNoContent {
tb.Fatalf("unexpected status code: %d, body: %s, headers: %v", resp.StatusCode, body, resp.Header)
}
}

// FluxQueryOrFail performs a query to the specified organization and returns the results
// or fails if there is an error.
func (l *Launcher) FluxQueryOrFail(tb testing.TB, org *influxdb.Organization, token string, query string) string {
tb.Helper()
var buf bytes.Buffer

fs := &http.FluxService{Addr: l.URL(), Token: token}

req := (http.QueryRequest{Query: query, Org: org}).WithDefaults()
if preq, err := req.ProxyRequest(); err != nil {
tb.Fatal(err)
} else if _, err := fs.Query(ctx, &buf, preq); err != nil {
tb.Fatal(err)
}
return buf.String()
}

0 comments on commit 7f8e2c6

Please sign in to comment.