-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathapi.go
131 lines (102 loc) · 2.66 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package cogman
import (
"log"
"time"
"github.com/Joker666/cogman/client"
"github.com/Joker666/cogman/config"
"github.com/Joker666/cogman/util"
)
// Package-level state shared by the helper functions in this file.
var (
// server is assigned by StartBackground (from NewServer) and is the
// receiver Register delegates to.
server *Server
// clientSession is assigned by StartBackground (from client.NewSession)
// and is the receiver SendTask delegates to.
clientSession *client.Session
)
// StartBackground derives the server and client configuration from cfg,
// connects the package-level client session, creates the package-level server
// and starts that server in a background goroutine.
//
// It returns the first error reported by setConfig, client.NewSession,
// Session.Connect or NewServer. An error returned by the server's Start method
// occurs after StartBackground has already returned, so it is only logged.
//
// NOTE(review): when NewServer fails, the already connected client session is
// left as is; whether it should be torn down here depends on the client API,
// which is not visible from this file.
func StartBackground(cfg *config.Config) error {
	serverCfg, clientCfg, err := setConfig(cfg)
	if err != nil {
		return err
	}

	clientSession, err = client.NewSession(*clientCfg)
	if err != nil {
		return err
	}
	if err := clientSession.Connect(); err != nil {
		return err
	}

	server, err = NewServer(*serverCfg)
	if err != nil {
		return err
	}

	// Capture the instance created above so the goroutine starts and stops
	// exactly this server, even if the package-level variable is reassigned
	// later.
	srv := server

	go func() {
		defer srv.Stop()

		// err is local to the goroutine: the enclosing function has already
		// returned, so its err variable must not be written from here.
		if err := srv.Start(); err != nil {
			log.Print(err)
		}
	}()

	return nil
}
// SendTask hands task to the package-level client session.
//
// When handler is non-nil it is first registered under task.Name through
// Register; a registration error is returned and the task is not sent.
//
// NOTE(review): clientSession is assigned in StartBackground; presumably
// StartBackground must have succeeded before this is called — confirm.
func SendTask(task util.Task, handler util.Handler) error {
	// No handler supplied: nothing to register, just send.
	if handler == nil {
		return clientSession.SendTask(task)
	}

	if regErr := Register(task.Name, handler); regErr != nil {
		return regErr
	}

	return clientSession.SendTask(task)
}
// Register associates handler with taskName on the package-level server.
//
// It returns ErrInvalidData when taskName is empty or handler is nil;
// otherwise it returns whatever the server's Register method returns.
//
// NOTE(review): server is assigned in StartBackground; presumably
// StartBackground must have succeeded before this is called — confirm.
func Register(taskName string, handler util.Handler) error {
	if taskName == "" {
		return ErrInvalidData
	}
	if handler == nil {
		return ErrInvalidData
	}

	return server.Register(taskName, handler)
}
// setConfig splits cfg into a server configuration and a client configuration.
//
// A nil cfg is replaced by an empty config.Config. Zero-valued settings are
// filled with defaults, and the defaults are written back into cfg:
//   - ConnectionTimeout: 10 minutes
//   - RequestTimeout:    5 seconds
//   - RedisTTL:          1 week
//   - MongoTTL:          30 days
//
// ErrInvalidConfig is returned when AmqpURI or RedisURI is empty. Both queue
// counts are raised to at least 1.
func setConfig(cfg *config.Config) (*config.Server, *config.Client, error) {
	if cfg == nil {
		cfg = &config.Config{}
	}

	// Timeout defaults are applied before the URI validation, so they are
	// written into cfg even when validation fails below.
	if cfg.ConnectionTimeout == 0 {
		cfg.ConnectionTimeout = 10 * time.Minute
	}
	if cfg.RequestTimeout == 0 {
		cfg.RequestTimeout = 5 * time.Second
	}

	// Both broker and Redis endpoints are mandatory.
	if cfg.AmqpURI == "" || cfg.RedisURI == "" {
		return nil, nil, ErrInvalidConfig
	}

	if cfg.RedisTTL == 0 {
		cfg.RedisTTL = 7 * 24 * time.Hour // 1 week
	}
	if cfg.MongoTTL == 0 {
		cfg.MongoTTL = 30 * 24 * time.Hour // 1 month
	}

	// The sections below are identical for server and client.
	amqpCfg := config.AMQP{
		URI:                    cfg.AmqpURI,
		HighPriorityQueueCount: max(1, cfg.HighPriorityQueueCount),
		LowPriorityQueueCount:  max(1, cfg.LowPriorityQueueCount),
		Exchange:               "",
		Prefetch:               cfg.Prefetch,
	}
	redisCfg := config.Redis{URI: cfg.RedisURI, TTL: cfg.RedisTTL}
	mongoCfg := config.Mongo{URI: cfg.MongoURI, TTL: cfg.MongoTTL}

	srv := &config.Server{
		ConnectionTimeout: cfg.ConnectionTimeout,
		AMQP:              amqpCfg,
		Redis:             redisCfg,
		Mongo:             mongoCfg,
		StartRestServer:   cfg.StartRestServer,
	}
	cli := &config.Client{
		ConnectionTimeout: cfg.ConnectionTimeout,
		RequestTimeout:    cfg.RequestTimeout,
		AMQP:              amqpCfg,
		Redis:             redisCfg,
		Mongo:             mongoCfg,
	}

	return srv, cli, nil
}
// max returns the larger of x and y; when they are equal that value is returned.
func max(x int, y int) int {
	if x <= y {
		return y
	}
	return x
}