Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Fix Integration test (#204)
Browse files Browse the repository at this point in the history
* Fix Integration test

Make sure the free disk space min is set to a very low value (100 bytes)
so that controller can actually find eligible stores.

* Enable tests

* Disable queue depth test
* Parallelize tests

* Reduce the number of messages in Smart Retry test

* Reduce the prefetch size
  • Loading branch information
aravindvs authored May 12, 2017
1 parent 2d4edaf commit e9bb700
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 14 deletions.
34 changes: 28 additions & 6 deletions test/integration/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"testing"
"time"

log "github.com/Sirupsen/logrus"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber/cherami-server/clients/metadata"
"github.com/uber/cherami-server/common"
"github.com/uber/cherami-server/common/configure"
Expand All @@ -40,11 +44,8 @@ import (
"github.com/uber/cherami-server/services/outputhost"
"github.com/uber/cherami-server/services/storehost"
"github.com/uber/cherami-server/test"

log "github.com/Sirupsen/logrus"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
thriftM "github.com/uber/cherami-thrift/.generated/go/metadata"
"github.com/uber/tchannel-go/thrift"
)

type (
Expand Down Expand Up @@ -129,9 +130,30 @@ func (tb *testBase) SetupSuite(t *testing.T) {

singletonTestBase.setupSuiteImpl(t)
singletonTestBase.SetUp(map[string]int{}, 1)
singletonTestBase.setupServiceConfig(t)
*tb = singletonTestBase
}

func (tb *testBase) setupServiceConfig(t *testing.T) {
cItem := &thriftM.ServiceConfigItem{
ServiceName: common.StringPtr("cherami-storehost"),
ServiceVersion: common.StringPtr("*"),
Sku: common.StringPtr("*"),
Hostname: common.StringPtr("*"),
ConfigKey: common.StringPtr("minfreediskspacebytes"),
ConfigValue: common.StringPtr("100"), // set to a very low value - 100 bytes for the test
}

req := &thriftM.UpdateServiceConfigRequest{ConfigItem: cItem}

ctx, cancel := thrift.NewContext(15 * time.Second)
defer cancel()

err := tb.mClient.UpdateServiceConfig(ctx, req)
tb.NoError(err)

}

func (tb *testBase) setupSuiteImpl(t *testing.T) {
tb.SetT(t)
tb.keyspace = "integration_test"
Expand Down Expand Up @@ -283,7 +305,7 @@ func (tb *testBase) SetUp(clusterSz map[string]int, numReplicas int) {
tb.Controllers[hostID] = ch
}

tappedServices := []string{common.InputServiceName, common.OutputServiceName, common.StoreServiceName}
tappedServices := []string{common.InputServiceName, common.OutputServiceName, common.StoreServiceName, common.FrontendServiceName}
for _, s := range tappedServices {
var ch *controllerhost.Mcp
for _, ch = range tb.Controllers {
Expand Down
34 changes: 30 additions & 4 deletions test/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ type NetIntegrationSuiteParallelD struct {
type NetIntegrationSuiteParallelE struct {
testBase
}
type NetIntegrationSuiteParallelF struct {
testBase
}
type NetIntegrationSuiteParallelG struct {
testBase
}

type NetIntegrationSuiteSerial struct {
testBase
Expand Down Expand Up @@ -101,6 +107,18 @@ func TestNetIntegrationSuiteParallelE(t *testing.T) {
t.Parallel()
suite.Run(t, s)
}
func TestNetIntegrationSuiteParallelF(t *testing.T) {
s := new(NetIntegrationSuiteParallelF)
s.testBase.SetupSuite(t)
t.Parallel()
suite.Run(t, s)
}
func TestNetIntegrationSuiteParallelG(t *testing.T) {
s := new(NetIntegrationSuiteParallelG)
s.testBase.SetupSuite(t)
t.Parallel()
suite.Run(t, s)
}

// Disabled, since it is apparently impossible to get this test to run without racing with the parallel tests
func XXXTestNetIntegrationSuiteSerial(t *testing.T) {
Expand Down Expand Up @@ -152,6 +170,7 @@ func (s *NetIntegrationSuiteParallelC) TestMsgCacheLimit() {
ipaddr, port, _ := net.SplitHostPort(s.GetFrontend().GetTChannel().PeerInfo().HostPort)
portNum, _ := strconv.Atoi(port)
cheramiClient := createCheramiClient("cherami-test-limit", ipaddr, portNum, nil)
defer cheramiClient.Close()

// Create the destination to publish message
crReq := cherami.NewCreateDestinationRequest()
Expand Down Expand Up @@ -306,6 +325,7 @@ func (s *NetIntegrationSuiteParallelE) TestWriteEndToEndSuccessWithCassandra() {
ipaddr, port, _ := net.SplitHostPort(s.GetFrontend().GetTChannel().PeerInfo().HostPort)
portNum, _ := strconv.Atoi(port)
cheramiClient, _ := client.NewClient("cherami-test", ipaddr, portNum, nil)
defer cheramiClient.Close()

// Create the destination to publish message
crReq := cherami.NewCreateDestinationRequest()
Expand Down Expand Up @@ -436,7 +456,7 @@ ReadLoop:
s.Nil(err, "Failed to delete destination")
}

func (s *NetIntegrationSuiteParallelE) _TestWriteWithDrain() { // Disabled pending fix for flakiness
func (s *NetIntegrationSuiteParallelE) TestWriteWithDrain() { // Disabled pending fix for flakiness
destPath := "/dest/testWriteDrain"
cgPath := "/cg/testWriteDrain"
testMsgCount := 1000
Expand Down Expand Up @@ -625,6 +645,7 @@ func (s *NetIntegrationSuiteSerial) TestWriteEndToEndMultipleStore() {
ipaddr, port, _ := net.SplitHostPort(s.GetFrontend().GetTChannel().PeerInfo().HostPort)
portNum, _ := strconv.Atoi(port)
cheramiClient := createCheramiClient("cherami-test-multiple", ipaddr, portNum, nil)
defer cheramiClient.Close()

// Create the destination to publish message
crReq := cherami.NewCreateDestinationRequest()
Expand Down Expand Up @@ -793,6 +814,7 @@ func (s *NetIntegrationSuiteParallelB) _TestTimerQueue() {
ipaddr, port, _ := net.SplitHostPort(s.GetFrontend().GetTChannel().PeerInfo().HostPort)
portNum, _ := strconv.Atoi(port)
cheramiClient := createCheramiClient("cherami-test-timer", ipaddr, portNum, nil)
defer cheramiClient.Close()

// Create the destination to publish message
crReq := cherami.NewCreateDestinationRequest()
Expand Down Expand Up @@ -1066,6 +1088,7 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
ipaddr, port, _ := net.SplitHostPort(s.GetFrontend().GetTChannel().PeerInfo().HostPort)
portNum, _ := strconv.Atoi(port)
cheramiClient := createCheramiClient("cherami-test-dlq", ipaddr, portNum, nil)
defer cheramiClient.Close()

// Create the destination to publish message
crReq := cherami.NewCreateDestinationRequest()
Expand Down Expand Up @@ -1597,6 +1620,7 @@ func (s *NetIntegrationSuiteParallelD) TestSmartRetryDisableDuringDLQMerge() {
ipaddr, port, _ := net.SplitHostPort(s.GetFrontend().GetTChannel().PeerInfo().HostPort)
portNum, _ := strconv.Atoi(port)
cheramiClient := createCheramiClient("cherami-test-smartretry-dlq", ipaddr, portNum, nil)
defer cheramiClient.Close()

// Create the destination to publish message
crReq := cherami.NewCreateDestinationRequest()
Expand Down Expand Up @@ -1688,6 +1712,7 @@ func (s *NetIntegrationSuiteParallelD) TestSmartRetryDisableDuringDLQMerge() {
delivery := make(chan client.Delivery, 1)
delivery, err = consumerTest.Open(delivery)
s.NoError(err)
defer consumerTest.Close()

beforeMergeDLQDeliveryCount := -1

Expand Down Expand Up @@ -1781,17 +1806,18 @@ readLoop:
}
}

func (s *NetIntegrationSuiteParallelA) _TestSmartRetry() {
func (s *NetIntegrationSuiteParallelF) TestSmartRetry() {
destPath := "/test.runner.SmartRetry/TestSmartRetry"
cgPath := "/test.runner.SmartRetry/TestSmartRetryCG"
testMsgCount := 1000
testMsgCount := 100
var ackedMsgID int

log := common.GetDefaultLogger()
// Create the client
ipaddr, port, _ := net.SplitHostPort(s.GetFrontend().GetTChannel().PeerInfo().HostPort)
portNum, _ := strconv.Atoi(port)
cheramiClient := createCheramiClient("cherami-test-smartretry", ipaddr, portNum, nil)
defer cheramiClient.Close()

// Create the destination to publish message
crReq := cherami.NewCreateDestinationRequest()
Expand Down Expand Up @@ -2038,7 +2064,7 @@ ReadLoop2_TheReloopening:

}

func (s *NetIntegrationSuiteParallelE) _TestStartFromWithCassandra() {
func (s *NetIntegrationSuiteParallelE) TestStartFromWithCassandra() {
destPath := "/dest/TestStartFromWithCassandra"
cgPathEverything := "/cg/TestStartFromWithCassandraEverything"
cgPathStartFrom := "/cg/TestStartFromWithCassandra"
Expand Down
6 changes: 3 additions & 3 deletions test/integration/kafka_liveness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// If running on Mac and java 1.7 for ZooKeeper/Kafka, run following command before starting Kafka:
// echo "127.0.0.1 $HOSTNAME" | sudo tee -a /etc/hosts

func (s *NetIntegrationSuiteParallelE) TestKafkaLivenessBySarama() {
func (s *NetIntegrationSuiteParallelG) TestKafkaLivenessBySarama() {
msgValue := "testing message " + uuid.New()

producer, partition, err := s.produceKafkaMessage(msgValue)
Expand Down Expand Up @@ -82,7 +82,7 @@ FOR:
s.Assert().True(receivedMessage)
}

func (s *NetIntegrationSuiteParallelE) TestKafkaLivenessBySaramaCluster() {
func (s *NetIntegrationSuiteParallelG) TestKafkaLivenessBySaramaCluster() {
msgValue := "testing message " + uuid.New()

producer, partition, err := s.produceKafkaMessage(msgValue)
Expand Down Expand Up @@ -139,7 +139,7 @@ FOR:
s.Assert().True(receivedMessage)
}

func (s *NetIntegrationSuiteParallelE) produceKafkaMessage(msgValue string) (producer sarama.SyncProducer, partition int32, err error) {
func (s *NetIntegrationSuiteParallelG) produceKafkaMessage(msgValue string) (producer sarama.SyncProducer, partition int32, err error) {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Return.Successes = true
Expand Down
2 changes: 1 addition & 1 deletion test/integration/kfc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (t *kafkaMsg) String() string {
t.seq, t.topic, t.key, len(t.val), t.part, t.offs, t.count)
}

func (s *NetIntegrationSuiteParallelE) TestKafkaForCherami() {
func (s *NetIntegrationSuiteParallelG) TestKafkaForCherami() {

destPath, cgPath := "/kafka_test_dest/kfc", "/kafka_test_cg/kfc"

Expand Down

0 comments on commit e9bb700

Please sign in to comment.