Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Aravind Srinivasan committed Dec 27, 2016
0 parents commit 79206f9
Show file tree
Hide file tree
Showing 55 changed files with 9,255 additions and 0 deletions.
14 changes: 14 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
*.out
*.test
*.xml
*.swp
*.cov
*.html
*.tmp
test
test.log
.rewrite
vendor/

# produced executable(s)
example
20 changes: 20 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Copyright (c) 2016 Uber Technologies, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

36 changes: 36 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
.PHONY: bins test clean
PROJECT_ROOT=github.com/uber/cherami-client-go

export PATH := $(GOPATH)/bin:$(PATH)

PROGS = example
export GO15VENDOREXPERIMENT=1
NOVENDOR = $(shell GO15VENDOREXPERIMENT=1 glide novendor)

export PATH := $(GOPATH)/bin:$(PATH)

# Automatically gather all srcs
ALL_SRC := $(shell find . -name "*.go" | grep -v -e Godeps -e vendor \
-e ".*/\..*" \
-e ".*/_.*" \
-e ".*/mocks.*")

# all directories with *_test.go files in them
TEST_DIRS := $(sort $(dir $(filter %_test.go,$(ALL_SRC))))

bins:
glide install
go build -i -o example example.go
clean:
rm -rf example
test:
@rm -f test
@rm -f test.log
@for dir in $(TEST_DIRS); do \
go test -coverprofile=$@ "$$dir" | tee -a test.log; \
done;

test-race:
@for dir in $(TEST_DIRS); do \
go test -race "$$dir" | tee -a "$$dir"_test.log; \
done;
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
Go client library for Cherami
=============================

Cherami is a distributed, scalable, durable, and highly available message queue system we developed at Uber Engineering to transport asynchronous tasks.

cherami-client-go is the go client library for Cherami.

Clone this repo
---------------
Make sure you clone this repo into the correct location.

`git clone git@github.com:uber/cherami-client-go.git $GOPATH/src/github.com/uber/cherami-client-go`
`pushd $GOPATH/src/github.com/uber/cherami-client-go`


Development
-----------
The cherami-client-go repo specifically holds the client library for Cherami, whose thrift APIs are defined in cherami-thrift repo. This repo can be used to talk to Cherami server once the cherami server is up and running.

The repo also holds an `example` which can be executed against the cherami server running locally.

In order to use the example in this repo, the following dependencies needs to be addressed:
1. Make certain that `thrift` (OSX: `brew install thrift`) and `glide` are in your path (above).
2. Make sure that cherami server is up and running by cloning the `cherami-server` repo and following the instructions on that repo.

Once we have the aforementioned steps, one can build the `example` by running:
`make bins`

In order to use `cherami-client-go` as a library in an application which wants to talk to Cherami, in the consuming repo, take in the updated go-client (`github.com/uber/cherami-client-go`) as a package in `glide.yaml`.

Documentation
--------------

