-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathtubesock.rb
164 lines (139 loc) · 3.45 KB
/
tubesock.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
require "tubesock/version"
require "tubesock/hijack" if defined?(ActiveSupport)
require "websocket"
# Easily interact with WebSocket connections over Rack.
# TODO: Example with pure Rack
#
# A Tubesock wraps a raw socket plus the negotiated WebSocket protocol
# version. Callers register handlers with onopen / onmessage / onclose /
# onerror and then call #listen, which starts a reader thread and a
# keepalive (ping) thread.
class Tubesock
  # Raised by Tubesock.hijack when the Rack env has no 'rack.hijack' entry.
  HijackNotAvailable = Class.new RuntimeError

  # @param socket  the IO-like object to read frames from and write frames to
  # @param version the WebSocket protocol version handed to the frame classes
  def initialize(socket, version)
    @socket           = socket
    @version          = version

    @open_handlers    = []
    @message_handlers = []
    @close_handlers   = []
    @error_handlers   = []

    @close_on_error   = true

    # True until #close has been entered once.
    @active = true
  end

  # Takes over the connection described by a Rack env, writes the WebSocket
  # handshake response to the hijacked socket and returns a new Tubesock.
  #
  # @param env [Hash] Rack environment
  # @raise [Tubesock::HijackNotAvailable] when env['rack.hijack'] is absent
  def self.hijack(env)
    raise Tubesock::HijackNotAvailable unless env['rack.hijack']

    env['rack.hijack'].call
    socket = env['rack.hijack_io']

    handshake = WebSocket::Handshake::Server.new
    handshake.from_rack env

    socket.write handshake.to_s

    new socket, handshake.version
  end

  # Stops #call_error_handlers from closing the connection after the error
  # handlers have run.
  def prevent_close_on_error
    @close_on_error = false
  end

  # Encodes +data+ as an outgoing server frame of the given +type+ and writes
  # it to the socket. IOError, EPIPE and ETIMEDOUT raised while doing so are
  # swallowed and the connection is closed instead.
  #
  # @param data payload for the frame (nil is used for keepalive pings)
  # @param type [Symbol] frame type, :text by default
  def send_data(data, type = :text)
    frame = WebSocket::Frame::Outgoing::Server.new(
      version: @version,
      data: data,
      type: type
    )
    @socket.write frame.to_s
  rescue IOError, Errno::EPIPE, Errno::ETIMEDOUT
    close
  end

  # Registers a block to run (with no arguments) when #listen starts.
  def onopen(&block)
    @open_handlers << block
  end

  # Registers a block that receives the data of each text/binary frame.
  def onmessage(&block)
    @message_handlers << block
  end

  # Registers a block to run (with no arguments) when the connection closes.
  def onclose(&block)
    @close_handlers << block
  end

  # Registers a block that receives (exception, data) when a message handler
  # raises, or (exception, nil) when a keepalive ping fails.
  def onerror(&block)
    @error_handlers << block
  end

  # Invokes every error handler with +e+ and +data+, then closes the
  # connection unless #prevent_close_on_error was called.
  def call_error_handlers(e, data = nil)
    @error_handlers.each { |handler| handler.call(e, data) }
    close if @close_on_error
  end

  # Starts the keepalive thread and a reader thread. The reader thread runs
  # the open handlers, dispatches every incoming text/binary frame to the
  # message handlers, and always closes the connection when it finishes.
  #
  # @return [Thread] the reader thread
  def listen
    keepalive
    Thread.new do
      Thread.current.abort_on_exception = true
      begin
        @open_handlers.each(&:call)
        each_frame do |data|
          @message_handlers.each do |handler|
            begin
              handler.call(data)
            rescue => e
              call_error_handlers(e, data)
            end
          end
        end
      ensure
        close
      end
    end
  end

  # Runs the close handlers once and closes the socket. Subsequent calls are
  # no-ops.
  def close
    return unless @active

    # Mark the connection inactive *before* running handlers: a handler that
    # calls close again (directly, or through a send_data that fails) must hit
    # the guard above instead of recursing.
    @active = false

    begin
      @close_handlers.each(&:call)
    ensure
      # Release the socket even if a close handler raised.
      close!
    end
  end

  # Closes the underlying socket immediately, without running close handlers.
  # Sockets that do not respond to closed? are closed unconditionally.
  def close!
    if @socket.respond_to?(:closed?)
      @socket.close unless @socket.closed?
    else
      @socket.close
    end
  end

  # @return [Boolean] whether the socket reports itself closed; for sockets
  #   without closed? (which close! also supports) this falls back to whether
  #   #close has been called.
  def closed?
    return @socket.closed? if @socket.respond_to?(:closed?)

    !@active
  end

  # Starts a thread that sends a ping frame every 5 seconds while the
  # connection is active, and registers a close handler that stops it.
  def keepalive
    thread = Thread.new do
      Thread.current.abort_on_exception = true
      while @active
        sleep 5
        break unless @active

        begin
          send_data nil, :ping
        rescue StandardError => e
          call_error_handlers(e)
        end
      end
    end

    onclose do
      # close may be running on the keepalive thread itself (a failed ping
      # leads to close). Killing the current thread here would abort the
      # remaining close handlers, so let the loop exit via @active instead.
      thread.kill unless thread.equal?(Thread.current)
    end
  end

  private

  # Reads from the socket until it is exhausted, a close frame arrives or the
  # peer goes away, yielding the data of every text/binary frame and answering
  # pings with pongs.
  def each_frame
    framebuffer = WebSocket::Frame::Incoming::Server.new(version: @version)

    while IO.select([@socket])
      # recvfrom returns [data, sender info]; only the data is needed.
      data = if @socket.respond_to?(:recvfrom)
               @socket.recvfrom(2000).first
             else
               @socket.readpartial(2000)
             end
      break if data.empty?

      framebuffer << data
      while (frame = framebuffer.next)
        case frame.type
        when :close
          return
        when :text, :binary
          yield frame.data
        when :ping
          # According to https://tools.ietf.org/html/rfc6455#section-5.5.3:
          # A Pong frame sent in response to a Ping frame must have identical
          # "Application data" as found in the message body of the Ping frame
          # being replied to.
          send_data frame.data, :pong
        end
      end
    end
  rescue Errno::EHOSTUNREACH, Errno::ETIMEDOUT, Errno::ECONNRESET, IOError, Errno::EBADF
    nil # client disconnected or timed out
  end
end