Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Added mutex to workflow job submission mock struc to make it thread s…
Browse files Browse the repository at this point in the history
…afe. Removed the delay.
  • Loading branch information
lynxbat committed Mar 6, 2016
1 parent 6cedaaa commit 9198f97
Showing 1 changed file with 32 additions and 23 deletions.
55 changes: 32 additions & 23 deletions scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"os"
"path"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -127,18 +128,24 @@ func TestCollectPublishWorkflow(t *testing.T) {

// The mocks below are here for testing work submission
type Mock1 struct {
sync.Mutex
count int
errorIndex int
delay time.Duration
queue map[string]int
qstring string
}

func (m *Mock1) CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) {
return nil, nil
}

func (m *Mock1) Work(j job) queuedJob {
m.Lock()
defer m.Unlock()
time.Sleep(m.delay)
m.queue[j.TypeString()]++
m.qstring += "," + j.Name()
return m
}

Expand All @@ -147,7 +154,8 @@ func (m *Mock1) Promise() promise.Promise {
}

func (m *Mock1) Await() []error {
time.Sleep(m.delay)
m.Lock()
defer m.Unlock()
m.count++
if m.count == m.errorIndex {
return []error{errors.New("I am an error")}
Expand Down Expand Up @@ -178,15 +186,15 @@ func (m *Mock1) AndThenUntil(_ time.Duration, _ func([]error)) {
}

func TestWorkJobs(t *testing.T) {
// log.SetLevel(log.DebugLevel)
Convey("Test speed and concurrency of TestWorkJobs\n", t, func() {
Convey("submit multiple jobs\n", func() {
m := &Mock1{queue: make(map[string]int)}
m.delay = time.Millisecond * 100
pj := newCollectorJob(nil, time.Second*1, m, nil, "")
m1 := &Mock1{queue: make(map[string]int)}
pj := newCollectorJob(nil, time.Second*1, m1, nil, "")
prs := make([]*processNode, 0)
pus := make([]*publishNode, 0)
counter := 0
t := &task{manager: m, id: "1", name: "mock"}
t := &task{manager: m1, id: "1", name: "mock"}
for x := 0; x < 3; x++ {
n := cdata.NewNode()
pr := &processNode{config: n, name: fmt.Sprintf("prjob%d", counter)}
Expand All @@ -196,18 +204,17 @@ func TestWorkJobs(t *testing.T) {
pus = append(pus, pu)
}
workJobs(prs, pus, t, pj)
So(m.queue["processor"], ShouldEqual, 3)
So(m.queue["publisher"], ShouldEqual, 3)
So(t.failedRuns, ShouldEqual, 0)
So(m1.queue["processor"], ShouldEqual, 3)
So(m1.queue["publisher"], ShouldEqual, 3)
})
Convey("submit multiple jobs with nesting", func() {
m := &Mock1{queue: make(map[string]int)}
m.delay = time.Millisecond * 100
pj := newCollectorJob(nil, time.Second*1, m, nil, "")
m2 := &Mock1{queue: make(map[string]int)}
pj := newCollectorJob(nil, time.Second*1, m2, nil, "")
prs := make([]*processNode, 0)
pus := make([]*publishNode, 0)
counter := 0
t := &task{manager: m, id: "1", name: "mock"}
t := &task{manager: m2, id: "1", name: "mock"}
// 3 proc + 3 pub
for x := 0; x < 3; x++ {
n := cdata.NewNode()
Expand All @@ -233,22 +240,23 @@ func TestWorkJobs(t *testing.T) {
pr.PublishNodes = cpus
}
workJobs(prs, pus, t, pj)
So(t.failedRuns, ShouldEqual, 0)
// (3*3)+3
So(m.queue["processor"], ShouldEqual, 12)
So(m2.queue["processor"], ShouldEqual, 12)
// (3*3)
So(m.queue["publisher"], ShouldEqual, 12)
So(t.failedRuns, ShouldEqual, 0)
So(m2.queue["publisher"], ShouldEqual, 12)

})
Convey("submit multiple jobs where one has an error", func() {
m := &Mock1{queue: make(map[string]int)}
m3 := &Mock1{queue: make(map[string]int)}
// make the 13th job fail
m.errorIndex = 13
m.delay = time.Millisecond * 100
pj := newCollectorJob(nil, time.Second*1, m, nil, "")
m3.errorIndex = 13
m3.delay = time.Millisecond * 100
pj := newCollectorJob(nil, time.Second*1, m3, nil, "")
prs := make([]*processNode, 0)
pus := make([]*publishNode, 0)
counter := 0
t := &task{manager: m, id: "1", name: "mock"}
t := &task{manager: m3, id: "1", name: "mock"}
// 3 proc + 3 pub
for x := 0; x < 3; x++ {
n := cdata.NewNode()
Expand All @@ -274,12 +282,13 @@ func TestWorkJobs(t *testing.T) {
pr.PublishNodes = cpus
}
workJobs(prs, pus, t, pj)
// (3*3)+3
So(m.queue["processor"], ShouldEqual, 12)
// (3*3)
So(m.queue["publisher"], ShouldEqual, 12)
So(t.failedRuns, ShouldEqual, 1)
So(t.lastFailureMessage, ShouldEqual, "I am an error")
// (3*3)+3
So(m3.queue["processor"], ShouldEqual, 12)
// (3*3)
So(m3.queue["publisher"], ShouldEqual, 12)

})

})
Expand Down

0 comments on commit 9198f97

Please sign in to comment.