Skip to content

Commit

Permalink
Reuse Ben's code and approach with mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
ikavgo committed Dec 8, 2021
1 parent 43df496 commit 4d67b20
Showing 1 changed file with 23 additions and 1 deletion.
24 changes: 23 additions & 1 deletion cmd/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"sync"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -45,6 +47,9 @@ type envConfig struct {

channel *amqp.Channel
logger *zap.SugaredLogger

pconfirms_chan chan amqp.Confirmation
pconfirms_mutex sync.Mutex
}

func main() {
Expand All @@ -65,6 +70,12 @@ func main() {
}
defer env.channel.Close()

env.pconfirms_chan = env.channel.NotifyPublish(make(chan amqp.Confirmation))
// noWait is false
if err := env.channel.Confirm(false); err != nil {
log.Fatalf("faild to switch connection channel to confirm mode: %s", err)
}

env.logger = logging.FromContext(context.Background())

connectionArgs := kncloudevents.ConnectionArgs{
Expand Down Expand Up @@ -134,6 +145,10 @@ func (env *envConfig) send(event *cloudevents.Event) (int, error) {
for key, val := range event.Extensions() {
headers[key] = val
}

env.pconfirms_mutex.Lock()
defer env.pconfirms_mutex.Unlock()

if err := env.channel.Publish(
env.ExchangeName,
"", // routing key
Expand All @@ -146,5 +161,12 @@ func (env *envConfig) send(event *cloudevents.Event) (int, error) {
}); err != nil {
return http.StatusInternalServerError, fmt.Errorf("failed to publish message")
}
return http.StatusAccepted, nil

confirmation := <-env.pconfirms_chan

if confirmation.Ack {
return http.StatusAccepted, nil
} else {
return http.StatusServiceUnavailable, errors.New("message was not confirmed")
}
}

0 comments on commit 4d67b20

Please sign in to comment.