Skip to content

Commit

Permalink
#176181251 simplify grpc and worker startup for testing (go-delve#79)
Browse files Browse the repository at this point in the history
* simplify grpc and worker startup for testing

* do not add a tracer if the rpc host is an empty string

* restored direct use of pgx

* wrong event type used in db setup
  • Loading branch information
Neil Clifton authored Jan 5, 2021
1 parent c55bb76 commit 8b1245c
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 288 deletions.
2 changes: 2 additions & 0 deletions backend/webhook/integration_test/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type TestFixtures struct {

func New() *TestFixtures {

log.Println("setup fixtures")
var env FixturesEnv
if err := envconfig.Process("INTEGRATION_TEST_FIXTURE", &env); err != nil {
log.Fatal("failed to read env vars:", err)
Expand All @@ -57,6 +58,7 @@ func (tf *TestFixtures) SetupRedis() {
}

func (tf *TestFixtures) Teardown() {
log.Println("teardown fixtures")
for i := len(tf.teardowns) - 1; i >= 0; i-- {
tf.teardowns[i]()
}
Expand Down
38 changes: 20 additions & 18 deletions backend/webhook/integration_test/mms_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ package test

import (
"encoding/json"
"fmt"
"log"
"testing"
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -36,6 +35,8 @@ type ExpectedRequestBody struct {
}

func Test_PublishMMSStatusUpdate(t *testing.T) {
log.Println("test PublishMMSStatusUpdate")

setup := setupForPublishMMSStatusUpdate(t)
defer setup.teardown(t)
client := setup.getClient(t)
Expand Down Expand Up @@ -65,27 +66,29 @@ func Test_PublishMMSStatusUpdate(t *testing.T) {
StatusUpdatedAt: timestampNow,
},
want: func(reply *webhookpb.NoReply, params *webhookpb.PublishMMSStatusUpdateParams) {
assert.Equal(t, len(setup.httpRequests), 1, fmt.Sprintf("expected 1 request sent to %s", setup.httpServer.URL))
req := setup.httpRequests[0]
assert.Equal(t, req.Method, "POST", "request method")
assert.Equal(t, req.Header.Get("Content-Type"), "application/json", "request has expected Content-Type")
expectedBody, err := json.Marshal(
ExpectedRequestBody{service.EventMMSStatus,
ExpectedData{"42", "xxy", "123", "35426378914", "46354078643", "done", "test is done",
ExpectedTimestamp{timestampNow.Nanos, timestampNow.Seconds}}})
if err != nil {
t.Fatalf("error: %+v", err)

waitForRequest(setup, t)
if len(setup.httpRequests) > 0 {
req := setup.httpRequests[0]
assert.Equal(t, req.Method, "POST", "request method")
assert.Equal(t, req.Header.Get("Content-Type"), "application/json", "request has expected Content-Type")
expectedBody, err := json.Marshal(
ExpectedRequestBody{service.EventMMSStatus,
ExpectedData{"42", "xxy", "123", "35426378914", "46354078643", "done", "test is done",
ExpectedTimestamp{timestampNow.Nanos, timestampNow.Seconds}}})
if err != nil {
t.Fatalf("error: %+v", err)
}
assert.JSONEq(t, string(expectedBody), setup.httpRequestBodies[0], "request body json")
}
assert.JSONEq(t, string(expectedBody), setup.httpRequestBodies[0], "request body json")

},
wantErr: wantErr{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
time.Sleep(5 * time.Second)
got, err := client.PublishMMSStatusUpdate(setup.ctx, tt.params)
time.Sleep(5 * time.Second)
if tt.wantErr.status != nil && err != nil {
errStatus, ok := status.FromError(err)
assert.Equal(t, ok, tt.wantErr.ok, "grpc ok")
Expand All @@ -100,12 +103,11 @@ func Test_PublishMMSStatusUpdate(t *testing.T) {
}

func setupForPublishMMSStatusUpdate(t *testing.T) *testDeps {

setup := setupForTest(t, tfx)
setup := newSetup(t, tfx, listener)
setup.startHttpServer(t)
setup.startWorker(t)
setup.adb.HaveInDatabase("webhook",
"id, account_id, event, name, url, rate_limit, created_at, updated_at",
[]interface{}{32767, "42", service.EventMMSStatus, "name1", setup.httpServer.URL, 2, "2020-01-12 22:41:42", "2020-01-12 22:41:42"})
setup.startWorker(t)
return setup
}
16 changes: 16 additions & 0 deletions backend/webhook/integration_test/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd $DIR &> /dev/null

export WEBHOOK_RABBIT_EXCHANGE=webhook
export WEBHOOK_RABBIT_EXCHANGE_TYPE=direct
export WEBHOOK_MIGRATION_ROOT="file://../migration/sql"
export INTEGRATION_TEST_FIXTURE_POSTGRES_USER=gnomock
export INTEGRATION_TEST_FIXTURE_POSTGRES_USER_PASSWORD=gnomick
export INTEGRATION_TEST_FIXTURE_RABBITMQ_USER=gnomock
export INTEGRATION_TEST_FIXTURE_RABBITMQ_USER_PASSWORD=gnomick

go test -timeout 30s -tags integration -run ^Test_.*$

popd &> /dev/null
106 changes: 54 additions & 52 deletions backend/webhook/integration_test/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"time"

"github.com/kelseyhightower/envconfig"
opentracing "github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"

Expand All @@ -38,7 +38,6 @@ type testDeps struct {
listener *bufconn.Listener
connectionToRPC *grpc.ClientConn
adb *assertdb.AssertDb
appClose func() error
httpServer *httptest.Server
httpRequests []*http.Request
httpRequestBodies []string
Expand All @@ -57,61 +56,49 @@ func getWebhookEnv() *WebhookEnv {
}
return &env
}

func (setup *testDeps) teardown(t *testing.T) {
if setup.adb != nil {
setup.adb.Teardown()
}
setup.connectionToRPC.Close()
if setup.appClose != nil {
err := setup.appClose()
if err != nil {
t.Fatalf("failed to close application: %+v", err)
}
}
if setup.httpServer != nil {
setup.httpServer.Close()
func startGrpcServer(tfx *fixtures.TestFixtures) *bufconn.Listener {
env := getWebhookEnv()
app := service.New()
app.Env = &service.WebhookEnv{
RPCHost: "",
RPCPort: "",
RabbitURL: tfx.Rabbit.ConnStr,
PostgresURL: tfx.Postgres.ConnStr,
RabbitExchange: env.RabbitExchange,
RabbitExchangeType: env.RabbitExchangeType,
}
listener := bufconn.Listen(1024 * 1024)
app.Listener = listener

// we need to use a rabbitmq connection that will not do a os.Exit() when we stop the service
app.IgnoreClosedQueueConnection = true
go app.Run()

return listener
}

func setupForTest(t *testing.T, tfx *fixtures.TestFixtures) *testDeps {
func newSetup(t *testing.T, tfx *fixtures.TestFixtures, listener *bufconn.Listener) *testDeps {

setup := &testDeps{
ctx: context.Background(),
tfx: tfx,
env: getWebhookEnv(),
ctx: context.Background(),
tfx: tfx,
env: getWebhookEnv(),
listener: listener,
}

app := service.New()
app.SetEnv(&service.WebhookEnv{
RPCHost: "webhook service under test",
RPCPort: "N/A",
RabbitURL: tfx.Rabbit.ConnStr,
PostgresURL: tfx.Postgres.ConnStr,
RabbitExchange: setup.env.RabbitExchange,
RabbitExchangeType: setup.env.RabbitExchangeType,
})
app.SetListener(setup.getBufListener())
app.SetTracer(setup.getNoopTracer())
// we need to use a rabbitmq connection that will not fatal the test when we stop the service
app.IgnoreClosedQueueConnection()
go app.Run()
setup.appClose = app.Close

setup.adb = assertdb.New(t, setup.tfx.Postgres.ConnStr)

return setup
}

func (setup *testDeps) getNoopTracer() opentracing.Tracer {
return opentracing.GlobalTracer()
}
func (setup *testDeps) teardown(t *testing.T) {
if setup.adb != nil {
setup.adb.Teardown()
}
if setup.httpServer != nil {
setup.httpServer.Close()
}

func (setup *testDeps) getBufListener() *bufconn.Listener {
bufferSize := 1024 * 1024
setup.listener = bufconn.Listen(bufferSize)
return setup.listener
}

func (setup *testDeps) getClient(t *testing.T) webhookpb.ServiceClient {
Expand All @@ -133,21 +120,22 @@ func getBufDialer(listener *bufconn.Listener) func(context.Context, string) (net
}

func (setup *testDeps) startWorker(t *testing.T) {
// use go routine to start the webhook worker
wkr := worker.New()
wkr.SetEnv(&worker.WebhookEnv{
wkr.Env = &worker.WebhookEnv{
RPCPort: 0,
RPCHost: "n/a",
RPCHost: "",
RabbitURL: setup.tfx.Rabbit.ConnStr,
ClientTimeout: 3,
WorkerQueueName: "webhook.post",
RedisURL: setup.tfx.Redis.Address,
NRName: "",
NRLicense: "",
NRTracing: false,
})
wkr.IgnoreClosedQueueConnection()
}
wkr.IgnoreClosedQueueConnection = true
// use go routine to run the webhook worker
go wkr.Run()
time.Sleep(100 * time.Millisecond) // wait a bit for the worker to become ready

}

Expand All @@ -156,11 +144,25 @@ func (setup *testDeps) startHttpServer(t *testing.T) {
setup.httpRequestBodies = make([]string, 0)
setup.httpServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
setup.httpRequests = append(setup.httpRequests, r)
body,err := ioutil.ReadAll(r.Body)
if err != nil{
body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("failed to read request body, %+v", err)
}
setup.httpRequestBodies = append(setup.httpRequestBodies,string(body))
fmt.Fprintln(w, "Hello, client")
setup.httpRequestBodies = append(setup.httpRequestBodies, string(body))
fmt.Fprintln(w, "thank you")
}))
}


func waitForRequest(setup *testDeps, t *testing.T) {
var cnt = 0
log.Println("waiting for http request")
for len(setup.httpRequests) == 0 {
if cnt > 500 {
assert.Fail(t, "timed out waiting for request")
}
time.Sleep(time.Millisecond)
cnt++
}
log.Printf("received http request after %d milliseconds", cnt)
}
26 changes: 20 additions & 6 deletions backend/webhook/integration_test/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package test

import (
"log"
"os"
"reflect"

Expand All @@ -12,6 +13,7 @@ import (

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"

"github.com/burstsms/mtmo-tp/backend/webhook/rpc/webhookpb"

Expand All @@ -20,23 +22,26 @@ import (
)

var tfx *fixtures.TestFixtures
var listener *bufconn.Listener

func TestMain(m *testing.M) {
tfx = fixtures.New()
tfx.SetupPostgres("webhook", getWebhookEnv().MigrationRoot)
tfx.SetupRabbit()
tfx.SetupRedis()
listener = startGrpcServer(tfx)
code := m.Run()
defer os.Exit(code)
defer tfx.Teardown()
}

func setupForInsert(t *testing.T, tfx *fixtures.TestFixtures) *testDeps {
return setupForTest(t, tfx)
func setupForInsert(t *testing.T) *testDeps {
return newSetup(t, tfx, listener)
}

func Test_Insert(t *testing.T) {
setup := setupForInsert(t, tfx)
setup := setupForInsert(t)
log.Println("test Insert")
defer setup.teardown(t)
client := setup.getClient(t)

Expand Down Expand Up @@ -97,17 +102,22 @@ func Test_Insert(t *testing.T) {
}

func setupForFind(t *testing.T) *testDeps {
setup := setupForTest(t, tfx)
setup := newSetup(t, tfx, listener)

setup.adb.HaveInDatabase("webhook",
"id, account_id, event, name, url, rate_limit, created_at, updated_at",
[]interface{}{32767, "42", "event1", "name1", "url1", 2, "2021-01-12 22:41:42", "2021-01-13 22:25:25"})

setup.adb.HaveInDatabase("webhook",
"id, account_id, event, name, url, rate_limit, created_at, updated_at",
[]interface{}{32768, "42", "event", "name", "url", 1, "2021-01-12 22:42:42", "2021-01-13 22:24:24"})

return setup
}

func Test_Find(t *testing.T) {
log.Println("test Find")

setup := setupForFind(t)
defer setup.teardown(t)
client := setup.getClient(t)
Expand Down Expand Up @@ -175,14 +185,16 @@ func Test_Find(t *testing.T) {
}

func setupForUpdate(t *testing.T) *testDeps {
setup := setupForTest(t, tfx)
setup := newSetup(t, tfx, listener)
setup.adb.HaveInDatabase("webhook",
"id, account_id, event, name, url, rate_limit, created_at, updated_at",
[]interface{}{32767, "42", "event1", "name1", "url1", 2, "2020-01-12 22:41:42", "2020-01-12 22:41:42"})
return setup
}

func Test_Update(t *testing.T) {
log.Println("test Update")

setup := setupForUpdate(t)
defer setup.teardown(t)
client := setup.getClient(t)
Expand Down Expand Up @@ -288,14 +300,16 @@ func Test_Update(t *testing.T) {
}

func setupForDelete(t *testing.T) *testDeps {
setup := setupForTest(t, tfx)
setup := newSetup(t, tfx, listener)
setup.adb.HaveInDatabase("webhook",
"id, account_id, event, name, url, rate_limit, created_at, updated_at",
[]interface{}{32767, "42", "event1", "name1", "url1", 2, "2021-01-12 22:41:42", "2021-01-13 22:25:25"})
return setup
}

func Test_Delete(t *testing.T) {
log.Println("test Delete")

setup := setupForDelete(t)
defer setup.teardown(t)
client := setup.getClient(t)
Expand Down
1 change: 0 additions & 1 deletion backend/webhook/rpc/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func main() {
})

sqlDB, err := pgxpool.Connect(context.Background(), env.PostgresURL)

if err != nil {
log.Fatalf("failed to init postgres: %s\n with error: %s", env.PostgresURL, err)
}
Expand Down
Loading

0 comments on commit 8b1245c

Please sign in to comment.