-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgrpc.go
166 lines (144 loc) · 4.03 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package main
import (
"errors"
"log"
"net"
"net/http"
"os"
"strconv"
"strings"
"time"
pb "github.com/itshouldntdothis/swagger-http-grpc"
"go.uber.org/ratelimit"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"gopkg.in/resty.v0"
)
// GrcpServerOptions implements processing functions
type GrcpServerOptions struct {
Addr string
Server *grpc.Server
RateLimited bool
RateLimiter ratelimit.Limiter
RestyClient *resty.Client
UserAgent string
}
// NewGrcpOptions returns a initialized object
func NewGrcpOptions() *GrcpServerOptions {
Transport := &http.Transport{
MaxIdleConns: 200,
DialContext: (&net.Dialer{
Timeout: 60 * time.Second,
KeepAlive: 60 * time.Second,
}).DialContext,
IdleConnTimeout: 60 * time.Second,
TLSHandshakeTimeout: 20 * time.Second,
ResponseHeaderTimeout: 20 * time.Second,
ExpectContinueTimeout: 0,
MaxIdleConnsPerHost: 40,
}
port := "50051"
envGrcpPort := os.Getenv("SW_GRPC_PORT")
if len(envGrcpPort) != 0 {
port = envGrcpPort
}
Addr := ":" + port
log.Printf("SET GRPC server port to %v", Addr)
UserAgent := os.Getenv("SW_USER_AGENT")
if len(UserAgent) == 0 {
log.Fatal("Default useragent env `SW_USER_AGENT` is required")
}
log.Printf("SET GRPC HTTP WORKER User-Agent to `%v`", UserAgent)
RateLimit := 700
var RateLimiter ratelimit.Limiter
var RateLimited bool
envRateLimit := os.Getenv("SW_REQUEST_LIMIT")
if len(envRateLimit) != 0 {
parsedRequestLimit, err := strconv.Atoi(envRateLimit)
if err != nil {
log.Fatalf("Unable to covert ENV 'SW_REQUEST_LIMIT' of %v to int : %v", envRateLimit, err)
}
RateLimit = parsedRequestLimit
}
if RateLimit == 0 {
RateLimited = false
log.Println("Rate Limit of 0 is not recommended")
} else {
RateLimited = true
log.Printf("Rate Limit set to %v", RateLimit)
RateLimiter = ratelimit.New(RateLimit)
}
return &GrcpServerOptions{
Server: grpc.NewServer(),
Addr: Addr,
RestyClient: resty.New().SetTransport(Transport).SetHeader("Via", via),
RateLimited: RateLimited,
RateLimiter: RateLimiter,
UserAgent: UserAgent,
}
}
func (o *GrcpServerOptions) start() {
lis, err := net.Listen("tcp", o.Addr)
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
pb.RegisterWorkersServer(o.Server, o)
// Register reflection service on gRPC server.
reflection.Register(o.Server)
log.Println("Starting grpc server")
if err := o.Server.Serve(lis); err != nil {
log.Fatalf("Failed to serve grpc server: %v", err)
}
}
func (o *GrcpServerOptions) close() {
log.Println("Closing down grpc server")
o.Server.GracefulStop()
}
// DoRequest processes a single http request from grpc
func (o *GrcpServerOptions) DoRequest(ctx context.Context, in *pb.Request) (*pb.Response, error) {
method := in.GetMethod()
if method != "GET" && method != "POST" && method != "PUT" && method != "DELETE" {
return nil, errors.New("We don't support " + method + " method at this time")
}
restyReq := o.RestyClient.R()
if len(in.GetHeaders()) > 0 {
restyReq = restyReq.SetHeaders(in.GetHeaders())
}
if restyReq.Header.Get("User-Agent") == "" {
restyReq = restyReq.SetHeader("User-Agent", o.UserAgent)
}
if in.GetBody() != "" {
restyReq = restyReq.SetBody(in.GetBody())
}
var delay time.Duration
if o.RateLimited {
start := time.Now()
now := o.RateLimiter.Take()
delay = now.Sub(start)
}
resp, err := restyReq.Execute(method, in.GetUrl())
if err != nil {
log.Printf("Fetch Error %v", err)
return nil, err
}
headers := make(map[string]*pb.Header)
for k, v := range resp.Header() {
headers[strings.ToLower(k)] = &pb.Header{Value: v}
}
log.Printf("%v successful for %v took %v delayed by %v", method, resp.Request.URL, resp.Time(), delay)
return &pb.Response{
Ok: isOk(resp.StatusCode()),
Url: resp.Request.URL,
Status: int32(resp.StatusCode()),
StatusText: resp.Status(),
Body: resp.String(),
Headers: headers,
}, nil
}
func isOk(status int) bool {
if status >= 200 && status <= 299 {
return true
}
return false
}