This repository has been archived by the owner on Feb 7, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlifecycle.go
227 lines (176 loc) · 5.17 KB
/
lifecycle.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
/*
Package lifecycle provides events that services in the Choria eco system
emit during startup, shutdown, provisioning and general running.
These events can be used by other tools to react to events or monitor the
running of a Chroia network.
A library to view the events received from the network and one to create
a running tally of the count and versions of nodes on your network.
*/
package lifecycle
import (
"encoding/json"
"errors"
"fmt"
"sort"
"strings"
cloudevents "github.com/cloudevents/sdk-go"
"github.com/tidwall/gjson"
)
// PublishConnector is a connection to the middleware
type PublishConnector interface {
PublishRaw(target string, data []byte) error
}
// Type is a type of event this system supports
type Type int
const (
_ = iota
// Startup is an event components can publish when they start
Startup Type = iota
// Shutdown is an event components can publish when they shutdown
Shutdown
// Provisioned is an event components can publish post provisioning
Provisioned
// Alive is an event components can publish to indicate they are still alive
Alive
)
// Format is the event format used for transporting events
type Format int
const (
// UnknownFormat is a unknown format message
UnknownFormat Format = iota
// ChoriaFormat is classical ChoriaFormat lifecycle events in its own package
ChoriaFormat
// CloudEventV1Format is a classical Choria lifecycle event carried within a version 1.0 CloudEvent
CloudEventV1Format
)
var eventTypes = make(map[string]Type)
var eventJSONParsers = make(map[Type]func([]byte) (Event, error))
var eventFactories = make(map[Type]func(...Option) Event)
// New creates a new event
func New(t Type, opts ...Option) (Event, error) {
factory, ok := eventFactories[t]
if !ok {
return nil, errors.New("unknown event type")
}
return factory(opts...), nil
}
// EventTypeNames produce a list of valid event type names
func EventTypeNames() []string {
names := []string{}
for k := range eventTypes {
names = append(names, k)
}
sort.Strings(names)
return names
}
// EventFormatFromJSON inspects the JSON data and tries to determine the format from it's content
func EventFormatFromJSON(j []byte) Format {
protocol := gjson.GetBytes(j, "protocol")
if protocol.Exists() && strings.HasPrefix(protocol.String(), "io.choria.lifecycle") {
return ChoriaFormat
}
specversion := gjson.GetBytes(j, "specversion")
source := gjson.GetBytes(j, "source")
if specversion.Exists() && source.Exists() {
if specversion.String() == "1.0" && source.String() == "io.choria.lifecycle" {
return CloudEventV1Format
}
}
return UnknownFormat
}
// NewFromJSON creates an event from the event JSON
func NewFromJSON(j []byte) (event Event, err error) {
format := EventFormatFromJSON(j)
switch format {
case ChoriaFormat:
event, err = choriaFormatNewFromJSON(j)
case CloudEventV1Format:
event, err = cloudeventV1FormatNewFromJSON(j)
default:
return nil, fmt.Errorf("unsupported event format")
}
if err != nil {
return nil, err
}
event.SetFormat(format)
return event, nil
}
func cloudeventV1FormatNewFromJSON(j []byte) (Event, error) {
event := cloudevents.NewEvent("1.0")
err := event.UnmarshalJSON(j)
if err != nil {
return nil, err
}
data, err := event.DataBytes()
if err != nil {
return nil, err
}
return NewFromJSON(data)
}
func choriaFormatNewFromJSON(j []byte) (Event, error) {
protocol := gjson.GetBytes(j, "protocol")
proto, err := protoStringToTypeString(protocol.String())
if err != nil {
return nil, err
}
etype, ok := eventTypes[proto]
if !ok {
return nil, fmt.Errorf("unknown protocol '%s' received", protocol.String())
}
factory, ok := eventJSONParsers[etype]
if !ok {
return nil, fmt.Errorf("cannot create %s event type from JSON", proto)
}
return factory(j)
}
// turns io.choria.lifecycle.v1.provisioned or choria:lifecycle:provisioned:1 into provisioned
func protoStringToTypeString(proto string) (eventType string, err error) {
if strings.HasPrefix(proto, "choria:lifecycle") {
parts := strings.Split(proto, ":")
if len(parts) == 4 {
return parts[2], nil
}
return "", fmt.Errorf("unknown protocol '%s' received", proto)
}
if strings.HasPrefix(proto, "io.choria.lifecycle") {
parts := strings.Split(proto, ".")
if len(parts) == 5 {
return parts[4], nil
}
return "", fmt.Errorf("unknown protocol '%s' received", proto)
}
return "", fmt.Errorf("invalid protocol '%s' received", proto)
}
// ToCloudEventV1 converts an event to a CloudEvent version 1
func ToCloudEventV1(e Event) cloudevents.Event {
event := cloudevents.NewEvent("1.0")
event.SetType(e.Protocol())
event.SetSource("io.choria.lifecycle")
event.SetSubject(e.Identity())
event.SetID(e.ID())
event.SetTime(e.TimeStamp())
event.SetData(e)
return event
}
// PublishEvent publishes an event
func PublishEvent(e Event, conn PublishConnector) error {
var j []byte
var err error
switch e.Format() {
case ChoriaFormat:
j, err = json.Marshal(e)
case CloudEventV1Format:
j, err = ToCloudEventV1(e).MarshalJSON()
default:
err = fmt.Errorf("do not know how to publish this format event")
}
if err != nil {
return err
}
target, err := e.Target()
if err != nil {
return err
}
conn.PublishRaw(target, j)
return nil
}