Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Changes schedule.task.id from uint64 to a uuid
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Oct 1, 2015
1 parent 69add01 commit 390c66f
Show file tree
Hide file tree
Showing 27 changed files with 589 additions and 422 deletions.
58 changes: 22 additions & 36 deletions cmd/pulsectl/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"text/tabwriter"
Expand Down Expand Up @@ -96,7 +95,7 @@ func createTaskUsingTaskManifest(ctx *cli.Context) {
os.Exit(1)
}
fmt.Println("Task created")
fmt.Printf("ID: %d\n", r.ID)
fmt.Printf("ID: %s\n", r.ID)
fmt.Printf("Name: %s\n", r.Name)
fmt.Printf("State: %s\n", r.State)
}
Expand Down Expand Up @@ -211,7 +210,7 @@ func createTaskUsingWFManifest(ctx *cli.Context) {
os.Exit(1)
}
fmt.Println("Task created")
fmt.Printf("ID: %d\n", r.ID)
fmt.Printf("ID: %s\n", r.ID)
fmt.Printf("Name: %s\n", r.Name)
fmt.Printf("State: %s\n", r.State)
}
Expand Down Expand Up @@ -285,19 +284,14 @@ func watchTask(ctx *cli.Context) {
os.Exit(1)
}

id, err := strconv.ParseUint(ctx.Args().First(), 0, 64)
if err != nil {
fmt.Printf("Incorrect usage - %v\n", err.Error())
cli.ShowCommandHelp(ctx, ctx.Command.Name)
os.Exit(1)
}
r := pClient.WatchTask(uint(id))
id := ctx.Args().First()
r := pClient.WatchTask(id)
if r.Err != nil {
fmt.Printf("Error starting task:\n%v\n", r.Err)
cli.ShowCommandHelp(ctx, ctx.Command.Name)
os.Exit(1)
}
fmt.Printf("Watching Task (%d):\n", id)
fmt.Printf("Watching Task (%s):\n", id)

// catch interrupt so we signal the server we are done before exiting
c := make(chan os.Signal, 1)
Expand Down Expand Up @@ -354,18 +348,14 @@ func startTask(ctx *cli.Context) {
os.Exit(1)
}

id, err := strconv.ParseUint(ctx.Args().First(), 0, 64)
if err != nil {
fmt.Printf("Incorrect usage - %v\n", err.Error())
os.Exit(1)
}
r := pClient.StartTask(int(id))
id := ctx.Args().First()
r := pClient.StartTask(id)
if r.Err != nil {
fmt.Printf("Error starting task:\n%v\n", r.Err)
os.Exit(1)
}
fmt.Println("Task started:")
fmt.Printf("ID: %d\n", r.ID)
fmt.Printf("ID: %s\n", r.ID)
}

func stopTask(ctx *cli.Context) {
Expand All @@ -375,18 +365,14 @@ func stopTask(ctx *cli.Context) {
os.Exit(1)
}

id, err := strconv.ParseUint(ctx.Args().First(), 0, 64)
if err != nil {
fmt.Printf("Incorrect usage - %v\n", err.Error())
os.Exit(1)
}
r := pClient.StopTask(int(id))
id := ctx.Args().First()
r := pClient.StopTask(id)
if r.Err != nil {
fmt.Printf("Error stopping task:\n%v\n", r.Err)
os.Exit(1)
}
fmt.Println("Task stopped:")
fmt.Printf("ID: %d\n", r.ID)
fmt.Printf("ID: %s\n", r.ID)
}

func removeTask(ctx *cli.Context) {
Expand All @@ -396,18 +382,14 @@ func removeTask(ctx *cli.Context) {
os.Exit(1)
}

id, err := strconv.ParseUint(ctx.Args().First(), 0, 64)
if err != nil {
fmt.Printf("Incorrect usage - %v\n", err.Error())
os.Exit(1)
}
r := pClient.RemoveTask(int(id))
id := ctx.Args().First()
r := pClient.RemoveTask(id)
if r.Err != nil {
fmt.Printf("Error stopping task:\n%v\n", r.Err)
os.Exit(1)
}
fmt.Println("Task removed:")
fmt.Printf("ID: %d\n", r.ID)
fmt.Printf("ID: %s\n", r.ID)
}

