From 390c66f43fd65e8649bea6c2611381d8ca9efe3c Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Tue, 29 Sep 2015 17:16:39 -0700 Subject: [PATCH] Changes schedule.task.id from uint64 to a uuid --- cmd/pulsectl/task.go | 58 ++--- control/available_plugin.go | 12 +- control/control.go | 8 +- control/control_test.go | 12 +- control/metrics.go | 3 + control/runner.go | 4 +- core/control_event/control_event.go | 6 +- core/scheduler_event/scheduler_event.go | 14 +- core/task.go | 12 +- mgmt/rest/client/client_func_test.go | 50 ++-- mgmt/rest/client/task.go | 10 +- mgmt/rest/rbody/task.go | 24 +- mgmt/rest/rest_func_test.go | 30 ++- mgmt/rest/server.go | 12 +- mgmt/rest/task.go | 44 +--- mgmt/rest/tribe_test.go | 323 +++++++++++++++++------- mgmt/tribe/agreement/agreement.go | 3 +- mgmt/tribe/messages.go | 3 +- mgmt/tribe/tribe.go | 63 +++-- mgmt/tribe/tribe_test.go | 41 ++- mgmt/tribe/worker/worker.go | 155 +++++++----- scheduler/scheduler.go | 18 +- scheduler/scheduler_test.go | 6 +- scheduler/task.go | 39 +-- scheduler/task_test.go | 7 +- scheduler/watcher.go | 18 +- scheduler/watcher_test.go | 36 +-- 27 files changed, 589 insertions(+), 422 deletions(-) diff --git a/cmd/pulsectl/task.go b/cmd/pulsectl/task.go index e64006705..ffdfea880 100644 --- a/cmd/pulsectl/task.go +++ b/cmd/pulsectl/task.go @@ -7,7 +7,6 @@ import ( "os" "os/signal" "path/filepath" - "strconv" "strings" "syscall" "text/tabwriter" @@ -96,7 +95,7 @@ func createTaskUsingTaskManifest(ctx *cli.Context) { os.Exit(1) } fmt.Println("Task created") - fmt.Printf("ID: %d\n", r.ID) + fmt.Printf("ID: %s\n", r.ID) fmt.Printf("Name: %s\n", r.Name) fmt.Printf("State: %s\n", r.State) } @@ -211,7 +210,7 @@ func createTaskUsingWFManifest(ctx *cli.Context) { os.Exit(1) } fmt.Println("Task created") - fmt.Printf("ID: %d\n", r.ID) + fmt.Printf("ID: %s\n", r.ID) fmt.Printf("Name: %s\n", r.Name) fmt.Printf("State: %s\n", r.State) } @@ -285,19 +284,14 @@ func watchTask(ctx *cli.Context) { os.Exit(1) } - id, err := strconv.ParseUint(ctx.Args().First(), 0, 64) - if err != nil { - fmt.Printf("Incorrect usage - %v\n", err.Error()) - cli.ShowCommandHelp(ctx, ctx.Command.Name) - os.Exit(1) - } - r := pClient.WatchTask(uint(id)) + id := ctx.Args().First() + r := pClient.WatchTask(id) if r.Err != nil { fmt.Printf("Error starting task:\n%v\n", r.Err) cli.ShowCommandHelp(ctx, ctx.Command.Name) os.Exit(1) } - fmt.Printf("Watching Task (%d):\n", id) + fmt.Printf("Watching Task (%s):\n", id) // catch interrupt so we signal the server we are done before exiting c := make(chan os.Signal, 1) @@ -354,18 +348,14 @@ func startTask(ctx *cli.Context) { os.Exit(1) } - id, err := strconv.ParseUint(ctx.Args().First(), 0, 64) - if err != nil { - fmt.Printf("Incorrect usage - %v\n", err.Error()) - os.Exit(1) - } - r := pClient.StartTask(int(id)) + id := ctx.Args().First() + r := pClient.StartTask(id) if r.Err != nil { fmt.Printf("Error starting task:\n%v\n", r.Err) os.Exit(1) } fmt.Println("Task started:") - fmt.Printf("ID: %d\n", r.ID) + fmt.Printf("ID: %s\n", r.ID) } func stopTask(ctx *cli.Context) { @@ -375,18 +365,14 @@ func stopTask(ctx *cli.Context) { os.Exit(1) } - id, err := strconv.ParseUint(ctx.Args().First(), 0, 64) - if err != nil { - fmt.Printf("Incorrect usage - %v\n", err.Error()) - os.Exit(1) - } - r := pClient.StopTask(int(id)) + id := ctx.Args().First() + r := pClient.StopTask(id) if r.Err != nil { fmt.Printf("Error stopping task:\n%v\n", r.Err) os.Exit(1) } fmt.Println("Task stopped:") - fmt.Printf("ID: %d\n", r.ID) + fmt.Printf("ID: %s\n", r.ID) } func removeTask(ctx *cli.Context) { @@ -396,18 +382,14 @@ func removeTask(ctx *cli.Context) { os.Exit(1) } - id, err := strconv.ParseUint(ctx.Args().First(), 0, 64) - if err != nil { - fmt.Printf("Incorrect usage - %v\n", err.Error()) - os.Exit(1) - } - r := pClient.RemoveTask(int(id)) + id := ctx.Args().First() + r := pClient.RemoveTask(id) if r.Err != nil { fmt.Printf("Error stopping task:\n%v\n", r.Err) os.Exit(1) } fmt.Println("Task removed:") - fmt.Printf("ID: %d\n", r.ID) + fmt.Printf("ID: %s\n", r.ID) } func exportTask(ctx *cli.Context) { @@ -416,12 +398,16 @@ func exportTask(ctx *cli.Context) { cli.ShowCommandHelp(ctx, ctx.Command.Name) os.Exit(1) } - id, err := strconv.ParseUint(ctx.Args().First(), 0, 32) - if err != nil { - fmt.Printf("Incorrect usage - %v\n", err.Error()) + id := ctx.Args().First() + task := pClient.GetTask(id) + if task.Err != nil { + fmt.Printf("Error exporting task:\n%v\n", task.Err) os.Exit(1) } - task := pClient.GetTask(uint(id)) tb, err := json.Marshal(task) + if err != nil { + fmt.Printf("Error exporting task:\n%v\n", err) + os.Exit(1) + } fmt.Println(string(tb)) } diff --git a/control/available_plugin.go b/control/available_plugin.go index c4687c0df..f907f66fd 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -257,7 +257,7 @@ type apPool struct { key string // The subscriptions to this pool. - subs map[uint64]*subscription + subs map[string]*subscription // The plugins in the pool. // the primary key is an increasing --> uint from @@ -282,7 +282,7 @@ func newPool(key string, plugins ...*availablePlugin) (*apPool, error) { RWMutex: &sync.RWMutex{}, version: ver, key: key, - subs: map[uint64]*subscription{}, + subs: map[string]*subscription{}, plugins: make(map[uint32]*availablePlugin), max: maximumRunningPlugins, concurrencyCount: 1, @@ -329,7 +329,7 @@ func (p *apPool) insert(ap *availablePlugin) error { // subscribe adds a subscription to the pool. // Using subscribe is idempotent. -func (p *apPool) subscribe(taskId uint64, subType subscriptionType) { +func (p *apPool) subscribe(taskId string, subType subscriptionType) { p.Lock() defer p.Unlock() @@ -346,7 +346,7 @@ func (p *apPool) subscribe(taskId uint64, subType subscriptionType) { // subscribe adds a subscription to the pool. // Using unsubscribe is idempotent. -func (p *apPool) unsubscribe(taskId uint64) { +func (p *apPool) unsubscribe(taskId string) { p.Lock() defer p.Unlock() delete(p.subs, taskId) @@ -435,7 +435,7 @@ func (p *apPool) count() int { } // NOTE: The data returned by subscriptions should be constant and read only. -func (p *apPool) subscriptions() map[uint64]*subscription { +func (p *apPool) subscriptions() map[string]*subscription { p.RLock() defer p.RUnlock() return p.subs @@ -492,7 +492,7 @@ func (p *apPool) moveSubscriptions(to *apPool) []subscription { type subscription struct { subType subscriptionType version int - taskId uint64 + taskId string } type availablePlugins struct { diff --git a/control/control.go b/control/control.go index 09f932ae2..99467b972 100644 --- a/control/control.go +++ b/control/control.go @@ -437,7 +437,7 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []pe return plugins, nil } -func (p *pluginControl) SubscribeDeps(taskId uint64, mts []core.Metric, plugins []core.Plugin) []perror.PulseError { +func (p *pluginControl) SubscribeDeps(taskId string, mts []core.Metric, plugins []core.Plugin) []perror.PulseError { var perrs []perror.PulseError collectors, errs := p.gatherCollectors(mts) @@ -480,7 +480,7 @@ func (p *pluginControl) SubscribeDeps(taskId uint64, mts []core.Metric, plugins return perrs } -func (p *pluginControl) sendPluginSubscriptionEvent(taskId uint64, pl core.Plugin) perror.PulseError { +func (p *pluginControl) sendPluginSubscriptionEvent(taskId string, pl core.Plugin) perror.PulseError { pt, err := core.ToPluginType(pl.TypeName()) if err != nil { return perror.New(err) @@ -501,7 +501,7 @@ func (p *pluginControl) sendPluginSubscriptionEvent(taskId uint64, pl core.Plugi return nil } -func (p *pluginControl) UnsubscribeDeps(taskId uint64, mts []core.Metric, plugins []core.Plugin) []perror.PulseError { +func (p *pluginControl) UnsubscribeDeps(taskId string, mts []core.Metric, plugins []core.Plugin) []perror.PulseError { var perrs []perror.PulseError collectors, errs := p.gatherCollectors(mts) @@ -528,7 +528,7 @@ func (p *pluginControl) UnsubscribeDeps(taskId uint64, mts []core.Metric, plugin return perrs } -func (p *pluginControl) sendPluginUnsubscriptionEvent(taskId uint64, pl core.Plugin) perror.PulseError { +func (p *pluginControl) sendPluginUnsubscriptionEvent(taskId string, pl core.Plugin) perror.PulseError { pt, err := core.ToPluginType(pl.TypeName()) if err != nil { return perror.New(err) diff --git a/control/control_test.go b/control/control_test.go index 42feecb03..02979e64a 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -664,8 +664,8 @@ func TestCollectMetrics(t *testing.T) { So(lp, ShouldNotBeNil) pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:dummy1:1") So(errp, ShouldBeNil) - pool.subscribe(1, unboundSubscriptionType) - err = c.sendPluginSubscriptionEvent(1, lp) + pool.subscribe("1", unboundSubscriptionType) + err = c.sendPluginSubscriptionEvent("1", lp) So(err, ShouldBeNil) m = append(m, m1, m2) @@ -758,8 +758,8 @@ func TestPublishMetrics(t *testing.T) { } pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("publisher:file:1") So(errp, ShouldBeNil) - pool.subscribe(1, unboundSubscriptionType) - errs := c.sendPluginSubscriptionEvent(1, p) + pool.subscribe("1", unboundSubscriptionType) + errs := c.sendPluginSubscriptionEvent("1", p) So(errs, ShouldBeNil) <-lpe.done time.Sleep(1500 * time.Millisecond) @@ -815,8 +815,8 @@ func TestProcessMetrics(t *testing.T) { } pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("processor:passthru:1") So(errp, ShouldBeNil) - pool.subscribe(1, unboundSubscriptionType) - errs := c.sendPluginSubscriptionEvent(1, p) + pool.subscribe("1", unboundSubscriptionType) + errs := c.sendPluginSubscriptionEvent("1", p) So(errs, ShouldBeNil) <-lpe.done time.Sleep(1500 * time.Millisecond) diff --git a/control/metrics.go b/control/metrics.go index 2dd038b70..087415c97 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -267,6 +267,9 @@ func (mc *metricCatalog) get(ns []string, ver int) (*metricType, perror.PulseErr if err != nil { return nil, err } + if mts == nil { + return nil, perror.New(errMetricNotFound) + } // a version IS given if ver > 0 { l, err := getVersion(mts, ver) diff --git a/control/runner.go b/control/runner.go index 2e34956c1..d14287a2c 100644 --- a/control/runner.go +++ b/control/runner.go @@ -377,7 +377,7 @@ func (r *runner) runPlugin(path string) { } } -func (r *runner) handleSubscription(pType, pName string, pVersion int, taskId uint64, subType subscriptionType) { +func (r *runner) handleSubscription(pType, pName string, pVersion int, taskId string, subType subscriptionType) { pool, err := r.availablePlugins.getPool(fmt.Sprintf("%s:%s:%d", pType, pName, pVersion)) if err != nil { runnerLog.WithFields(log.Fields{ @@ -430,7 +430,7 @@ func (r *runner) handleSubscription(pType, pName string, pVersion int, taskId ui r.runPlugin(plugin.Path) } } -func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskId uint64) error { +func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskId string) error { pool, err := r.availablePlugins.getPool(fmt.Sprintf("%s:%s:%d", pType, pName, pVersion)) if err != nil { runnerLog.WithFields(log.Fields{ diff --git a/core/control_event/control_event.go b/core/control_event/control_event.go index 2ddbc54e3..eef0aa9bf 100644 --- a/core/control_event/control_event.go +++ b/core/control_event/control_event.go @@ -66,7 +66,7 @@ type PluginSubscriptionEvent struct { PluginVersion int PluginType int SubscriptionType int - TaskId uint64 + TaskId string } func (ps PluginSubscriptionEvent) Namespace() string { @@ -74,7 +74,7 @@ func (ps PluginSubscriptionEvent) Namespace() string { } type PluginUnsubscriptionEvent struct { - TaskId uint64 + TaskId string PluginName string PluginVersion int PluginType int @@ -95,7 +95,7 @@ func (hfe HealthCheckFailedEvent) Namespace() string { } type MovePluginSubscriptionEvent struct { - TaskId uint64 + TaskId string PluginName string PreviousVersion int NewVersion int diff --git a/core/scheduler_event/scheduler_event.go b/core/scheduler_event/scheduler_event.go index bbd7915a7..f58104809 100644 --- a/core/scheduler_event/scheduler_event.go +++ b/core/scheduler_event/scheduler_event.go @@ -15,7 +15,7 @@ const ( ) type TaskStartedEvent struct { - TaskID uint64 + TaskID string } func (e TaskStartedEvent) Namespace() string { @@ -23,7 +23,7 @@ func (e TaskStartedEvent) Namespace() string { } type TaskCreatedEvent struct { - TaskID uint64 + TaskID string StartOnCreate bool } @@ -32,7 +32,7 @@ func (e TaskCreatedEvent) Namespace() string { } type TaskDeletedEvent struct { - TaskID uint64 + TaskID string } func (e TaskDeletedEvent) Namespace() string { @@ -40,7 +40,7 @@ func (e TaskDeletedEvent) Namespace() string { } type TaskStoppedEvent struct { - TaskID uint64 + TaskID string } func (e TaskStoppedEvent) Namespace() string { @@ -48,7 +48,7 @@ func (e TaskStoppedEvent) Namespace() string { } type TaskDisabledEvent struct { - TaskID uint64 + TaskID string Why string } @@ -57,7 +57,7 @@ func (e TaskDisabledEvent) Namespace() string { } type MetricCollectedEvent struct { - TaskID uint64 + TaskID string Metrics []core.Metric } @@ -66,7 +66,7 @@ func (e MetricCollectedEvent) Namespace() string { } type MetricCollectionFailedEvent struct { - TaskID uint64 + TaskID string Errors []error } diff --git a/core/task.go b/core/task.go index 5c2ab8b2b..b998dbc5e 100644 --- a/core/task.go +++ b/core/task.go @@ -46,12 +46,13 @@ func (t TaskState) String() string { } type Task interface { - ID() uint64 + ID() string // Status() WorkflowState TODO, switch to string State() TaskState HitCount() uint GetName() string SetName(string) + SetID(string) MissedCount() uint FailedCount() uint LastFailureMessage() string @@ -59,6 +60,7 @@ type Task interface { CreationTime() *time.Time DeadlineDuration() time.Duration SetDeadlineDuration(time.Duration) + SetTaskID(id string) SetStopOnFailure(uint) GetStopOnFailure() uint Option(...TaskOption) TaskOption @@ -108,6 +110,14 @@ func SetTaskName(name string) TaskOption { } } +func SetTaskID(id string) TaskOption { + return func(t Task) TaskOption { + previous := t.ID() + t.SetID(id) + return SetTaskID(previous) + } +} + type TaskErrors interface { Errors() []perror.PulseError } diff --git a/mgmt/rest/client/client_func_test.go b/mgmt/rest/client/client_func_test.go index d75894aca..e7f259415 100644 --- a/mgmt/rest/client/client_func_test.go +++ b/mgmt/rest/client/client_func_test.go @@ -11,6 +11,7 @@ import ( "time" log "github.com/Sirupsen/logrus" + "github.com/pborman/uuid" "github.com/intelsdi-x/pulse/control" "github.com/intelsdi-x/pulse/mgmt/rest" @@ -29,7 +30,7 @@ var ( FILE_PLUGIN_PATH = []string{PULSE_PATH + "/plugin/pulse-publisher-file"} DIRECTORY_PATH = []string{PULSE_PATH + "/plugin/"} - NextPort = 9000 + NextPort = 45000 ) func getPort() int { @@ -415,10 +416,10 @@ func TestPulseClient(t *testing.T) { port := getPort() uri := startAPI(port) c := New(uri, "v1") - - p := c.StartTask(9999999) + uuid := uuid.New() + p := c.StartTask(uuid) So(p.Err, ShouldNotBeNil) - So(p.Err.Error(), ShouldEqual, "error 0: No task found with id '9999999' ") + So(p.Err.Error(), ShouldEqual, fmt.Sprintf("error 0: No task found with id '%s' ", uuid)) }) Convey("existing task", func() { port := getPort() @@ -438,10 +439,10 @@ func TestPulseClient(t *testing.T) { port := -1 uri := startAPI(port) c := New(uri, "v1") - - p := c.StartTask(9999999) + uuid := uuid.New() + p := c.StartTask(uuid) So(p.Err, ShouldNotBeNil) - So(p.Err.Error(), ShouldEqual, "Put http://localhost:-1/v1/tasks/9999999/start: dial tcp: unknown port tcp/-1") + So(p.Err.Error(), ShouldEqual, fmt.Sprintf("Put http://localhost:-1/v1/tasks/%s/start: dial tcp: unknown port tcp/-1", uuid)) }) }) Convey("StopTask", func() { @@ -449,10 +450,10 @@ func TestPulseClient(t *testing.T) { port := getPort() uri := startAPI(port) c := New(uri, "v1") - - p := c.StopTask(9999999) + uuid := uuid.New() + p := c.StopTask(uuid) So(p.Err, ShouldNotBeNil) - So(p.Err.Error(), ShouldEqual, "error 0: No task found with id '9999999' ") + So(p.Err.Error(), ShouldEqual, fmt.Sprintf("error 0: No task found with id '%s' ", uuid)) }) Convey("existing task", func() { port := getPort() @@ -476,10 +477,10 @@ func TestPulseClient(t *testing.T) { port := -1 uri := startAPI(port) c := New(uri, "v1") - - p := c.StopTask(9999999) + uuid := uuid.New() + p := c.StopTask(uuid) So(p.Err, ShouldNotBeNil) - So(p.Err.Error(), ShouldEqual, "Put http://localhost:-1/v1/tasks/9999999/stop: dial tcp: unknown port tcp/-1") + So(p.Err.Error(), ShouldEqual, fmt.Sprintf("Put http://localhost:-1/v1/tasks/%s/stop: dial tcp: unknown port tcp/-1", uuid)) }) }) Convey("RemoveTask", func() { @@ -487,10 +488,10 @@ func TestPulseClient(t *testing.T) { port := getPort() uri := startAPI(port) c := New(uri, "v1") - - p := c.RemoveTask(9999999) + uuid := uuid.New() + p := c.RemoveTask(uuid) So(p.Err, ShouldNotBeNil) - So(p.Err.Error(), ShouldEqual, "No task found with id '9999999'") + So(p.Err.Error(), ShouldEqual, fmt.Sprintf("No task found with id '%s'", uuid)) }) Convey("existing task", func() { port := getPort() @@ -515,8 +516,8 @@ func TestPulseClient(t *testing.T) { port := -1 uri := startAPI(port) c := New(uri, "v1") - - p := c.RemoveTask(9999999) + uuid := uuid.New() + p := c.RemoveTask(uuid) So(p.Err, ShouldNotBeNil) So(p.Err.Error(), ShouldEqual, "dial tcp: unknown port tcp/-1") }) @@ -540,11 +541,12 @@ func TestPulseClient(t *testing.T) { p2 := c.GetTasks() So(p2.Err, ShouldBeNil) - p3 := c.GetTask(uint(p.ID)) + p3 := c.GetTask(p.ID) So(p3.Err, ShouldBeNil) - p4 := c.GetTask(0) + uuid := uuid.New() + p4 := c.GetTask(uuid) So(p4.Err, ShouldNotBeNil) - So(p4.Err.Error(), ShouldEqual, "No task with Id '0'") + So(p4.Err.Error(), ShouldEqual, fmt.Sprintf("No task with Id '%s'", uuid)) So(p4.ScheduledTaskReturned, ShouldBeNil) }) Convey("do returns err!=nil", func() { @@ -552,7 +554,7 @@ func TestPulseClient(t *testing.T) { uri := startAPI(port) c := New(uri, "v1") - p := c.GetTask(0) + p := c.GetTask(uuid.New()) p2 := c.GetTasks() So(p.Err, ShouldNotBeNil) @@ -598,8 +600,8 @@ func TestPulseClient(t *testing.T) { } } }() - c.StopTask(p.ID) - c.StartTask(p.ID) + startResp := c.StartTask(p.ID) + So(startResp.Err, ShouldBeNil) <-wait a.Lock() So(len(a.events), ShouldEqual, 10) diff --git a/mgmt/rest/client/task.go b/mgmt/rest/client/task.go index 2fc9550e3..31c6189c0 100644 --- a/mgmt/rest/client/task.go +++ b/mgmt/rest/client/task.go @@ -63,7 +63,7 @@ func (c *Client) CreateTask(s *Schedule, wf *wmap.WorkflowMap, name string, star } } -func (c *Client) WatchTask(id uint) *WatchTasksResult { +func (c *Client) WatchTask(id string) *WatchTasksResult { r := &WatchTasksResult{ EventChan: make(chan *rbody.StreamedTaskEvent), DoneChan: make(chan struct{}), @@ -123,7 +123,7 @@ func (c *Client) GetTasks() *GetTasksResult { } } -func (c *Client) GetTask(id uint) *GetTaskResult { +func (c *Client) GetTask(id string) *GetTaskResult { resp, err := c.do("GET", fmt.Sprintf("/tasks/%v", id), ContentTypeJSON, nil) if err != nil { return &GetTaskResult{Err: err} @@ -139,7 +139,7 @@ func (c *Client) GetTask(id uint) *GetTaskResult { } } -func (c *Client) StartTask(id int) *StartTasksResult { +func (c *Client) StartTask(id string) *StartTasksResult { resp, err := c.do("PUT", fmt.Sprintf("/tasks/%v/start", id), ContentTypeJSON) if err != nil { @@ -157,7 +157,7 @@ func (c *Client) StartTask(id int) *StartTasksResult { } } -func (c *Client) StopTask(id int) *StopTasksResult { +func (c *Client) StopTask(id string) *StopTasksResult { resp, err := c.do("PUT", fmt.Sprintf("/tasks/%v/stop", id), ContentTypeJSON) if err != nil { return &StopTasksResult{Err: err} @@ -177,7 +177,7 @@ func (c *Client) StopTask(id int) *StopTasksResult { } } -func (c *Client) RemoveTask(id int) *RemoveTasksResult { +func (c *Client) RemoveTask(id string) *RemoveTasksResult { resp, err := c.do("DELETE", fmt.Sprintf("/tasks/%v", id), ContentTypeJSON) if err != nil { return &RemoveTasksResult{Err: err} diff --git a/mgmt/rest/rbody/task.go b/mgmt/rest/rbody/task.go index 158304949..7ad88666f 100644 --- a/mgmt/rest/rbody/task.go +++ b/mgmt/rest/rbody/task.go @@ -58,7 +58,7 @@ type ScheduledTaskReturned struct { } func (s *ScheduledTaskReturned) ResponseBodyMessage() string { - return fmt.Sprintf("Scheduled task (%d) returned", s.ID) + return fmt.Sprintf("Scheduled task (%s) returned", s.ID) } func (s *ScheduledTaskReturned) ResponseBodyType() string { @@ -68,7 +68,7 @@ func (s *ScheduledTaskReturned) ResponseBodyType() string { type AddScheduledTask ScheduledTask func (s *AddScheduledTask) ResponseBodyMessage() string { - return fmt.Sprintf("Scheduled task created (%d)", s.ID) + return fmt.Sprintf("Scheduled task created (%s)", s.ID) } func (s *AddScheduledTask) ResponseBodyType() string { @@ -77,7 +77,7 @@ func (s *AddScheduledTask) ResponseBodyType() string { func AddSchedulerTaskFromTask(t core.Task) *AddScheduledTask { st := &AddScheduledTask{ - ID: int(t.ID()), + ID: t.ID(), Name: t.GetName(), Deadline: t.DeadlineDuration().String(), CreationTimestamp: t.CreationTime().Unix(), @@ -97,7 +97,7 @@ func AddSchedulerTaskFromTask(t core.Task) *AddScheduledTask { } type ScheduledTask struct { - ID int `json:"id"` + ID string `json:"id"` Name string `json:"name"` Deadline string `json:"deadline"` Workflow *wmap.WorkflowMap `json:"workflow,omitempty"` @@ -116,7 +116,7 @@ func (s *ScheduledTask) CreationTime() time.Time { } func (s *ScheduledTask) ResponseBodyMessage() string { - return fmt.Sprintf("Scheduled task created (%d)", s.ID) + return fmt.Sprintf("Scheduled task created (%s)", s.ID) } func (s *ScheduledTask) ResponseBodyType() string { @@ -125,7 +125,7 @@ func (s *ScheduledTask) ResponseBodyType() string { func SchedulerTaskFromTask(t core.Task) *ScheduledTask { st := &ScheduledTask{ - ID: int(t.ID()), + ID: t.ID(), Name: t.GetName(), Deadline: t.DeadlineDuration().String(), CreationTimestamp: t.CreationTime().Unix(), @@ -144,11 +144,11 @@ func SchedulerTaskFromTask(t core.Task) *ScheduledTask { type ScheduledTaskStarted struct { // TODO return resource - ID int `json:"id"` + ID string `json:"id"` } func (s *ScheduledTaskStarted) ResponseBodyMessage() string { - return fmt.Sprintf("Scheduled task (%d) started", s.ID) + return fmt.Sprintf("Scheduled task (%s) started", s.ID) } func (s *ScheduledTaskStarted) ResponseBodyType() string { @@ -157,11 +157,11 @@ func (s *ScheduledTaskStarted) ResponseBodyType() string { type ScheduledTaskStopped struct { // TODO return resource - ID int `json:"id"` + ID string `json:"id"` } func (s *ScheduledTaskStopped) ResponseBodyMessage() string { - return fmt.Sprintf("Scheduled task (%d) stopped", s.ID) + return fmt.Sprintf("Scheduled task (%s) stopped", s.ID) } func (s *ScheduledTaskStopped) ResponseBodyType() string { @@ -170,11 +170,11 @@ func (s *ScheduledTaskStopped) ResponseBodyType() string { type ScheduledTaskRemoved struct { // TODO return resource - ID int `json:"id"` + ID string `json:"id"` } func (s *ScheduledTaskRemoved) ResponseBodyMessage() string { - return fmt.Sprintf("Scheduled task (%d) removed", s.ID) + return fmt.Sprintf("Scheduled task (%s) removed", s.ID) } func (s *ScheduledTaskRemoved) ResponseBodyType() string { diff --git a/mgmt/rest/rest_func_test.go b/mgmt/rest/rest_func_test.go index 283a96f60..faf6cb243 100644 --- a/mgmt/rest/rest_func_test.go +++ b/mgmt/rest/rest_func_test.go @@ -19,6 +19,7 @@ import ( "time" log "github.com/Sirupsen/logrus" + "github.com/pborman/uuid" "github.com/intelsdi-x/pulse/control" "github.com/intelsdi-x/pulse/mgmt/rest/rbody" @@ -103,8 +104,8 @@ func (w *watchTaskResult) close() { close(w.doneChan) } -func watchTask(id, port int) *watchTaskResult { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks/%d/watch", port, id)) +func watchTask(id string, port int) *watchTaskResult { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks/%s/watch", port, id)) if err != nil { log.Fatal(err) } @@ -153,16 +154,16 @@ func getTasks(port int) *rbody.APIResponse { return getAPIResponse(resp) } -func getTask(id, port int) *rbody.APIResponse { - resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks/%d", port, id)) +func getTask(id string, port int) *rbody.APIResponse { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks/%s", port, id)) if err != nil { log.Fatal(err) } return getAPIResponse(resp) } -func startTask(id, port int) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%d/start", port, id) +func startTask(id string, port int) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s/start", port, id) client := &http.Client{} b := bytes.NewReader([]byte{}) req, err := http.NewRequest("PUT", uri, b) @@ -177,8 +178,8 @@ func startTask(id, port int) *rbody.APIResponse { return getAPIResponse(resp) } -func stopTask(id, port int) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%d/stop", port, id) +func stopTask(id string, port int) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s/stop", port, id) client := &http.Client{} b := bytes.NewReader([]byte{}) req, err := http.NewRequest("PUT", uri, b) @@ -193,8 +194,8 @@ func stopTask(id, port int) *rbody.APIResponse { return getAPIResponse(resp) } -func removeTask(id, port int) *rbody.APIResponse { - uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%d", port, id) +func removeTask(id string, port int) *rbody.APIResponse { + uri := fmt.Sprintf("http://localhost:%d/v1/tasks/%s", port, id) client := &http.Client{} req, err := http.NewRequest("DELETE", uri, nil) if err != nil { @@ -981,10 +982,11 @@ func TestPluginRestCalls(t *testing.T) { port := getPort() startAPI(port) - r1 := removeTask(99999, port) + uuid := uuid.New() + r1 := removeTask(uuid, port) So(r1.Body, ShouldHaveSameTypeAs, new(rbody.Error)) plr1 := r1.Body.(*rbody.Error) - So(plr1.ErrorMessage, ShouldEqual, "No task found with id '99999'") + So(plr1.ErrorMessage, ShouldEqual, fmt.Sprintf("No task found with id '%s'", uuid)) }) Convey("removes a task", func() { port := getPort() @@ -1018,7 +1020,7 @@ func TestPluginRestCalls(t *testing.T) { }) }) Convey("Watch task - get - /v1/tasks/:id/watch", func() { - Convey("---", func() { + Convey("---", func(c C) { port := getPort() startAPI(port) @@ -1027,9 +1029,9 @@ func TestPluginRestCalls(t *testing.T) { uploadPlugin(PSUTIL_PLUGIN_PATH, port) r1 := createTask("1.json", "xenu", "500ms", true, port) + So(r1.Meta.Code, ShouldEqual, 201) So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) plr1 := r1.Body.(*rbody.AddScheduledTask) - id := plr1.ID // Change buffer window to 10ms (do not do this IRL) diff --git a/mgmt/rest/server.go b/mgmt/rest/server.go index 1807a576d..561298e24 100644 --- a/mgmt/rest/server.go +++ b/mgmt/rest/server.go @@ -61,12 +61,12 @@ type managesMetrics interface { type managesTasks interface { CreateTask(cschedule.Schedule, *wmap.WorkflowMap, bool, ...core.TaskOption) (core.Task, core.TaskErrors) - GetTasks() map[uint64]core.Task - GetTask(uint64) (core.Task, error) - StartTask(uint64) []perror.PulseError - StopTask(uint64) []perror.PulseError - RemoveTask(uint64) error - WatchTask(uint64, core.TaskWatcherHandler) (core.TaskWatcherCloser, error) + GetTasks() map[string]core.Task + GetTask(string) (core.Task, error) + StartTask(string) []perror.PulseError + StopTask(string) []perror.PulseError + RemoveTask(string) error + WatchTask(string, core.TaskWatcherHandler) (core.TaskWatcherCloser, error) } type managesTribe interface { diff --git a/mgmt/rest/task.go b/mgmt/rest/task.go index 9db79fa3a..c9e1bf4d4 100644 --- a/mgmt/rest/task.go +++ b/mgmt/rest/task.go @@ -6,7 +6,6 @@ import ( "io" "net/http" "sort" - "strconv" "time" log "github.com/Sirupsen/logrus" @@ -106,11 +105,7 @@ func (s *Server) getTasks(w http.ResponseWriter, r *http.Request, _ httprouter.P } func (s *Server) getTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - id, err := strconv.ParseUint(p.ByName("id"), 0, 64) - if err != nil { - respond(500, rbody.FromError(err), w) - return - } + id := p.ByName("id") t, err1 := s.mt.GetTask(id) if err1 != nil { respond(404, rbody.FromError(err1), w) @@ -128,11 +123,8 @@ func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter. "client": r.RemoteAddr, }) - id, err := strconv.ParseUint(p.ByName("id"), 0, 64) - if err != nil { - respond(500, rbody.FromError(err), w) - return - } + id := p.ByName("id") + logger.WithFields(log.Fields{ "task-id": id, }).Debug("request to watch task") @@ -179,11 +171,11 @@ func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter. "task-watcher-event": e.EventType, }).Debug("new event") switch e.EventType { - case rbody.TaskWatchMetricEvent, rbody.TaskWatchTaskStarted, rbody.TaskWatchTaskStopped: + case rbody.TaskWatchMetricEvent, rbody.TaskWatchTaskStarted: // The client can decide to stop receiving on the stream on Task Stopped. // We write the event to the buffer fmt.Fprintf(w, "%s\n", e.ToJSON()) - case rbody.TaskWatchTaskDisabled: + case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped: // A disabled task should end the streaming and close the connection fmt.Fprintf(w, "%s\n", e.ToJSON()) // Flush since we are sending nothing new @@ -214,46 +206,34 @@ func (s *Server) watchTask(w http.ResponseWriter, r *http.Request, p httprouter. } func (s *Server) startTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - id, err := strconv.ParseUint(p.ByName("id"), 0, 64) - if err != nil { - respond(500, rbody.FromError(err), w) - return - } + id := p.ByName("id") errs := s.mt.StartTask(id) if errs != nil { respond(404, rbody.FromPulseErrors(errs), w) return } // TODO should return resource - respond(200, &rbody.ScheduledTaskStarted{ID: int(id)}, w) + respond(200, &rbody.ScheduledTaskStarted{ID: id}, w) } func (s *Server) stopTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - id, err := strconv.ParseUint(p.ByName("id"), 0, 64) - if err != nil { - respond(500, rbody.FromError(err), w) - return - } + id := p.ByName("id") errs := s.mt.StopTask(id) if errs != nil { respond(404, rbody.FromPulseErrors(errs), w) return } - respond(200, &rbody.ScheduledTaskStopped{ID: int(id)}, w) + respond(200, &rbody.ScheduledTaskStopped{ID: id}, w) } func (s *Server) removeTask(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - id, err := strconv.ParseUint(p.ByName("id"), 0, 64) - if err != nil { - respond(500, rbody.FromError(err), w) - return - } - err = s.mt.RemoveTask(id) + id := p.ByName("id") + err := s.mt.RemoveTask(id) if err != nil { respond(404, rbody.FromError(err), w) return } - respond(200, &rbody.ScheduledTaskRemoved{ID: int(id)}, w) + respond(200, &rbody.ScheduledTaskRemoved{ID: id}, w) } func marshalTask(body io.ReadCloser) (*request.TaskCreationRequest, error) { diff --git a/mgmt/rest/tribe_test.go b/mgmt/rest/tribe_test.go index 47657d45b..792ddf145 100644 --- a/mgmt/rest/tribe_test.go +++ b/mgmt/rest/tribe_test.go @@ -8,6 +8,7 @@ import ( "net/http" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -20,9 +21,7 @@ import ( "github.com/intelsdi-x/pulse/scheduler" ) -func init() { - log.SetLevel(log.WarnLevel) -} +var lock sync.Mutex = sync.Mutex{} func getMembers(port int) *rbody.APIResponse { resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/v1/tribe/members", port)) @@ -96,14 +95,13 @@ func addAgreement(port int, name string) *rbody.APIResponse { return getAPIResponse(resp) } -func TestTribePluginAgreements(t *testing.T) { - var ( - lpName, lpType string - lpVersion int - ) - numOfNodes := 8 +func TestTribeTaskAgreements(t *testing.T) { + log.SetLevel(log.WarnLevel) + lock.Lock() + numOfNodes := 5 aName := "agreement1" mgtPorts := startTribes(numOfNodes) + lock.Unlock() Convey("A cluster is started", t, func() { Convey("Members are retrieved", func() { for _, i := range mgtPorts { @@ -135,7 +133,6 @@ func TestTribePluginAgreements(t *testing.T) { timedOut = true return default: - time.Sleep(50 * time.Millisecond) resp := getMember(port, name) if resp.Meta.Code == 200 { c.So(resp.Body.(*rbody.TribeMemberShow), ShouldHaveSameTypeAs, new(rbody.TribeMemberShow)) @@ -143,6 +140,7 @@ func TestTribePluginAgreements(t *testing.T) { return } } + time.Sleep(200 * time.Millisecond) } } }(i, fmt.Sprintf("member-%d", i)) @@ -150,13 +148,10 @@ func TestTribePluginAgreements(t *testing.T) { wg.Wait() So(timedOut, ShouldEqual, false) - Convey("A plugin is uploaded", func() { + Convey("Plugins and a task are uploaded", func() { resp := uploadPlugin(DUMMY_PLUGIN_PATH1, mgtPorts[0]) So(resp.Meta.Code, ShouldEqual, 201) So(resp.Meta.Type, ShouldEqual, rbody.PluginsLoadedType) - lpName = resp.Body.(*rbody.PluginsLoaded).LoadedPlugins[0].Name - lpVersion = resp.Body.(*rbody.PluginsLoaded).LoadedPlugins[0].Version - lpType = resp.Body.(*rbody.PluginsLoaded).LoadedPlugins[0].Type resp = uploadPlugin(DUMMY_PUBLISHER_PATH, mgtPorts[0]) So(resp.Meta.Code, ShouldEqual, 201) So(resp.Meta.Type, ShouldEqual, rbody.PluginsLoadedType) @@ -166,19 +161,47 @@ func TestTribePluginAgreements(t *testing.T) { resp = getAgreement(mgtPorts[0], aName) So(resp.Meta.Code, ShouldEqual, 200) So(len(resp.Body.(*rbody.TribeGetAgreement).Agreement.PluginAgreement.Plugins), ShouldEqual, 2) + resp = createTask("1.json", "task1", "1s", true, mgtPorts[0]) + So(resp.Meta.Code, ShouldEqual, 201) + So(resp.Meta.Type, ShouldEqual, rbody.AddScheduledTaskType) + So(resp.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - Convey("A task is created", func() { - resp = createTask("1.json", "task1", "1s", false, mgtPorts[0]) - So(resp.Meta.Code, ShouldEqual, 201) - So(resp.Meta.Type, ShouldEqual, rbody.AddScheduledTaskType) - So(resp.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - Convey("The cluster agrees on plugins", func(c C) { + Convey("The cluster agrees on tasks", func(c C) { + var wg sync.WaitGroup + timedOut := false + for _, i := range mgtPorts { + timer := time.After(15 * time.Second) + wg.Add(1) + go func(port int, name string) { + defer wg.Done() + for { + select { + case <-timer: + timedOut = true + return + default: + resp := getAgreement(port, name) + if resp.Meta.Code == 200 { + c.So(resp.Body.(*rbody.TribeGetAgreement), ShouldHaveSameTypeAs, new(rbody.TribeGetAgreement)) + if len(resp.Body.(*rbody.TribeGetAgreement).Agreement.TaskAgreement.Tasks) == 1 { + return + } + } + time.Sleep(200 * time.Millisecond) + } + } + }(i, aName) + } + wg.Wait() + So(timedOut, ShouldEqual, false) + + Convey("The task has been shared and loaded across the cluster", func(c C) { var wg sync.WaitGroup timedOut := false - for _, i := range mgtPorts { + for i := 0; i < numOfNodes; i++ { timer := time.After(15 * time.Second) wg.Add(1) - go func(port int, name string) { + go func(port int) { defer wg.Done() for { select { @@ -186,94 +209,197 @@ func TestTribePluginAgreements(t *testing.T) { timedOut = true return default: - resp := getAgreement(port, name) + resp := getTasks(port) if resp.Meta.Code == 200 { - c.So(resp.Body.(*rbody.TribeGetAgreement), ShouldHaveSameTypeAs, new(rbody.TribeGetAgreement)) - if len(resp.Body.(*rbody.TribeGetAgreement).Agreement.PluginAgreement.Plugins) == 2 && len(resp.Body.(*rbody.TribeGetAgreement).Agreement.TaskAgreement.Tasks) == 1 { + if len(resp.Body.(*rbody.ScheduledTaskListReturned).ScheduledTasks) == 1 { + log.Debugf("node %v has %d tasks", port, len(resp.Body.(*rbody.ScheduledTaskListReturned).ScheduledTasks)) return } + log.Debugf("node %v has %d tasks", port, len(resp.Body.(*rbody.ScheduledTaskListReturned).ScheduledTasks)) + } else { + log.Debugf("node %v error getting task", port) } - time.Sleep(50 * time.Millisecond) + time.Sleep(400 * time.Millisecond) } } - }(i, aName) + }(mgtPorts[i]) } wg.Wait() So(timedOut, ShouldEqual, false) + }) + }) + }) + }) + }) + }) + }) +} + +func TestTribePluginAgreements(t *testing.T) { + lock.Lock() + var ( + lpName, lpType string + lpVersion int + ) + numOfNodes := 5 + aName := "agreement1" + mgtPorts := startTribes(numOfNodes) + lock.Unlock() + Convey("A cluster is started", t, func() { + Convey("Members are retrieved", func() { + for _, i := range mgtPorts { + m := getMembers(i) + So(m.Body, ShouldHaveSameTypeAs, new(rbody.TribeMemberList)) + So(len(m.Body.(*rbody.TribeMemberList).Members), ShouldEqual, numOfNodes) + } + }) + Convey("An agreement is added", func() { + a := addAgreement(mgtPorts[0], aName) + So(a.Body, ShouldHaveSameTypeAs, new(rbody.TribeAddAgreement)) + Convey("All members join the agreement", func() { + for _, i := range mgtPorts { + j := joinAgreement(mgtPorts[0], fmt.Sprintf("member-%d", i), aName) + So(j.Meta.Code, ShouldEqual, 200) + So(j.Body, ShouldHaveSameTypeAs, new(rbody.TribeJoinAgreement)) + } + Convey("All members have joined the agreement", func(c C) { + var wg sync.WaitGroup + timedOut := false + for _, i := range mgtPorts { + timer := time.After(15 * time.Second) + wg.Add(1) + go func(port int, name string) { + defer wg.Done() + for { + select { + case <-timer: + timedOut = true + return + default: + resp := getMember(port, name) + if resp.Meta.Code == 200 { + c.So(resp.Body.(*rbody.TribeMemberShow), ShouldHaveSameTypeAs, new(rbody.TribeMemberShow)) + if resp.Body.(*rbody.TribeMemberShow).PluginAgreement == aName { + return + } + } + time.Sleep(200 * time.Millisecond) + } + } + }(i, fmt.Sprintf("member-%d", i)) + } + wg.Wait() + So(timedOut, ShouldEqual, false) - Convey("The plugins have been shared and loaded across the cluster", func(c C) { - var wg sync.WaitGroup - timedOut := false - for _, i := range mgtPorts { - timer := time.After(15 * time.Second) - wg.Add(1) - go func(port int) { - defer wg.Done() - for { - select { - case <-timer: - timedOut = true + Convey("A plugin is uploaded", func() { + resp := uploadPlugin(DUMMY_PLUGIN_PATH1, mgtPorts[0]) + So(resp.Meta.Code, ShouldEqual, 201) + So(resp.Meta.Type, ShouldEqual, rbody.PluginsLoadedType) + lpName = resp.Body.(*rbody.PluginsLoaded).LoadedPlugins[0].Name + lpVersion = resp.Body.(*rbody.PluginsLoaded).LoadedPlugins[0].Version + lpType = resp.Body.(*rbody.PluginsLoaded).LoadedPlugins[0].Type + resp = getPluginList(mgtPorts[0]) + So(resp.Meta.Code, ShouldEqual, 200) + So(len(resp.Body.(*rbody.PluginList).LoadedPlugins), ShouldEqual, 1) + resp = getAgreement(mgtPorts[0], aName) + So(resp.Meta.Code, ShouldEqual, 200) + So(len(resp.Body.(*rbody.TribeGetAgreement).Agreement.PluginAgreement.Plugins), ShouldEqual, 1) + + Convey("The cluster agrees on plugins", func(c C) { + var wg sync.WaitGroup + timedOut := false + for _, i := range mgtPorts { + timer := time.After(15 * time.Second) + wg.Add(1) + go func(port int, name string) { + defer wg.Done() + for { + select { + case <-timer: + timedOut = true + return + default: + resp := getAgreement(port, name) + if resp.Meta.Code == 200 { + c.So(resp.Body.(*rbody.TribeGetAgreement), ShouldHaveSameTypeAs, new(rbody.TribeGetAgreement)) + if len(resp.Body.(*rbody.TribeGetAgreement).Agreement.PluginAgreement.Plugins) == 1 { return - default: - resp := getPluginList(port) - if resp.Meta.Code == 200 { - c.So(resp.Body.(*rbody.PluginList), ShouldHaveSameTypeAs, new(rbody.PluginList)) - if len(resp.Body.(*rbody.PluginList).LoadedPlugins) == 2 { - resp2 := getTasks(port) - if resp2.Meta.Code == 200 { - if len(resp2.Body.(*rbody.ScheduledTaskListReturned).ScheduledTasks) == 1 { - return - } - } - return - } - } - time.Sleep(200 * time.Millisecond) } } - }(i) + time.Sleep(200 * time.Millisecond) + } } - wg.Wait() - So(timedOut, ShouldEqual, false) + }(i, aName) + } + wg.Wait() + So(timedOut, ShouldEqual, false) - Convey("A plugin is unloaded", func() { - resp := unloadPlugin(mgtPorts[0], lpType, lpName, lpVersion) - So(resp.Meta.Code, ShouldEqual, 200) - So(resp.Meta.Type, ShouldEqual, rbody.PluginUnloadedType) - resp = getPluginList(mgtPorts[0]) - So(resp.Meta.Code, ShouldEqual, 200) - So(len(resp.Body.(*rbody.PluginList).LoadedPlugins), ShouldEqual, 1) + Convey("The plugins have been shared and loaded across the cluster", func(c C) { + var wg sync.WaitGroup + timedOut := false + for _, i := range mgtPorts { + timer := time.After(15 * time.Second) + wg.Add(1) + go func(port int) { + defer wg.Done() + for { + select { + case <-timer: + timedOut = true + return + default: + resp := getPluginList(port) + if resp.Meta.Code == 200 { + c.So(resp.Body.(*rbody.PluginList), ShouldHaveSameTypeAs, new(rbody.PluginList)) + if len(resp.Body.(*rbody.PluginList).LoadedPlugins) == 1 { + return + } + } + time.Sleep(200 * time.Millisecond) + } + } + }(i) + } + wg.Wait() + So(timedOut, ShouldEqual, false) + + Convey("A plugin is unloaded", func() { + resp := unloadPlugin(mgtPorts[0], lpType, lpName, lpVersion) + So(resp.Meta.Code, ShouldEqual, 200) + So(resp.Meta.Type, ShouldEqual, rbody.PluginUnloadedType) + resp = getPluginList(mgtPorts[0]) + So(resp.Meta.Code, ShouldEqual, 200) + So(len(resp.Body.(*rbody.PluginList).LoadedPlugins), ShouldEqual, 0) - Convey("The cluster unloads the plugin", func() { - var wg sync.WaitGroup - timedOut := false - for i := 0; i < numOfNodes; i++ { - timer := time.After(15 * time.Second) - wg.Add(1) - go func(port int) { - defer wg.Done() - for { - select { - case <-timer: - timedOut = true - return - default: - resp := getPluginList(port) - if resp.Meta.Code == 200 { - c.So(resp.Body.(*rbody.PluginList), ShouldHaveSameTypeAs, new(rbody.PluginList)) - if len(resp.Body.(*rbody.PluginList).LoadedPlugins) == 1 { - return - } - tribeLogger.Debugf("member %v has %v plugins", port, len(resp.Body.(*rbody.PluginList).LoadedPlugins)) + Convey("The cluster unloads the plugin", func() { + var wg sync.WaitGroup + timedOut := false + for i := 0; i < numOfNodes; i++ { + timer := time.After(15 * time.Second) + wg.Add(1) + go func(port int) { + defer wg.Done() + for { + select { + case <-timer: + timedOut = true + return + default: + resp := getPluginList(port) + if resp.Meta.Code == 200 { + c.So(resp.Body.(*rbody.PluginList), ShouldHaveSameTypeAs, new(rbody.PluginList)) + if len(resp.Body.(*rbody.PluginList).LoadedPlugins) == 0 { + return } - time.Sleep(200 * time.Millisecond) + tribeLogger.Debugf("member %v has %v plugins", port, len(resp.Body.(*rbody.PluginList).LoadedPlugins)) } + time.Sleep(200 * time.Millisecond) } - }(mgtPorts[i]) - } - wg.Wait() - So(timedOut, ShouldEqual, false) - }) + } + }(mgtPorts[i]) + } + wg.Wait() + So(timedOut, ShouldEqual, false) }) }) }) @@ -286,8 +412,8 @@ func TestTribePluginAgreements(t *testing.T) { } func startTribes(count int) []int { - log.SetLevel(log.DebugLevel) - var seed string + + seed := "" var wg sync.WaitGroup var mgtPorts []int for i := 0; i < count; i++ { @@ -295,6 +421,7 @@ func startTribes(count int) []int { mgtPorts = append(mgtPorts, mgtPort) tribePort := getAvailablePort() conf := tribe.DefaultConfig(fmt.Sprintf("member-%v", mgtPort), "127.0.0.1", tribePort, seed, mgtPort) + conf.MemberlistConfig.PushPullInterval = 5 * time.Second if seed == "" { seed = fmt.Sprintf("%s:%d", "127.0.0.1", tribePort) } @@ -341,16 +468,20 @@ func startTribes(count int) []int { return mgtPorts } +var nextPort uint64 = 51234 + func getAvailablePort() int { - addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + atomic.AddUint64(&nextPort, 1) + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%d", nextPort)) if err != nil { panic(err) } l, err := net.ListenTCP("tcp", addr) + defer l.Close() if err != nil { - panic(err) + return getAvailablePort() } - defer l.Close() + return l.Addr().(*net.TCPAddr).Port } diff --git a/mgmt/tribe/agreement/agreement.go b/mgmt/tribe/agreement/agreement.go index 81db8edf3..90ca7a5bf 100644 --- a/mgmt/tribe/agreement/agreement.go +++ b/mgmt/tribe/agreement/agreement.go @@ -39,7 +39,8 @@ type taskAgreement struct { } type Task struct { - ID uint64 `json:"id"` + ID string `json:"id"` + StartOnCreate bool `json:"start_on_create"` } func New(name string) *Agreement { diff --git a/mgmt/tribe/messages.go b/mgmt/tribe/messages.go index 35ff01155..3653d09cc 100644 --- a/mgmt/tribe/messages.go +++ b/mgmt/tribe/messages.go @@ -108,7 +108,8 @@ func (a *agreementMsg) String() string { type taskMsg struct { LTime LTime UUID string - TaskID uint64 + TaskID string + StartOnCreate bool AgreementName string Type msgType } diff --git a/mgmt/tribe/tribe.go b/mgmt/tribe/tribe.go index f7317c3ce..28274d417 100644 --- a/mgmt/tribe/tribe.go +++ b/mgmt/tribe/tribe.go @@ -68,26 +68,26 @@ type tribe struct { type config struct { seed string restAPIPort int - memberlistConfig *memberlist.Config + MemberlistConfig *memberlist.Config } func DefaultConfig(name, advertiseAddr string, advertisePort int, seed string, restAPIPort int) *config { c := &config{seed: seed, restAPIPort: restAPIPort} - c.memberlistConfig = memberlist.DefaultLANConfig() - c.memberlistConfig.PushPullInterval = 300 * time.Second - c.memberlistConfig.Name = name - c.memberlistConfig.BindAddr = advertiseAddr - c.memberlistConfig.BindPort = advertisePort - c.memberlistConfig.GossipNodes = c.memberlistConfig.GossipNodes * 2 + c.MemberlistConfig = memberlist.DefaultLANConfig() + c.MemberlistConfig.PushPullInterval = 300 * time.Second + c.MemberlistConfig.Name = name + c.MemberlistConfig.BindAddr = advertiseAddr + c.MemberlistConfig.BindPort = advertisePort + c.MemberlistConfig.GossipNodes = c.MemberlistConfig.GossipNodes * 2 return c } func New(c *config) (*tribe, error) { logger := logger.WithFields(log.Fields{ "_block": "New", - "port": c.memberlistConfig.BindPort, - "addr": c.memberlistConfig.BindAddr, - "name": c.memberlistConfig.Name, + "port": c.MemberlistConfig.BindPort, + "addr": c.MemberlistConfig.BindAddr, + "name": c.MemberlistConfig.Name, }) tribe := &tribe{ @@ -95,7 +95,7 @@ func New(c *config) (*tribe, error) { members: map[string]*agreement.Member{}, msgBuffer: make([]msg, 512), intentBuffer: []msg{}, - logger: logger.WithField("_name", c.memberlistConfig.Name), + logger: logger.WithField("_name", c.MemberlistConfig.Name), tags: map[string]string{agreement.RestAPIPort: strconv.Itoa(c.restAPIPort)}, pluginWorkQueue: make(chan worker.PluginRequest, 999), taskWorkQueue: make(chan worker.TaskRequest, 999), @@ -109,10 +109,10 @@ func New(c *config) (*tribe, error) { } //configure delegates - c.memberlistConfig.Delegate = &delegate{tribe: tribe} - c.memberlistConfig.Events = &memberDelegate{tribe: tribe} + c.MemberlistConfig.Delegate = &delegate{tribe: tribe} + c.MemberlistConfig.Events = &memberDelegate{tribe: tribe} - ml, err := memberlist.Create(c.memberlistConfig) + ml, err := memberlist.Create(c.MemberlistConfig) if err != nil { logger.WithFields(log.Fields{ "_block": "New", @@ -254,7 +254,7 @@ func (t *tribe) HandleGomitEvent(e gomit.Event) { "plugin_name": v.Name, "plugin_version": v.Version, "plugin_type": core.PluginType(v.Type).String(), - }).Debugf("Handling load plugin event") + }).Errorf("Handling load plugin event") plugin := agreement.Plugin{ Name_: v.Name, Version_: v.Version, @@ -293,11 +293,11 @@ func (t *tribe) HandleGomitEvent(e gomit.Event) { "event": e.Namespace(), "task_id": v.TaskID, "task_start_on_create": v.StartOnCreate, - }).Debugf("Handling task create event") + }).Errorf("Handling task create event") task := agreement.Task{ - ID: v.TaskID, + ID: v.TaskID, + StartOnCreate: v.StartOnCreate, } - if m, ok := t.members[t.memberlist.LocalNode().Name]; ok { if m.TaskAgreements != nil { for n, a := range m.TaskAgreements { @@ -306,7 +306,6 @@ func (t *tribe) HandleGomitEvent(e gomit.Event) { } } } - } } @@ -432,6 +431,7 @@ func (t *tribe) AddTask(agreementName string, task agreement.Task) perror.PulseE msg := &taskMsg{ LTime: t.clock.Increment(), TaskID: task.ID, + StartOnCreate: task.StartOnCreate, AgreementName: agreementName, UUID: uuid.New(), Type: addTaskMsgType, @@ -520,6 +520,18 @@ func (t *tribe) processAddPluginIntents() bool { if ok, _ := t.agreements[intent.AgreementName].PluginAgreement.Plugins.Contains(intent.Plugin); !ok { t.agreements[intent.AgreementName].PluginAgreement.Plugins = append(t.agreements[intent.AgreementName].PluginAgreement.Plugins, intent.Plugin) t.intentBuffer = append(t.intentBuffer[:idx], t.intentBuffer[idx+1:]...) + + ptype, _ := core.ToPluginType(intent.Plugin.TypeName()) + work := worker.PluginRequest{ + Plugin: agreement.Plugin{ + Name_: intent.Plugin.Name(), + Version_: intent.Plugin.Version(), + Type_: ptype, + }, + RequestType: worker.PluginLoadedType, + } + t.pluginWorkQueue <- work + return false } } @@ -552,6 +564,16 @@ func (t *tribe) processAddTaskIntents() bool { if ok, _ := a.TaskAgreement.Tasks.Contains(agreement.Task{ID: intent.TaskID}); !ok { a.TaskAgreement.Tasks = append(a.TaskAgreement.Tasks, agreement.Task{ID: intent.TaskID}) t.intentBuffer = append(t.intentBuffer[:idx], t.intentBuffer[idx+1:]...) + + work := worker.TaskRequest{ + Task: worker.Task{ + ID: intent.TaskID, + StartOnCreate: intent.StartOnCreate, + }, + RequestType: worker.TaskCreatedType, + } + t.taskWorkQueue <- work + return false } } @@ -731,7 +753,8 @@ func (t *tribe) handleAddTask(msg *taskMsg) bool { work := worker.TaskRequest{ Task: worker.Task{ - ID: msg.TaskID, + ID: msg.TaskID, + StartOnCreate: msg.StartOnCreate, }, RequestType: worker.TaskCreatedType, } diff --git a/mgmt/tribe/tribe_test.go b/mgmt/tribe/tribe_test.go index e64079918..bdb26bb15 100644 --- a/mgmt/tribe/tribe_test.go +++ b/mgmt/tribe/tribe_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "net" "sync" + "sync/atomic" "testing" "time" @@ -16,28 +17,17 @@ import ( . "github.com/smartystreets/goconvey/convey" ) -func init() { - log.SetLevel(log.InfoLevel) -} - -var inc uint64 - -func getNextTaskID() uint64 { - inc = inc + 1 - return inc -} - func TestFullStateSync(t *testing.T) { tribes := []*tribe{} - numOfTribes := 8 + numOfTribes := 5 agreement1 := "agreement1" plugin1 := agreement.Plugin{Name_: "plugin1", Version_: 1, Type_: core.ProcessorPluginType} - task1 := agreement.Task{ID: getNextTaskID()} + task1 := agreement.Task{ID: uuid.New()} Convey("Tribe members are started", t, func() { conf := DefaultConfig("seed", "127.0.0.1", getAvailablePort(), "", getAvailablePort()) - conf.memberlistConfig.PushPullInterval = 200 * time.Millisecond + conf.MemberlistConfig.PushPullInterval = 200 * time.Millisecond // conf.memberlistConfig.GossipInterval = 300 * time.Second - conf.memberlistConfig.GossipNodes = numOfTribes / 2 + conf.MemberlistConfig.GossipNodes = numOfTribes / 2 seed, err := New(conf) tribes = append(tribes, seed) So(err, ShouldBeNil) @@ -45,8 +35,8 @@ func TestFullStateSync(t *testing.T) { timer := time.After(4 * time.Second) for i := 1; i < numOfTribes; i++ { conf = DefaultConfig(fmt.Sprintf("member-%v", i), "127.0.0.1", getAvailablePort(), fmt.Sprintf("%v:%v", "127.0.0.1", seed.memberlist.LocalNode().Port), getAvailablePort()) - conf.memberlistConfig.GossipInterval = 300 * time.Second - conf.memberlistConfig.GossipNodes = numOfTribes / 2 + conf.MemberlistConfig.GossipInterval = 300 * time.Second + conf.MemberlistConfig.GossipNodes = numOfTribes / 2 tr, err := New(conf) So(err, ShouldBeNil) So(tr, ShouldNotBeNil) @@ -122,8 +112,8 @@ func TestFullStateSyncOnJoin(t *testing.T) { agreement1 := "agreement1" plugin1 := agreement.Plugin{Name_: "plugin1", Version_: 1} plugin2 := agreement.Plugin{Name_: "plugin2", Version_: 1} - task1 := agreement.Task{ID: getNextTaskID()} - task2 := agreement.Task{ID: getNextTaskID()} + task1 := agreement.Task{ID: uuid.New()} + task2 := agreement.Task{ID: uuid.New()} Convey("A seed is started", t, func() { conf := DefaultConfig("seed", "127.0.0.1", getAvailablePort(), "", getAvailablePort()) seed, err := New(conf) @@ -209,8 +199,8 @@ func TestTaskAgreements(t *testing.T) { agreementName := "agreement1" agreementName2 := "agreement2" - task1 := agreement.Task{ID: getNextTaskID()} - task2 := agreement.Task{ID: getNextTaskID()} + task1 := agreement.Task{ID: uuid.New()} + task2 := agreement.Task{ID: uuid.New()} Convey("a member handles", func() { t := tribes[rand.Intn(numOfTribes)] Convey("an out of order 'add task' message", func() { @@ -237,7 +227,7 @@ func TestTaskAgreements(t *testing.T) { err := t.AddTask(agreementName, task1) So(err.Error(), ShouldResemble, errTaskAlreadyExists.Error()) Convey("removing a task that doesn't exist", func() { - err := t.RemoveTask(agreementName, agreement.Task{ID: 999}) + err := t.RemoveTask(agreementName, agreement.Task{ID: uuid.New()}) So(err.Error(), ShouldResemble, errTaskDoesNotExist.Error()) err = t.RemoveTask("doesn't exist", task1) So(err.Error(), ShouldResemble, errAgreementDoesNotExist.Error()) @@ -925,15 +915,18 @@ func getTribes(numOfTribes int, seedTribe *tribe) []*tribe { return tribes } +var nextPort uint64 = 55234 + func getAvailablePort() int { - addr, err := net.ResolveTCPAddr("tcp", "localhost:0") + atomic.AddUint64(&nextPort, 1) + addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("localhost:%d", nextPort)) if err != nil { panic(err) } l, err := net.ListenTCP("tcp", addr) if err != nil { - panic(err) + return getAvailablePort() } defer l.Close() return l.Addr().(*net.TCPAddr).Port diff --git a/mgmt/tribe/worker/worker.go b/mgmt/tribe/worker/worker.go index 2e484f54a..604075b14 100644 --- a/mgmt/tribe/worker/worker.go +++ b/mgmt/tribe/worker/worker.go @@ -22,7 +22,6 @@ import ( const ( PluginLoadedType = iota - // pluginAddedToAgreementType ) const ( @@ -44,7 +43,8 @@ type TaskRequest struct { } type Task struct { - ID uint64 + ID string + StartOnCreate bool } type ManagesPlugins interface { @@ -54,7 +54,7 @@ type ManagesPlugins interface { } type ManagesTasks interface { - GetTask(id uint64) (core.Task, error) + GetTask(id string) (core.Task, error) CreateTask(sch schedule.Schedule, wfMap *wmap.WorkflowMap, startOnCreate bool, opts ...core.TaskOption) (core.Task, core.TaskErrors) } @@ -88,7 +88,8 @@ func newWorker(id int, pluginWorkerQueue: pluginWorkerQueue, taskWork: make(chan TaskRequest), taskWorkerQueue: taskWorkerQueue, - quitChan: make(chan bool)} + quitChan: make(chan bool), + } return worker } @@ -120,37 +121,112 @@ func DispatchWorkers(nworkers int, pluginWorkQueue chan PluginRequest, taskWorkQ for { select { case pluginWork := <-pluginWorkQueue: - workerLogger.Debug("Received plugin work requeust") + workerLogger.Infof("Received plugin work request") go func() { pluginWorker := <-pluginWorkerQueue - workerLogger.Debug("Dispatching plugin work request") + workerLogger.Infof("Dispatching plugin work request") pluginWorker <- pluginWork }() case taskWork := <-taskWorkQueue: - workerLogger.Debug("Received task work requeust") + workerLogger.Infof("Received task work request") go func() { + workerLogger.Infof("Waiting for free worker") taskWorker := <-taskWorkerQueue - workerLogger.Debug("Dispatching plugin work request") + workerLogger.Infof("Dispatching task work request") taskWorker <- taskWork }() case <-quitChan: - workerLogger.Debug("Stopping plugin work dispatcher") + workerLogger.Infof("Stopping plugin work dispatcher") return } } }() } -// Start "starts" the worker +// Start "starts" the workers func (w worker) start() { + // task worker + go func() { + for { + defer w.waitGroup.Done() + w.taskWorkerQueue <- w.taskWork + + select { + case work := <-w.taskWork: + done := false + // Receive a work request. + wLogger := workerLogger.WithFields(log.Fields{ + "task": work.Task.ID, + "worker": w.id, + "_block": "start", + }) + if work.RequestType == TaskCreatedType { + task, _ := w.taskManager.GetTask(work.Task.ID) + if task != nil { + wLogger.Warn("we already have a task with this Id") + } else { + for { + members, err := w.memberManager.GetTaskAgreementMembers() + if err != nil { + wLogger.Error(err) + continue + } + for _, member := range shuffle(members) { + uri := fmt.Sprintf("http://%s:%s", member.GetAddr(), member.GetRESTAPIPort()) + wLogger.Debugf("getting task %v from %v", work.Task.ID, uri) + c := client.New(uri, "v1") + taskResult := c.GetTask(work.Task.ID) + if taskResult.Err != nil { + wLogger.WithField("err", taskResult.Err.Error()).Debug("error getting task") + continue + } + wLogger.Debug("creating task") + opt := core.SetTaskID(work.Task.ID) + t, errs := w.taskManager.CreateTask( + getSchedule(taskResult.ScheduledTaskReturned.Schedule), + taskResult.Workflow, + work.Task.StartOnCreate, + opt) + if errs != nil && len(errs.Errors()) > 0 { + fields := log.Fields{ + "task": work.Task.ID, + "worker": w.id, + "_block": "start", + } + for idx, e := range errs.Errors() { + fields[fmt.Sprintf("err-%d", idx)] = e + } + wLogger.WithFields(fields).Debug("error creating task") + continue + } + wLogger.WithField("id", t.ID()).Debugf("task created") + done = true + break + } + if done { + break + } + time.Sleep(500 * time.Millisecond) + } + } + } + + case <-w.quitChan: + workerLogger.Infof("Tribe plugin worker-%d is stopping\n", w.id) + return + + } + } + }() + + // plugin worker go func() { defer w.waitGroup.Done() for { // Add ourselves into the worker queue. w.pluginWorkerQueue <- w.pluginWork - w.taskWorkerQueue <- w.taskWork select { case work := <-w.pluginWork: @@ -162,7 +238,6 @@ func (w worker) start() { "worker": w.id, "_block": "start", }) - wLogger.Debug("received work") done := false for { if w.isPluginLoaded(work.Plugin.Name(), work.Plugin.TypeName(), work.Plugin.Version()) { @@ -175,7 +250,6 @@ func (w worker) start() { } for _, member := range shuffle(members) { url := fmt.Sprintf("http://%s:%s/v1/plugins/%s/%s/%d", member.GetAddr(), member.GetRESTAPIPort(), work.Plugin.TypeName(), work.Plugin.Name(), work.Plugin.Version()) - wLogger.Debugf("worker-%v is trying %v ", w.id, url) resp, err := http.Get(url) if err != nil { wLogger.Error(err) @@ -188,6 +262,7 @@ func (w worker) start() { f, err := ioutil.TempFile("", fmt.Sprintf("%s-%s-%d", work.Plugin.TypeName(), work.Plugin.Name(), work.Plugin.Version())) if err != nil { wLogger.Error(err) + f.Close() continue } io.Copy(f, resp.Body) @@ -203,7 +278,6 @@ func (w worker) start() { continue } if w.isPluginLoaded(work.Plugin.Name(), work.Plugin.TypeName(), work.Plugin.Version()) { - wLogger.WithField("path", f.Name()).Info("loaded plugin") done = true break } @@ -212,52 +286,8 @@ func (w worker) start() { if done { break } - time.Sleep(200 * time.Millisecond) - } - case work := <-w.taskWork: - done := false - // Receive a work request. - wLogger := workerLogger.WithFields(log.Fields{ - "task": work.Task.ID, - "worker": w.id, - "_block": "start", - }) - wLogger.Debug("received work") - if work.RequestType == TaskCreatedType { - task, _ := w.taskManager.GetTask(work.Task.ID) - if task != nil { - wLogger.Warn("we already have a task with this Id") - } else { - for { - members, err := w.memberManager.GetTaskAgreementMembers() - if err != nil { - wLogger.Error(err) - continue - } - for _, member := range shuffle(members) { - // workerLogger.Error(member) - c := client.New(fmt.Sprintf("http://%s:%s", member.GetAddr(), member.GetRESTAPIPort()), "v1") - taskResult := c.GetTask(uint(work.Task.ID)) - if taskResult.Err != nil { - wLogger.Debug(err) - continue - } - _, err := w.taskManager.CreateTask(getSchedule(taskResult.ScheduledTaskReturned.Schedule), taskResult.Workflow, false) - if err != nil { - wLogger.Error(err) - continue - } - done = true - break - } - if done { - break - } - time.Sleep(200 * time.Millisecond) - } - } + time.Sleep(500 * time.Millisecond) } - case <-w.quitChan: workerLogger.Debugf("Tribe plugin worker-%d is stopping\n", w.id) return @@ -285,10 +315,15 @@ func shuffle(m []Member) []Member { func (w worker) isPluginLoaded(n, t string, v int) bool { catalog := w.pluginManager.PluginCatalog() for _, item := range catalog { + workerLogger.WithFields(log.Fields{ + "name": n, + "version": v, + "type": t, + }).Errorf("loaded plugin.. looking for %v %v %v", item.Name(), item.Version(), item.TypeName()) if item.TypeName() == t && item.Name() == n && item.Version() == v { - workerLogger.WithField("_block", "isPluginLoaded").Info("Plugin already loaded") + workerLogger.WithField("_block", "isPluginLoaded").Error("Plugin already loaded") return true } } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 8c3cc8a46..8e4a84f5d 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -39,8 +39,8 @@ type managesMetrics interface { processesMetrics managesPluginContentTypes ValidateDeps([]core.Metric, []core.SubscribedPlugin) []perror.PulseError - SubscribeDeps(uint64, []core.Metric, []core.Plugin) []perror.PulseError - UnsubscribeDeps(uint64, []core.Metric, []core.Plugin) []perror.PulseError + SubscribeDeps(string, []core.Metric, []core.Plugin) []perror.PulseError + UnsubscribeDeps(string, []core.Metric, []core.Plugin) []perror.PulseError } // ManagesPluginContentTypes is an interface to a plugin manager that can tell us what content accept and returns are supported. @@ -201,7 +201,7 @@ func (s *scheduler) CreateTask(sch schedule.Schedule, wfMap *wmap.WorkflowMap, s // RemoveTask given a tasks id. The task must be stopped. // Can return errors ErrTaskNotFound and ErrTaskNotStopped. -func (s *scheduler) RemoveTask(id uint64) error { +func (s *scheduler) RemoveTask(id string) error { t := s.tasks.Get(id) if t == nil { log.WithFields(log.Fields{ @@ -219,8 +219,8 @@ func (s *scheduler) RemoveTask(id uint64) error { } // GetTasks returns a copy of the tasks in a map where the task id is the key -func (s *scheduler) GetTasks() map[uint64]core.Task { - tasks := make(map[uint64]core.Task) +func (s *scheduler) GetTasks() map[string]core.Task { + tasks := make(map[string]core.Task) for id, t := range s.tasks.Table() { tasks[id] = t } @@ -228,7 +228,7 @@ func (s *scheduler) GetTasks() map[uint64]core.Task { } // GetTask provided the task id a task is returned -func (s *scheduler) GetTask(id uint64) (core.Task, error) { +func (s *scheduler) GetTask(id string) (core.Task, error) { task := s.tasks.Get(id) if task == nil { return nil, fmt.Errorf("No task with Id '%v'", id) @@ -237,7 +237,7 @@ func (s *scheduler) GetTask(id uint64) (core.Task, error) { } // StartTask provided a task id a task is started -func (s *scheduler) StartTask(id uint64) []perror.PulseError { +func (s *scheduler) StartTask(id string) []perror.PulseError { t := s.tasks.Get(id) if t == nil { return []perror.PulseError{ @@ -268,7 +268,7 @@ func (s *scheduler) StartTask(id uint64) []perror.PulseError { } // StopTask provided a task id a task is stopped -func (s *scheduler) StopTask(id uint64) []perror.PulseError { +func (s *scheduler) StopTask(id string) []perror.PulseError { t := s.tasks.Get(id) if t == nil { e := fmt.Errorf("No task found with id '%v'", id) @@ -341,7 +341,7 @@ func (s *scheduler) SetMetricManager(mm managesMetrics) { } // -func (s *scheduler) WatchTask(id uint64, tw core.TaskWatcherHandler) (core.TaskWatcherCloser, error) { +func (s *scheduler) WatchTask(id string, tw core.TaskWatcherHandler) (core.TaskWatcherCloser, error) { if task := s.tasks.Get(id); task != nil { a, b := s.taskWatcherColl.add(id, tw) return a, b diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 5235bfd70..1f76f3207 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -80,13 +80,13 @@ func (m *mockMetricManager) ValidateDeps(mts []core.Metric, prs []core.Subscribe } return nil } -func (m *mockMetricManager) SubscribeDeps(taskId uint64, mts []core.Metric, prs []core.Plugin) []perror.PulseError { +func (m *mockMetricManager) SubscribeDeps(taskId string, mts []core.Metric, prs []core.Plugin) []perror.PulseError { return []perror.PulseError{ perror.New(errors.New("metric validation error")), } } -func (m *mockMetricManager) UnsubscribeDeps(taskId uint64, mts []core.Metric, prs []core.Plugin) []perror.PulseError { +func (m *mockMetricManager) UnsubscribeDeps(taskId string, mts []core.Metric, prs []core.Plugin) []perror.PulseError { return nil } @@ -256,7 +256,7 @@ func TestScheduler(t *testing.T) { So(t, ShouldEqual, tsk) }) Convey("error when attempting to get a task that doesn't exist", func() { - t, err := s.GetTask(uint64(1234)) + t, err := s.GetTask("1234") So(err, ShouldNotBeNil) So(t, ShouldBeNil) }) diff --git a/scheduler/task.go b/scheduler/task.go index def899ba2..d0af3c474 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -3,13 +3,12 @@ package scheduler import ( "errors" "fmt" - "strconv" "sync" - "sync/atomic" "time" log "github.com/Sirupsen/logrus" "github.com/intelsdi-x/gomit" + "github.com/pborman/uuid" "github.com/intelsdi-x/pulse/core" "github.com/intelsdi-x/pulse/core/scheduler_event" @@ -34,7 +33,7 @@ var ( type task struct { sync.Mutex //protects state - id uint64 + id string name string schResponseChan chan schedule.Response killChan chan struct{} @@ -61,11 +60,12 @@ func newTask(s schedule.Schedule, wf *schedulerWorkflow, m *workManager, mm mana //Task would always be given a default name. //However if a user want to change this name, she can pass optional arguments, in form of core.TaskOption //The new name then get over written. - taskId := id() - name := "Task-" + string(strconv.FormatInt(int64(taskId), 10)) + + taskID := uuid.New() + name := fmt.Sprintf("Task-%s", taskID) wf.eventEmitter = emitter task := &task{ - id: taskId, + id: taskID, name: name, schResponseChan: make(chan schedule.Response), schedule: s, @@ -117,13 +117,17 @@ func (t *task) SetDeadlineDuration(d time.Duration) { t.deadlineDuration = d } +func (t *task) SetTaskID(id string) { + t.id = id +} + // HitCount returns the number of times the task has fired. func (t *task) HitCount() uint { return t.hitCount } // Id returns the tasks Id. -func (t *task) ID() uint64 { +func (t *task) ID() string { return t.id } @@ -161,6 +165,10 @@ func (t *task) SetStopOnFailure(v uint) { t.stopOnFailure = v } +func (t *task) SetID(id string) { + t.id = id +} + func (t *task) GetStopOnFailure() uint { return t.stopOnFailure } @@ -300,19 +308,19 @@ func (t *task) waitForSchedule() { type taskCollection struct { *sync.Mutex - table map[uint64]*task + table map[string]*task } func newTaskCollection() *taskCollection { return &taskCollection{ Mutex: &sync.Mutex{}, - table: make(map[uint64]*task), + table: make(map[string]*task), } } // Get given a task id returns a Task or nil if not found -func (t *taskCollection) Get(id uint64) *task { +func (t *taskCollection) Get(id string) *task { t.Lock() defer t.Unlock() @@ -368,19 +376,12 @@ func (t *taskCollection) remove(task *task) error { } // Table returns a copy of the taskCollection -func (t *taskCollection) Table() map[uint64]*task { +func (t *taskCollection) Table() map[string]*task { t.Lock() defer t.Unlock() - tasks := make(map[uint64]*task) + tasks := make(map[string]*task) for id, t := range t.table { tasks[id] = t } return tasks } - -var idCounter uint64 - -// id generates the sequential next id (starting from 0) -func id() uint64 { - return atomic.AddUint64(&idCounter, 1) -} diff --git a/scheduler/task_test.go b/scheduler/task_test.go index 0fef6a625..252f12a8e 100644 --- a/scheduler/task_test.go +++ b/scheduler/task_test.go @@ -1,7 +1,6 @@ package scheduler import ( - "strconv" "testing" "time" @@ -50,7 +49,7 @@ func TestTask(t *testing.T) { sch := schedule.NewSimpleSchedule(time.Millisecond * 100) task := newTask(sch, wf, newWorkManager(), c, emitter) task.Spin() - So(task.GetName(), ShouldResemble, "Task-"+string(strconv.FormatInt(int64(task.ID()), 10))) + So(task.GetName(), ShouldResemble, "Task-"+task.ID()) }) @@ -138,13 +137,13 @@ func TestTask(t *testing.T) { }) Convey("Attempt to get task with an invalid Id", func() { - t := taskCollection.Get(1234) + t := taskCollection.Get("1234") So(t, ShouldBeNil) }) Convey("Create another task and compare the id", func() { task2 := newTask(sch, wf, newWorkManager(), &mockMetricManager{}, emitter) - So(task2.id, ShouldEqual, task.id+1) + So(task2.id, ShouldNotEqual, task.ID()) }) }) diff --git a/scheduler/watcher.go b/scheduler/watcher.go index 8443c652e..10a75d15a 100644 --- a/scheduler/watcher.go +++ b/scheduler/watcher.go @@ -14,7 +14,7 @@ var ( type TaskWatcher struct { id uint64 - taskIds []uint64 + taskIds []string parent *taskWatcherCollection stopped bool handler core.TaskWatcherHandler @@ -30,20 +30,20 @@ func (t *TaskWatcher) Close() error { type taskWatcherCollection struct { // Collection of task watchers by - coll map[uint64][]*TaskWatcher + coll map[string][]*TaskWatcher tIdCounter uint64 mutex *sync.Mutex } func newTaskWatcherCollection() *taskWatcherCollection { return &taskWatcherCollection{ - coll: make(map[uint64][]*TaskWatcher), + coll: make(map[string][]*TaskWatcher), tIdCounter: 1, mutex: &sync.Mutex{}, } } -func (t *taskWatcherCollection) rm(taskId uint64, tw *TaskWatcher) { +func (t *taskWatcherCollection) rm(taskId string, tw *TaskWatcher) { t.mutex.Lock() defer t.mutex.Unlock() if t.coll[taskId] != nil { @@ -62,7 +62,7 @@ func (t *taskWatcherCollection) rm(taskId uint64, tw *TaskWatcher) { } } -func (t *taskWatcherCollection) add(taskId uint64, twh core.TaskWatcherHandler) (*TaskWatcher, error) { +func (t *taskWatcherCollection) add(taskId string, twh core.TaskWatcherHandler) (*TaskWatcher, error) { t.mutex.Lock() defer t.mutex.Unlock() // init map for task ID if it does not eist @@ -90,7 +90,7 @@ func (t *taskWatcherCollection) add(taskId uint64, twh core.TaskWatcherHandler) return tw, nil } -func (t *taskWatcherCollection) handleMetricCollected(taskId uint64, m []core.Metric) { +func (t *taskWatcherCollection) handleMetricCollected(taskId string, m []core.Metric) { t.mutex.Lock() defer t.mutex.Unlock() // no taskID means no watches, early exit @@ -113,7 +113,7 @@ func (t *taskWatcherCollection) handleMetricCollected(taskId uint64, m []core.Me } } -func (t *taskWatcherCollection) handleTaskStarted(taskId uint64) { +func (t *taskWatcherCollection) handleTaskStarted(taskId string) { t.mutex.Lock() defer t.mutex.Unlock() // no taskID means no watches, early exit @@ -136,7 +136,7 @@ func (t *taskWatcherCollection) handleTaskStarted(taskId uint64) { } } -func (t *taskWatcherCollection) handleTaskStopped(taskId uint64) { +func (t *taskWatcherCollection) handleTaskStopped(taskId string) { t.mutex.Lock() defer t.mutex.Unlock() // no taskID means no watches, early exit @@ -159,7 +159,7 @@ func (t *taskWatcherCollection) handleTaskStopped(taskId uint64) { } } -func (t *taskWatcherCollection) handleTaskDisabled(taskId uint64, why string) { +func (t *taskWatcherCollection) handleTaskDisabled(taskId string, why string) { t.mutex.Lock() defer t.mutex.Unlock() // no taskID means no watches, early exit diff --git a/scheduler/watcher_test.go b/scheduler/watcher_test.go index 3445854fa..c8a0534e3 100644 --- a/scheduler/watcher_test.go +++ b/scheduler/watcher_test.go @@ -46,20 +46,20 @@ func TestTaskWatching(t *testing.T) { d2 := &dummyCatcher{} d3 := &dummyCatcher{} - twc.add(1, d1) - x, _ := twc.add(1, d2) - y, _ := twc.add(2, d2) - twc.add(3, d3) + twc.add("1", d1) + x, _ := twc.add("1", d2) + y, _ := twc.add("2", d2) + twc.add("3", d3) - So(len(twc.coll[1]), ShouldEqual, 2) - So(len(twc.coll[2]), ShouldEqual, 1) + So(len(twc.coll["1"]), ShouldEqual, 2) + So(len(twc.coll["2"]), ShouldEqual, 1) So(len(twc.coll), ShouldEqual, 3) - twc.handleMetricCollected(1, nil) - twc.handleMetricCollected(1, nil) - twc.handleMetricCollected(2, nil) - twc.handleMetricCollected(1, nil) - twc.handleMetricCollected(2, nil) + twc.handleMetricCollected("1", nil) + twc.handleMetricCollected("1", nil) + twc.handleMetricCollected("2", nil) + twc.handleMetricCollected("1", nil) + twc.handleMetricCollected("2", nil) So(d1.count, ShouldEqual, 3) So(d2.count, ShouldEqual, 5) @@ -69,15 +69,15 @@ func TestTaskWatching(t *testing.T) { x.Close() y.Close() - So(len(twc.coll[1]), ShouldEqual, 1) - So(len(twc.coll[2]), ShouldEqual, 0) + So(len(twc.coll["1"]), ShouldEqual, 1) + So(len(twc.coll["2"]), ShouldEqual, 0) So(len(twc.coll), ShouldEqual, 2) - twc.handleMetricCollected(1, nil) - twc.handleMetricCollected(1, nil) - twc.handleMetricCollected(2, nil) - twc.handleMetricCollected(1, nil) - twc.handleMetricCollected(2, nil) + twc.handleMetricCollected("1", nil) + twc.handleMetricCollected("1", nil) + twc.handleMetricCollected("2", nil) + twc.handleMetricCollected("1", nil) + twc.handleMetricCollected("2", nil) So(d1.count, ShouldEqual, 6) So(d2.count, ShouldEqual, 5)