Interested in learning more about Cherami? Read the blog post:
[eng.uber.com.cherami](https://eng.uber.com/cherami/)
161 changes: 161 additions & 0 deletions client/cherami/basePublisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package cherami

import (
"crypto/md5"
"errors"
"hash/crc32"
"strconv"
"sync/atomic"
"time"

"github.com/uber-common/bark"

"github.com/uber/cherami-thrift/.generated/go/cherami"
"github.com/uber/cherami-client-go/common"
"github.com/uber/cherami-client-go/common/backoff"
"github.com/uber/cherami-client-go/common/metrics"
)

type (
// basePublisher contains the data/code
// common to all types of Publisher
// implementations
basePublisher struct {
idCounter int64
path string
client Client
logger bark.Logger
reporter metrics.Reporter
retryPolicy backoff.RetryPolicy
checksumOption cherami.ChecksumOption
}

// publishError represents a message publishing error
publishError struct {
msg string
}
)

// ErrMessageTimedout is returned by Publish when no ack is received within timeout interval
var ErrMessageTimedout = errors.New("Timed out.")

// Publisher default retry policy
const (
publisherRetryInterval = 50 * time.Millisecond
publisherRetryMaxInterval = 10 * time.Second
publisherRetryExpirationInterval = 2 * time.Minute
)

// choosePublishEndpoints selects the list of publish endpoints
// from the set of all possible (protocol -> endpoints) returned
// from the server
func (bp *basePublisher) choosePublishEndpoints(publisherOptions *cherami.ReadPublisherOptionsResult_) (cherami.Protocol, []*cherami.HostAddress) {
// pick best protocol from what server suggested
hostProtocols := publisherOptions.GetHostProtocols()
chosenIdx, err := bp.chooseProcotol(hostProtocols)
chosenProtocol := cherami.Protocol_WS
chosenHostAddresses := publisherOptions.GetHostAddresses()
if err == nil {
chosenProtocol = hostProtocols[chosenIdx].GetProtocol()
chosenHostAddresses = hostProtocols[chosenIdx].GetHostAddresses()
}
return chosenProtocol, chosenHostAddresses
}

// chooseProtocol selects a preferred protocol from the list of
// available protocols returned from the server
func (bp *basePublisher) chooseProcotol(hostProtocols []*cherami.HostProtocol) (int, error) {
clientSupportedProtocol := map[cherami.Protocol]bool{cherami.Protocol_WS: true}
clientSupportButDeprecated := -1
serverSupportedProtocol := make([]cherami.Protocol, 0, len(hostProtocols))

for idx, hostProtocol := range hostProtocols {
serverSupportedProtocol = append(serverSupportedProtocol, hostProtocol.GetProtocol())
if _, found := clientSupportedProtocol[hostProtocol.GetProtocol()]; found {
if !hostProtocol.GetDeprecated() {
// found first supported and non-deprecated one, done
return idx, nil
} else if clientSupportButDeprecated == -1 {
// found first supported but deprecated one, keep looking
clientSupportButDeprecated = idx
}
}
}

if clientSupportButDeprecated == -1 {
bp.logger.WithField(`protocols`, serverSupportedProtocol).Error("No protocol is supported by client")
return clientSupportButDeprecated, &cherami.BadRequestError{Message: `No protocol is supported by client`}
}

bp.logger.WithField(`protocol`, hostProtocols[clientSupportButDeprecated].GetProtocol()).Warn("Client using deprecated protocol")
return clientSupportButDeprecated, nil
}

func (bp *basePublisher) readPublisherOptions() (*cherami.ReadPublisherOptionsResult_, error) {
return bp.client.ReadPublisherOptions(bp.path)
}

func (bp *basePublisher) addChecksum(msg *cherami.PutMessage) {
switch bp.checksumOption {
case cherami.ChecksumOption_CRC32IEEE:
msg.Crc32IEEEDataChecksum = common.Int64Ptr(int64(crc32.ChecksumIEEE(msg.GetData())))
case cherami.ChecksumOption_MD5:
md5Checksum := md5.Sum(msg.GetData())
msg.Md5DataChecksum = md5Checksum[:]
}
}

// toPutMessage converts a PublisherMessage to cherami.PutMessage
func (bp *basePublisher) toPutMessage(pubMessage *PublisherMessage) *cherami.PutMessage {

msgID := atomic.AddInt64(&bp.idCounter, 1)
idStr := strconv.FormatInt(msgID, 10)
delay := int32(pubMessage.Delay.Seconds())

msg := &cherami.PutMessage{
ID: common.StringPtr(idStr),
Data: pubMessage.Data,
DelayMessageInSeconds: &delay,
UserContext: pubMessage.UserContext,
}

bp.addChecksum(msg)
return msg
}

func createDefaultPublisherRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(publisherRetryInterval)
policy.SetMaximumInterval(publisherRetryMaxInterval)
policy.SetExpirationInterval(publisherRetryExpirationInterval)
return policy
}

func newPublishError(status cherami.Status) error {
return &publishError{
msg: "Publish failed with error:" + status.String(),
}
}

func (e *publishError) Error() string {
return e.msg
}
Loading

0 comments on commit 79206f9

Please sign in to comment.