Skip to content

Commit

Permalink
FIXUP: scratch the pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ikavgo committed Dec 16, 2021
1 parent 4d67b20 commit a9ef4ed
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 40 deletions.
107 changes: 107 additions & 0 deletions cmd/ingress/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"log"
"sync"

amqp "github.com/rabbitmq/amqp091-go"
)

type Channel struct {
channel *amqp.Channel
counter uint64
mutex sync.Mutex
confirmsMap sync.Map
pconfirms_chan chan amqp.Confirmation
}

func openChannel(conn *amqp.Connection) *Channel {
a_channel, err := conn.Channel()
if err != nil {
log.Fatalf("failed to open a channel: %s", err)
}

channel := &Channel{channel: a_channel, counter: 1, pconfirms_chan: make(chan amqp.Confirmation)}

go channel.confirmsHandler()

a_channel.NotifyPublish(channel.pconfirms_chan)
// noWait is false
if err := a_channel.Confirm(false); err != nil {
log.Fatalf("faild to switch connection channel to confirm mode: %s", err)
}

return channel
}

func (channel *Channel) publish(exchange string, headers amqp.Table, bytes []byte) (chan amqp.Confirmation, error) {
channel.mutex.Lock()
defer channel.mutex.Unlock()

cchan := make(chan amqp.Confirmation, 1)

channel.confirmsMap.Store(channel.counter, cchan)

if err := channel.channel.Publish(
exchange,
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: headers,
ContentType: "application/json",
Body: bytes,
}); err != nil {
channel.confirmsMap.LoadAndDelete(channel.counter)
return nil, err
}

channel.counter++

return cchan, nil
}

func (channel *Channel) confirmsHandler() func() error {
return func() error {
for confirm := range channel.pconfirms_chan {
channel.handleConfirm(confirm)
}
return nil
}
}

func (channel *Channel) handleConfirm(confirm amqp.Confirmation) {
// NOTE: current golang driver resequences confirms and sends them one-by-one
value, present := channel.confirmsMap.LoadAndDelete(confirm.DeliveryTag)

if !present {
// really die?
log.Fatalf("Received confirm for unknown send. DeliveryTag: %d", confirm.DeliveryTag)
}

ch := value.(chan amqp.Confirmation)

ch <- confirm
}

func (channel *Channel) Close() {
// should I also stop confirmHandler here?
// mb sending kill switch to channel.pconfirms_chan?
channel.channel.Close()
}
94 changes: 54 additions & 40 deletions cmd/ingress/main.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
/*
Copyright 2020 The Knative Authors
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main
Expand All @@ -23,7 +23,7 @@ import (
"fmt"
"log"
"net/http"
"sync"
"sync/atomic"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -38,43 +38,44 @@ import (
const (
defaultMaxIdleConnections = 1000
defaultMaxIdleConnectionsPerHost = 1000
defaultMaxAmqpChannels = 10
)

type envConfig struct {
Port int `envconfig:"PORT" default:"8080"`
BrokerURL string `envconfig:"BROKER_URL" required:"true"`
ExchangeName string `envconfig:"EXCHANGE_NAME" required:"true"`

channel *amqp.Channel
logger *zap.SugaredLogger
conn *amqp.Connection
channels []*Channel
logger *zap.SugaredLogger

pconfirms_chan chan amqp.Confirmation
pconfirms_mutex sync.Mutex
rc uint64
cc int
}

func main() {
var env envConfig

var err error

if err := envconfig.Process("", &env); err != nil {
log.Fatal("Failed to process env var", zap.Error(err))
}

conn, err := amqp.Dial(env.BrokerURL)
env.conn, err = amqp.Dial(env.BrokerURL)

if err != nil {
log.Fatalf("failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
defer env.conn.Close()

env.channel, err = conn.Channel()
if err != nil {
log.Fatalf("failed to open a channel: %s", err)
}
defer env.channel.Close()
// TODO: get it from annotation
env.cc = defaultMaxAmqpChannels

env.pconfirms_chan = env.channel.NotifyPublish(make(chan amqp.Confirmation))
// noWait is false
if err := env.channel.Confirm(false); err != nil {
log.Fatalf("faild to switch connection channel to confirm mode: %s", err)
}
env.openAmqpChannels()

defer env.closeAmqpChannels()

env.logger = logging.FromContext(context.Background())

Expand Down Expand Up @@ -125,6 +126,7 @@ func (env *envConfig) ServeHTTP(writer http.ResponseWriter, request *http.Reques
return
}

// send to RabbitMQ
statusCode, err := env.send(event)
if err != nil {
env.logger.Error("failed to send event,", err)
Expand All @@ -146,27 +148,39 @@ func (env *envConfig) send(event *cloudevents.Event) (int, error) {
headers[key] = val
}

env.pconfirms_mutex.Lock()
defer env.pconfirms_mutex.Unlock()
ch, err := env.sendMessage(headers, bytes)

if err := env.channel.Publish(
env.ExchangeName,
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: headers,
ContentType: "application/json",
Body: bytes,
}); err != nil {
if err != nil {
return http.StatusInternalServerError, fmt.Errorf("failed to publish message")
}

confirmation := <-env.pconfirms_chan
confirmation := <-ch

if confirmation.Ack {
return http.StatusAccepted, nil
} else {
return http.StatusServiceUnavailable, errors.New("message was not confirmed")
}
}

func (env *envConfig) sendMessage(headers amqp.Table, bytes []byte) (chan amqp.Confirmation, error) {
crc := atomic.AddUint64(&env.rc, 1)

channel := env.channels[crc%uint64(env.cc)]

return channel.publish(env.ExchangeName, headers, bytes)
}

func (env *envConfig) openAmqpChannels() {
env.channels = make([]*Channel, env.cc)

for i := 0; i < env.cc; i++ {
env.channels[i] = openChannel(env.conn)
}
}

func (env *envConfig) closeAmqpChannels() {
for i := 0; i < len(env.channels); i++ {
env.channels[i].Close()
}
}

0 comments on commit a9ef4ed

Please sign in to comment.