Skip to content

Commit

Permalink
Merge pull request linkedin#30 from linkedin/zkoffsets
Browse files Browse the repository at this point in the history
Support Zookeeper committed offsets
  • Loading branch information
toddpalino committed Nov 3, 2015
2 parents 4e62680 + a3c3c34 commit 856acd3
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 30 deletions.
17 changes: 14 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@ type BurrowConfig struct {
ZookeeperPort int `gcfg:"zookeeper-port"`
ZookeeperPath string `gcfg:"zookeeper-path"`
OffsetsTopic string `gcfg:"offsets-topic"`
ZKOffsets bool `gcfg:"zookeeper-offsets"`
}
Tickers struct {
BrokerOffsets int `gcfg:"broker-offsets"`
}
Lagcheck struct {
Intervals int `gcfg:"intervals"`
MinDistance int64 `gcfg:"min-distance"`
ExpireGroup int64 `gcfg:"expire-group"`
Intervals int `gcfg:"intervals"`
MinDistance int64 `gcfg:"min-distance"`
ExpireGroup int64 `gcfg:"expire-group"`
ZKCheck int64 `gcfg:"zookeeper-interval"`
ZKGroupRefresh int64 `gcfg:"zk-group-refresh"`
}
Httpserver struct {
Enable bool `gcfg:"server"`
Expand All @@ -78,6 +81,8 @@ type BurrowConfig struct {
Extras []string `gcfg:"extra"`
TemplatePost string `gcfg:"template-post"`
TemplateDelete string `gcfg:"template-delete"`
Timeout int `gcfg:"timeout"`
Keepalive int `gcfg:"keepalive"`
}
}

Expand Down Expand Up @@ -204,6 +209,12 @@ func ValidateConfig(app *ApplicationContext) error {
if app.Config.Lagcheck.ExpireGroup == 0 {
app.Config.Lagcheck.ExpireGroup = 604800
}
if app.Config.Lagcheck.ZKCheck == 0 {
app.Config.Lagcheck.ZKCheck = 60
}
if app.Config.Lagcheck.ZKGroupRefresh == 0 {
app.Config.Lagcheck.ZKGroupRefresh = 300
}

// HTTP Server
if app.Config.Httpserver.Enable {
Expand Down
2 changes: 2 additions & 0 deletions config/burrow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,5 @@ extra=app=burrow
extra=tier=STG
template-post=config/default-http-post.tmpl
template-delete=config/default-http-delete.tmpl
timeout=5
keepalive=30
2 changes: 1 addition & 1 deletion config/default-http-post.tmpl
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"events":[{"id":"{{.Id}}","event":{"severity":"{{if eq .Result.Status 2}}WARN{{else}}ERR{{end}}","tier":"{{index .Extras "tier"}}","group":"{{.Result.Group}}","complete":{{.Result.Complete}},"partitions":{{.Result.Partitions | jsonencoder}}}}]}
{"api_key":"{{index .Extras "api_key"}}","app":"{{index .Extras "app"}}","block":false,"events":[{"id":"{{.Id}}","event":{"severity":"{{if eq .Result.Status 2}}WARN{{else}}ERR{{end}}","tier":"{{index .Extras "tier"}}","group":"{{.Result.Group}}","start":"{{.Start.Format "Jan 02, 2006 15:04:05 UTC"}}","complete":{{.Result.Complete}},"partitions":{{.Result.Partitions | jsonencoder}}}}]}
50 changes: 33 additions & 17 deletions http_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
log "github.com/cihub/seelog"
"github.com/pborman/uuid"
"net"
"net/http"
"os"
"strings"
Expand All @@ -28,13 +29,14 @@ type HttpNotifier struct {
extras map[string]string
ticker *time.Ticker
quitChan chan struct{}
groupIds map[string]map[string]string
groupIds map[string]map[string]Event
resultsChannel chan *ConsumerGroupStatus
httpClient *http.Client
}

type Event struct {
Result *ConsumerGroupStatus
Id string
Id string
Start time.Time
}

func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
Expand Down Expand Up @@ -72,8 +74,17 @@ func NewHttpNotifier(app *ApplicationContext) (*HttpNotifier, error) {
templateDelete: templateDelete,
extras: extras,
quitChan: make(chan struct{}),
groupIds: make(map[string]map[string]string),
groupIds: make(map[string]map[string]Event),
resultsChannel: make(chan *ConsumerGroupStatus),
httpClient: &http.Client{
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: time.Duration(app.Config.Httpnotifier.Timeout) * time.Second,
KeepAlive: time.Duration(app.Config.Httpnotifier.Keepalive) * time.Second,
}).Dial,
TLSHandshakeTimeout: time.Duration(app.Config.Httpnotifier.Timeout) * time.Second,
},
},
}, nil
}