func exportTask(ctx *cli.Context) {
Expand All @@ -416,12 +398,16 @@ func exportTask(ctx *cli.Context) {
cli.ShowCommandHelp(ctx, ctx.Command.Name)
os.Exit(1)
}
id, err := strconv.ParseUint(ctx.Args().First(), 0, 32)
if err != nil {
fmt.Printf("Incorrect usage - %v\n", err.Error())
id := ctx.Args().First()
task := pClient.GetTask(id)
if task.Err != nil {
fmt.Printf("Error exporting task:\n%v\n", task.Err)
os.Exit(1)
}
task := pClient.GetTask(uint(id))
tb, err := json.Marshal(task)
if err != nil {
fmt.Printf("Error exporting task:\n%v\n", err)
os.Exit(1)
}
fmt.Println(string(tb))
}
12 changes: 6 additions & 6 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ type apPool struct {
key string

// The subscriptions to this pool.
subs map[uint64]*subscription
subs map[string]*subscription

// The plugins in the pool.
// the primary key is an increasing --> uint from
Expand All @@ -282,7 +282,7 @@ func newPool(key string, plugins ...*availablePlugin) (*apPool, error) {
RWMutex: &sync.RWMutex{},
version: ver,
key: key,
subs: map[uint64]*subscription{},
subs: map[string]*subscription{},
plugins: make(map[uint32]*availablePlugin),
max: maximumRunningPlugins,
concurrencyCount: 1,
Expand Down Expand Up @@ -329,7 +329,7 @@ func (p *apPool) insert(ap *availablePlugin) error {

// subscribe adds a subscription to the pool.
// Using subscribe is idempotent.
func (p *apPool) subscribe(taskId uint64, subType subscriptionType) {
func (p *apPool) subscribe(taskId string, subType subscriptionType) {
p.Lock()
defer p.Unlock()

Expand All @@ -346,7 +346,7 @@ func (p *apPool) subscribe(taskId uint64, subType subscriptionType) {

// subscribe adds a subscription to the pool.
// Using unsubscribe is idempotent.
func (p *apPool) unsubscribe(taskId uint64) {
func (p *apPool) unsubscribe(taskId string) {
p.Lock()
defer p.Unlock()
delete(p.subs, taskId)
Expand Down Expand Up @@ -435,7 +435,7 @@ func (p *apPool) count() int {
}

// NOTE: The data returned by subscriptions should be constant and read only.
func (p *apPool) subscriptions() map[uint64]*subscription {
func (p *apPool) subscriptions() map[string]*subscription {
p.RLock()
defer p.RUnlock()
return p.subs
Expand Down Expand Up @@ -492,7 +492,7 @@ func (p *apPool) moveSubscriptions(to *apPool) []subscription {
type subscription struct {
subType subscriptionType
version int
taskId uint64
taskId string
}

type availablePlugins struct {
Expand Down
8 changes: 4 additions & 4 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []pe
return plugins, nil
}

func (p *pluginControl) SubscribeDeps(taskId uint64, mts []core.Metric, plugins []core.Plugin) []perror.PulseError {
func (p *pluginControl) SubscribeDeps(taskId string, mts []core.Metric, plugins []core.Plugin) []perror.PulseError {
var perrs []perror.PulseError

collectors, errs := p.gatherCollectors(mts)
Expand Down Expand Up @@ -480,7 +480,7 @@ func (p *pluginControl) SubscribeDeps(taskId uint64, mts []core.Metric, plugins
return perrs
}

func (p *pluginControl) sendPluginSubscriptionEvent(taskId uint64, pl core.Plugin) perror.PulseError {
func (p *pluginControl) sendPluginSubscriptionEvent(taskId string, pl core.Plugin) perror.PulseError {
pt, err := core.ToPluginType(pl.TypeName())
if err != nil {
return perror.New(err)
Expand All @@ -501,7 +501,7 @@ func (p *pluginControl) sendPluginSubscriptionEvent(taskId uint64, pl core.Plugi
return nil
}

func (p *pluginControl) UnsubscribeDeps(taskId uint64, mts []core.Metric, plugins []core.Plugin) []perror.PulseError {
func (p *pluginControl) UnsubscribeDeps(taskId string, mts []core.Metric, plugins []core.Plugin) []perror.PulseError {
var perrs []perror.PulseError

collectors, errs := p.gatherCollectors(mts)
Expand All @@ -528,7 +528,7 @@ func (p *pluginControl) UnsubscribeDeps(taskId uint64, mts []core.Metric, plugin
return perrs
}

func (p *pluginControl) sendPluginUnsubscriptionEvent(taskId uint64, pl core.Plugin) perror.PulseError {
func (p *pluginControl) sendPluginUnsubscriptionEvent(taskId string, pl core.Plugin) perror.PulseError {
pt, err := core.ToPluginType(pl.TypeName())
if err != nil {
return perror.New(err)
Expand Down
12 changes: 6 additions & 6 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,8 +664,8 @@ func TestCollectMetrics(t *testing.T) {
So(lp, ShouldNotBeNil)
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:dummy1:1")
So(errp, ShouldBeNil)
pool.subscribe(1, unboundSubscriptionType)
err = c.sendPluginSubscriptionEvent(1, lp)
pool.subscribe("1", unboundSubscriptionType)
err = c.sendPluginSubscriptionEvent("1", lp)
So(err, ShouldBeNil)
m = append(m, m1, m2)

Expand Down Expand Up @@ -758,8 +758,8 @@ func TestPublishMetrics(t *testing.T) {
}
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("publisher:file:1")
So(errp, ShouldBeNil)
pool.subscribe(1, unboundSubscriptionType)
errs := c.sendPluginSubscriptionEvent(1, p)
pool.subscribe("1", unboundSubscriptionType)
errs := c.sendPluginSubscriptionEvent("1", p)
So(errs, ShouldBeNil)
<-lpe.done
time.Sleep(1500 * time.Millisecond)
Expand Down Expand Up @@ -815,8 +815,8 @@ func TestProcessMetrics(t *testing.T) {
}
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("processor:passthru:1")
So(errp, ShouldBeNil)
pool.subscribe(1, unboundSubscriptionType)
errs := c.sendPluginSubscriptionEvent(1, p)
pool.subscribe("1", unboundSubscriptionType)
errs := c.sendPluginSubscriptionEvent("1", p)
So(errs, ShouldBeNil)
<-lpe.done
time.Sleep(1500 * time.Millisecond)
Expand Down
3 changes: 3 additions & 0 deletions control/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,9 @@ func (mc *metricCatalog) get(ns []string, ver int) (*metricType, perror.PulseErr
if err != nil {
return nil, err
}
if mts == nil {
return nil, perror.New(errMetricNotFound)
}
// a version IS given
if ver > 0 {
l, err := getVersion(mts, ver)
Expand Down
4 changes: 2 additions & 2 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (r *runner) runPlugin(path string) {
}
}

func (r *runner) handleSubscription(pType, pName string, pVersion int, taskId uint64, subType subscriptionType) {
func (r *runner) handleSubscription(pType, pName string, pVersion int, taskId string, subType subscriptionType) {
pool, err := r.availablePlugins.getPool(fmt.Sprintf("%s:%s:%d", pType, pName, pVersion))
if err != nil {
runnerLog.WithFields(log.Fields{
Expand Down Expand Up @@ -430,7 +430,7 @@ func (r *runner) handleSubscription(pType, pName string, pVersion int, taskId ui
r.runPlugin(plugin.Path)
}
}
func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskId uint64) error {
func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskId string) error {
pool, err := r.availablePlugins.getPool(fmt.Sprintf("%s:%s:%d", pType, pName, pVersion))
if err != nil {
runnerLog.WithFields(log.Fields{
Expand Down
6 changes: 3 additions & 3 deletions core/control_event/control_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ type PluginSubscriptionEvent struct {
PluginVersion int
PluginType int
SubscriptionType int
TaskId uint64
TaskId string
}

func (ps PluginSubscriptionEvent) Namespace() string {
return PluginSubscribed
}

type PluginUnsubscriptionEvent struct {
TaskId uint64
TaskId string
PluginName string
PluginVersion int
PluginType int
Expand All @@ -95,7 +95,7 @@ func (hfe HealthCheckFailedEvent) Namespace() string {
}

type MovePluginSubscriptionEvent struct {
TaskId uint64
TaskId string
PluginName string
PreviousVersion int
NewVersion int
Expand Down
14 changes: 7 additions & 7 deletions core/scheduler_event/scheduler_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ const (
)

type TaskStartedEvent struct {
TaskID uint64
TaskID string
}

func (e TaskStartedEvent) Namespace() string {
return TaskStarted
}

type TaskCreatedEvent struct {
TaskID uint64
TaskID string
StartOnCreate bool
}

Expand All @@ -32,23 +32,23 @@ func (e TaskCreatedEvent) Namespace() string {
}

type TaskDeletedEvent struct {
TaskID uint64
TaskID string
}

func (e TaskDeletedEvent) Namespace() string {
return TaskDeleted
}

type TaskStoppedEvent struct {
TaskID uint64
TaskID string
}

func (e TaskStoppedEvent) Namespace() string {
return TaskStopped
}

type TaskDisabledEvent struct {
TaskID uint64
TaskID string
Why string
}

Expand All @@ -57,7 +57,7 @@ func (e TaskDisabledEvent) Namespace() string {
}

type MetricCollectedEvent struct {
TaskID uint64
TaskID string
Metrics []core.Metric
}

Expand All @@ -66,7 +66,7 @@ func (e MetricCollectedEvent) Namespace() string {
}

type MetricCollectionFailedEvent struct {
TaskID uint64
TaskID string
Errors []error
}

Expand Down
Loading

0 comments on commit 390c66f

Please sign in to comment.