-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage.go
98 lines (93 loc) · 1.97 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package rabbitmq
import (
"context"
"errors"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"log"
"strconv"
)
// SendDelayMsg republishes msg with a per-message expiration so that it is
// dead-lettered to exchangeName/routingKey once the expiration elapses.
//
// The message is published through the default exchange ("") to a durable
// queue named "<exchangeName>_queue_delay". That queue is declared with
// x-dead-letter-exchange = exchangeName and x-dead-letter-routing-key =
// routingKey; it is created on first use if a passive declare reports that it
// does not exist.
//
// Parameters:
//   - exchangeName: dead-letter target exchange; also the prefix of the delay
//     queue name.
//   - routingKey: routing key used when the message is dead-lettered.
//   - msg: delivery whose headers, properties and body are copied into the
//     new publishing.
//   - delay: written in decimal form into the publishing's Expiration field
//     (RabbitMQ documents per-message TTL as milliseconds).
//
// It reconnects first when g.IsClose() reports true. Any error from
// connecting, opening a channel, declaring the queue or publishing is
// returned unchanged.
func (g *RabbitMQ) SendDelayMsg(exchangeName, routingKey string, msg amqp.Delivery, delay int32) error {
	if g.IsClose() {
		if err := g.connect(); err != nil {
			return err
		}
	}

	// Open the channel used for declaring and publishing.
	channel, err := g.conn.Channel()
	if err != nil {
		return err
	}
	// Registered right after the channel is opened so every return path
	// releases it. The closure reads `channel` at return time, so it closes
	// the replacement channel if one was opened below.
	defer func() {
		cerr := channel.Close()
		// A channel that is already closed (for example after a failed
		// declare) reports ErrClosed; that is not worth logging.
		if cerr != nil && !errors.Is(cerr, amqp.ErrClosed) {
			log.Printf("关闭channel失败 err :%s \n", cerr)
		}
	}()

	// Name and arguments of the delay queue. Expired messages are
	// dead-lettered to the target exchange with the given routing key.
	queueName := fmt.Sprintf("%s_queue_delay", exchangeName)
	queueArgs := amqp.Table{
		"x-dead-letter-exchange":    exchangeName,
		"x-dead-letter-routing-key": routingKey,
	}

	// Check for the delay queue without creating it.
	if _, err = channel.QueueDeclarePassive(queueName, true, false, false, false, queueArgs); err != nil {
		var amqpErr *amqp.Error
		if !errors.As(err, &amqpErr) || amqpErr.Code != amqp.NotFound {
			log.Println("Failed to passively declare delay queue: ", err)
			// Return the original error: amqpErr is a nil pointer when
			// errors.As did not match, and returning it would hand the
			// caller a non-nil error wrapping nil.
			return err
		}

		// The queue does not exist yet. The amqp091-go documentation states
		// that the channel is closed after a failed declare, so a fresh one
		// is opened. Assign it only on success so the deferred close never
		// sees a nil channel.
		reopened, openErr := g.conn.Channel()
		if openErr != nil {
			return openErr
		}
		channel = reopened

		if _, err = channel.QueueDeclare(queueName, true, false, false, false, queueArgs); err != nil {
			return err
		}
	}

	// Publish to the delay queue via the default exchange, carrying over the
	// original message's properties and setting the expiration.
	return channel.PublishWithContext(
		context.Background(),
		"",
		queueName,
		false,
		false,
		amqp.Publishing{
			Headers:         msg.Headers,
			ContentType:     msg.ContentType,
			ContentEncoding: msg.ContentEncoding,
			DeliveryMode:    msg.DeliveryMode,
			Priority:        msg.Priority,
			CorrelationId:   msg.CorrelationId,
			ReplyTo:         msg.ReplyTo,
			MessageId:       msg.MessageId,
			Type:            msg.Type,
			UserId:          msg.UserId,
			AppId:           msg.AppId,
			Body:            msg.Body,
			Expiration:      strconv.Itoa(int(delay)),
		},
	)
}