Expand All @@ -96,12 +107,15 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat
if result.Status >= StatusWarning {
if _, ok := notifier.groupIds[result.Cluster]; !ok {
// Create the cluster map
notifier.groupIds[result.Cluster] = make(map[string]string)
notifier.groupIds[result.Cluster] = make(map[string]Event)
}
if _, ok := notifier.groupIds[result.Cluster][result.Group]; !ok {
// Create Event and Id
eventId := uuid.NewRandom()
notifier.groupIds[result.Cluster][result.Group] = eventId.String()
notifier.groupIds[result.Cluster][result.Group] = Event{
Id: eventId.String(),
Start: time.Now(),
}
}

// NOTE - I'm leaving the JsonEncode item in here so as not to break compatibility. New helpers go in the FuncMap above
Expand All @@ -110,13 +124,15 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat
Cluster string
Group string
Id string
Start time.Time
Extras map[string]string
Result *ConsumerGroupStatus
JsonEncode func(interface{}) string
}{
Cluster: result.Cluster,
Group: result.Group,
Id: notifier.groupIds[result.Cluster][result.Group],
Id: notifier.groupIds[result.Cluster][result.Group].Id,
Start: notifier.groupIds[result.Cluster][result.Group].Start,
Extras: notifier.extras,
Result: result,
JsonEncode: templateJsonEncoder,
Expand All @@ -130,20 +146,19 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat
req, err := http.NewRequest("POST", notifier.app.Config.Httpnotifier.Url, bytesToSend)
req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
resp, err := notifier.httpClient.Do(req)
if err != nil {
log.Errorf("Failed to send POST (Id %s): %v", notifier.groupIds[result.Cluster][result.Group], err)
log.Errorf("Failed to send POST (Id %s): %v", notifier.groupIds[result.Cluster][result.Group].Id, err)
return
}
defer resp.Body.Close()

if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Infof("Sent POST for group %s in cluster %s at severity %v (Id %s)", result.Group,
result.Cluster, result.Status, notifier.groupIds[result.Cluster][result.Group])
result.Cluster, result.Status, notifier.groupIds[result.Cluster][result.Group].Id)
} else {
log.Errorf("Failed to send POST for group %s in cluster %s at severity %v (Id %s): %s", result.Group,
result.Cluster, result.Status, notifier.groupIds[result.Cluster][result.Group], resp.Status)
result.Cluster, result.Status, notifier.groupIds[result.Cluster][result.Group].Id, resp.Status)
}
} else {
if _, ok := notifier.groupIds[result.Cluster][result.Group]; ok {
Expand All @@ -153,11 +168,13 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat
Cluster string
Group string
Id string
Start time.Time
Extras map[string]string
}{
Cluster: result.Cluster,
Group: result.Group,
Id: notifier.groupIds[result.Cluster][result.Group],
Id: notifier.groupIds[result.Cluster][result.Group].Id,
Start: notifier.groupIds[result.Cluster][result.Group].Start,
Extras: notifier.extras,
})
if err != nil {
Expand All @@ -168,8 +185,7 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat
req, err := http.NewRequest("DELETE", notifier.app.Config.Httpnotifier.Url, bytesToSend)
req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
resp, err := notifier.httpClient.Do(req)
if err != nil {
log.Errorf("Failed to send DELETE: %v", err)
return
Expand All @@ -178,10 +194,10 @@ func (notifier *HttpNotifier) handleEvaluationResponse(result *ConsumerGroupStat

if (resp.StatusCode >= 200) && (resp.StatusCode <= 299) {
log.Infof("Sent DELETE for group %s in cluster %s (Id %s)", result.Group, result.Cluster,
notifier.groupIds[result.Cluster][result.Group])
notifier.groupIds[result.Cluster][result.Group].Id)
} else {
log.Errorf("Failed to send DELETE for group %s in cluster %s (Id %s): %s", result.Group,
result.Cluster, notifier.groupIds[result.Cluster][result.Group], resp.Status)
result.Cluster, notifier.groupIds[result.Cluster][result.Group].Id, resp.Status)
}

// Remove ID for group that is now clear
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/samuel/go-zookeeper/zk"
"os"
"os/signal"
"runtime"
"syscall"
"time"
)
Expand Down Expand Up @@ -197,6 +198,8 @@ func burrowMain() int {
}

func main() {
runtime.GOMAXPROCS(runtime.NumCPU())

rv := burrowMain()
if rv != 0 {
fmt.Println("Burrow failed at", time.Now().Format("January 2, 2006 at 3:04pm (MST)"))
Expand Down
6 changes: 3 additions & 3 deletions offsets_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (storage *OffsetStorage) addConsumerOffset(offset *PartitionOffset) {
if storage.offsets[offset.Cluster].consumer[offset.Group][offset.Topic][offset.Partition] == nil {
storage.offsets[offset.Cluster].consumer[offset.Group][offset.Topic][offset.Partition] = ring.New(storage.app.Config.Lagcheck.Intervals)
} else {
// The minimum time as configured since the last offset commit has gone by
// Prevent old offset commits, and new commits that are too fast (less than the min-distance config)
previousTimestamp := storage.offsets[offset.Cluster].consumer[offset.Group][offset.Topic][offset.Partition].Prev().Value.(*ConsumerOffset).Timestamp
if offset.Timestamp-previousTimestamp < (storage.app.Config.Lagcheck.MinDistance * 1000) {
storage.offsets[offset.Cluster].consumerLock.Unlock()
Expand Down Expand Up @@ -365,7 +365,7 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
offsetList[topic] = make([][]ConsumerOffset, len(partitions))
for partition, offsetRing := range partitions {
// If we don't have our ring full yet, make sure we let the caller know
if (offsetRing == nil) || (offsetRing.Prev().Value == nil) || (offsetRing.Value == nil) {
if (offsetRing == nil) || (offsetRing.Value == nil) {
status.Complete = false
continue
}
Expand All @@ -387,7 +387,7 @@ func (storage *OffsetStorage) evaluateGroup(cluster string, group string, result
}
storage.offsets[cluster].consumerLock.RUnlock()

// If the youngest offset is older than our expiration window, flush the group
// If the youngest offset is earlier than our expiration window, flush the group
if (youngestOffset > 0) && (youngestOffset < ((time.Now().Unix() - storage.app.Config.Lagcheck.ExpireGroup) * 1000)) {
storage.offsets[cluster].consumerLock.Lock()
log.Infof("Removing expired group %s from cluster %s", group, cluster)
Expand Down
Loading

0 comments on commit 856acd3

Please sign in to comment.