From 3e50af25bd9f8077c2ae4fe63443bd14f6f55bf4 Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Mon, 30 Mar 2020 17:19:06 +0000 Subject: [PATCH 01/15] found some bugs in the socketfiles code, and got a bit carried away fixing things --- lib/files/socketfiles/socket.go | 181 +++++++++++++++++----------- lib/files/socketfiles/tcp.go | 75 +++++------- lib/files/socketfiles/tcp_test.go | 28 ++++- lib/files/socketfiles/tcpreader.go | 3 +- lib/files/socketfiles/throttling.go | 33 +++-- lib/files/socketfiles/udp.go | 151 +++++++++++------------ lib/files/socketfiles/udp_test.go | 37 ++++-- lib/files/socketfiles/udpreader.go | 37 +++--- 8 files changed, 302 insertions(+), 243 deletions(-) diff --git a/lib/files/socketfiles/socket.go b/lib/files/socketfiles/socket.go index 752fb80..34b08ff 100644 --- a/lib/files/socketfiles/socket.go +++ b/lib/files/socketfiles/socket.go @@ -32,21 +32,46 @@ type ipSocket struct { laddr, raddr net.Addr bufferSize int + packetSize int tos, ttl int throttler } +func (s *ipSocket) uri() *url.URL { + q := s.uriQuery() + + switch laddr := s.laddr.(type) { + case *net.TCPAddr: + q.Set(FieldLocalAddress, laddr.IP.String()) + q.Set(FieldLocalPort, strconv.Itoa(laddr.Port)) + + case *net.UDPAddr: + q.Set(FieldLocalAddress, laddr.IP.String()) + q.Set(FieldLocalPort, strconv.Itoa(laddr.Port)) + } + + return &url.URL{ + Scheme: s.raddr.Network(), + Host: s.raddr.String(), + RawQuery: q.Encode(), + } +} + func (s *ipSocket) uriQuery() url.Values { q := make(url.Values) if s.bitrate > 0 { - setInt(q, FieldMaxBitrate, s.bitrate) + q.Set(FieldMaxBitrate, strconv.Itoa(s.bitrate)) } if s.bufferSize > 0 { - setInt(q, FieldBufferSize, s.bufferSize) + q.Set(FieldBufferSize, strconv.Itoa(s.bufferSize)) + } + + if s.packetSize > 0 { + q.Set(FieldPacketSize, strconv.Itoa(s.packetSize)) } if s.tos > 0 { @@ -54,105 +79,124 @@ func (s *ipSocket) uriQuery() url.Values { } if s.ttl > 0 { - setInt(q, FieldTTL, s.ttl) + q.Set(FieldTTL, strconv.Itoa(s.ttl)) } return q } -func (s *ipSocket) setForReader(conn net.Conn, q url.Values) error { - s.laddr = conn.LocalAddr() - - type bufferSizeSetter interface { - SetReadBuffer(int) error +func ipReader(conn net.Conn, q url.Values) (*ipSocket, error) { + bufferSize, err := getSize(q, FieldBufferSize) + if err != nil { + return nil, err } - if bufferSize, ok, err := getSize(q, FieldBufferSize); ok || err != nil { - if err != nil { - return err + + if bufferSize > 0 { + type readBufferSetter interface { + SetReadBuffer(int) error } - conn, ok := conn.(bufferSizeSetter) + conn, ok := conn.(readBufferSetter) if !ok { - return syscall.EINVAL + return nil, syscall.EINVAL } if err := conn.SetReadBuffer(bufferSize); err != nil { - return err + return nil, err } - - s.bufferSize = bufferSize } - return nil -} + return &ipSocket{ + laddr: conn.LocalAddr(), -func (s *ipSocket) setForWriter(conn net.Conn, q url.Values) error { - s.laddr = conn.LocalAddr() - s.raddr = conn.RemoteAddr() + bufferSize: bufferSize, + }, nil +} - if err := s.setThrottle(q); err != nil { - return err +func ipWriter(conn net.Conn, q url.Values) (*ipSocket, error) { + bufferSize, err := getSize(q, FieldBufferSize) + if err != nil { + return nil, err } - type bufferSizeSetter interface { - SetWriteBuffer(int) error - } - if bufferSize, ok, err := getSize(q, FieldBufferSize); ok || err != nil { - if err != nil { - return err + if bufferSize > 0 { + type writeBufferSetter interface { + SetWriteBuffer(int) error } - conn, ok := conn.(bufferSizeSetter) + conn, ok := conn.(writeBufferSetter) if !ok { - return syscall.EINVAL + return nil, syscall.EINVAL } if err := conn.SetWriteBuffer(bufferSize); err != nil { - return err + return nil, err } + } - s.bufferSize = bufferSize + packetSize, err := getSize(q, FieldPacketSize) + if err != nil { + return nil, err + } + + bitrate, err := getSize(q, FieldMaxBitrate) + if err != nil { + return nil, err + } + + var t throttler + if bitrate > 0 { + t.setBitrate(bitrate, packetSize) } var p *ipv4.Conn - if tos, ok, err := getInt(q, FieldTOS); ok || err != nil { - if err != nil { - return err - } + tos, err := getInt(q, FieldTOS) + if err != nil { + return nil, err + } + if tos > 0 { if p == nil { p = ipv4.NewConn(conn) } if err := p.SetTOS(tos); err != nil { - return err + return nil, err } - s.tos, _ = p.TOS() + tos, _ = p.TOS() } - if ttl, ok, err := getInt(q, FieldTTL); ok || err != nil { - if err != nil { - return err - } + ttl, err := getInt(q, FieldTTL) + if err != nil { + return nil, err + } + if ttl > 0 { if p == nil { p = ipv4.NewConn(conn) } if err := p.SetTTL(ttl); err != nil { - return err + return nil, err } - s.ttl, _ = p.TTL() + ttl, _ = p.TTL() } - return nil -} + return &ipSocket{ + laddr: conn.LocalAddr(), + raddr: conn.RemoteAddr(), + + bufferSize: bufferSize, + packetSize: packetSize, + + tos: tos, + ttl: ttl, -func setInt(q url.Values, field string, val int) { - q.Set(field, strconv.Itoa(val)) + throttler: t, + }, nil } var scales = map[byte]int{ @@ -164,40 +208,40 @@ var scales = map[byte]int{ 'k': 1000, } -func getSize(q url.Values, field string) (val int, specified bool, err error) { - s := q.Get(field) - if s == "" { - return 0, false, nil +func getSize(q url.Values, field string) (val int, err error) { + value := q.Get(field) + if value == "" { + return 0, nil } - suffix := s[len(s)-1] + suffix := value[len(value)-1] scale := 1 - if val, ok := scales[suffix]; ok { - scale = val - s = s[:len(s)-1] + if s := scales[suffix]; s > 0 { + scale = s + value = value[:len(value)-1] } - i, err := strconv.ParseInt(s, 0, strconv.IntSize) + i, err := strconv.ParseInt(value, 0, strconv.IntSize) if err != nil { - return 0, true, err + return 0, err } - return int(i) * scale, true, nil + return int(i) * scale, nil } -func getInt(q url.Values, field string) (val int, specified bool, err error) { - s := q.Get(field) - if s == "" { - return 0, false, nil +func getInt(q url.Values, field string) (val int, err error) { + value := q.Get(field) + if value == "" { + return 0, nil } - i, err := strconv.ParseInt(s, 0, strconv.IntSize) + i, err := strconv.ParseInt(value, 0, strconv.IntSize) if err != nil { - return 0, true, err + return 0, err } - return int(i), true, nil + return int(i), nil } func buildAddr(addr, portString string) (ip net.IP, port int, err error) { @@ -220,9 +264,10 @@ func buildAddr(addr, portString string) (ip net.IP, port int, err error) { return ip, port, nil } -func withContext(ctx context.Context, fn func() error) (err error) { +func do(ctx context.Context, fn func() error) error { done := make(chan struct{}) + var err error go func() { defer close(done) diff --git a/lib/files/socketfiles/tcp.go b/lib/files/socketfiles/tcp.go index 68c2978..acd76ef 100644 --- a/lib/files/socketfiles/tcp.go +++ b/lib/files/socketfiles/tcp.go @@ -19,25 +19,21 @@ func init() { } type tcpWriter struct { + *wrapper.Info + conn *net.TCPConn + mu sync.Mutex closed chan struct{} - conn *net.TCPConn - *wrapper.Info - ipSocket + sock *ipSocket } func (w *tcpWriter) SetBitrate(bitrate int) int { w.mu.Lock() defer w.mu.Unlock() - prev := w.bitrate - - w.bitrate = bitrate - w.updateDelay(1) - - return prev + return w.sock.setBitrate(bitrate, 1) } func (w *tcpWriter) Sync() error { @@ -61,26 +57,13 @@ func (w *tcpWriter) Write(b []byte) (n int, err error) { w.mu.Lock() defer w.mu.Unlock() - w.throttle(len(b)) + w.sock.throttle(len(b)) return w.conn.Write(b) } func (w *tcpWriter) uri() *url.URL { - q := w.ipSocket.uriQuery() - - if w.laddr != nil { - laddr := w.laddr.(*net.TCPAddr) - - q.Set(FieldLocalAddress, laddr.IP.String()) - setInt(q, FieldLocalPort, laddr.Port) - } - - return &url.URL{ - Scheme: "tcp", - Host: w.raddr.String(), - RawQuery: q.Encode(), - } + return w.sock.uri() } func (h *tcpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, error) { @@ -88,10 +71,6 @@ func (h *tcpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, er return nil, files.PathError("create", uri.String(), errInvalidURL) } - w := &tcpWriter{ - closed: make(chan struct{}), - } - raddr, err := net.ResolveTCPAddr("tcp", uri.Host) if err != nil { return nil, files.PathError("create", uri.String(), err) @@ -99,32 +78,44 @@ func (h *tcpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, er q := uri.Query() - port := q.Get(FieldLocalPort) - addr := q.Get(FieldLocalAddress) - var laddr *net.TCPAddr - if port != "" || addr != "" { - laddr = new(net.TCPAddr) - - laddr.IP, laddr.Port, err = buildAddr(addr, port) + host := q.Get(FieldLocalAddress) + port := q.Get(FieldLocalPort) + if host != "" || port != "" { + laddr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port)) if err != nil { return nil, files.PathError("create", uri.String(), err) } } - dail := func() error { + var conn *net.TCPConn + dial := func() error { var err error - w.conn, err = net.DialTCP("tcp", laddr, raddr) + conn, err = net.DialTCP("tcp", laddr, raddr) return err } - if err := withContext(ctx, dail); err != nil { + if err := do(ctx, dial); err != nil { + return nil, files.PathError("create", uri.String(), err) + } + + sock, err := ipWriter(conn, q) + if err != nil { + conn.Close() return nil, files.PathError("create", uri.String(), err) } + w := &tcpWriter{ + Info: wrapper.NewInfo(sock.uri(), 0, time.Now()), + conn: conn, + + closed: make(chan struct{}), + sock: sock, + } + go func() { select { case <-w.closed: @@ -133,14 +124,6 @@ func (h *tcpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, er } }() - if err := w.ipSocket.setForWriter(w.conn, q); err != nil { - w.Close() - return nil, files.PathError("create", uri.String(), err) - } - - w.updateDelay(1) - w.Info = wrapper.NewInfo(w.uri(), 0, time.Now()) - return w, nil } diff --git a/lib/files/socketfiles/tcp_test.go b/lib/files/socketfiles/tcp_test.go index 53db281..2557ddf 100644 --- a/lib/files/socketfiles/tcp_test.go +++ b/lib/files/socketfiles/tcp_test.go @@ -10,13 +10,13 @@ import ( func TestTCPName(t *testing.T) { w := &tcpWriter{ - ipSocket: ipSocket{ + sock: &ipSocket{ laddr: &net.TCPAddr{ IP: []byte{127, 0, 0, 1}, Port: 65535, }, raddr: &net.TCPAddr{ - IP: []byte{127, 0, 0, 1}, + IP: []byte{127, 0, 0, 2}, Port: 80, }, bufferSize: 1024, @@ -30,27 +30,43 @@ func TestTCPName(t *testing.T) { } uri := w.uri() - expected := "tcp://127.0.0.1:80?buffer_size=1024&localaddr=127.0.0.1&localport=65535&max_bitrate=2048&tos=0x80&ttl=100" + expected := "tcp://127.0.0.2:80?buffer_size=1024&localaddr=127.0.0.1&localport=65535&max_bitrate=2048&tos=0x80&ttl=100" if s := uri.String(); s != expected { t.Errorf("got a bad URI, was expecting, but got:\n\t%v\n\t%v", expected, s) } w = &tcpWriter{ - ipSocket: ipSocket{ + sock: &ipSocket{ laddr: &net.TCPAddr{ IP: []byte{127, 0, 0, 1}, Port: 65534, }, raddr: &net.TCPAddr{ - IP: []byte{127, 0, 0, 1}, + IP: []byte{127, 0, 0, 2}, Port: 443, }, }, } uri = w.uri() - expected = "tcp://127.0.0.1:443?localaddr=127.0.0.1&localport=65534" + expected = "tcp://127.0.0.2:443?localaddr=127.0.0.1&localport=65534" + + if s := uri.String(); s != expected { + t.Errorf("got a bad URI, was expecting, but got:\n\t%v\n\t%v", expected, s) + } + + w = &tcpWriter{ + sock: &ipSocket{ + raddr: &net.TCPAddr{ + IP: []byte{127, 0, 0, 2}, + Port: 8080, + }, + }, + } + + uri = w.uri() + expected = "tcp://127.0.0.2:8080" if s := uri.String(); s != expected { t.Errorf("got a bad URI, was expecting, but got:\n\t%v\n\t%v", expected, s) diff --git a/lib/files/socketfiles/tcpreader.go b/lib/files/socketfiles/tcpreader.go index 06d0fae..971e7db 100644 --- a/lib/files/socketfiles/tcpreader.go +++ b/lib/files/socketfiles/tcpreader.go @@ -61,7 +61,8 @@ func (h *tcpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, erro // Maybe we asked for an arbitrary port, // so, we build our own copy of the URL, and use that. lurl := &url.URL{ - Host: l.Addr().String(), + Scheme: "tcp", + Host: l.Addr().String(), } loading := make(chan struct{}) diff --git a/lib/files/socketfiles/throttling.go b/lib/files/socketfiles/throttling.go index 68682b2..7438bf0 100644 --- a/lib/files/socketfiles/throttling.go +++ b/lib/files/socketfiles/throttling.go @@ -1,7 +1,6 @@ package socketfiles import ( - "net/url" "time" ) @@ -12,16 +11,33 @@ type throttler struct { next *time.Timer } +func (t *throttler) drain() { + if t.next == nil { + return + } + + if !t.next.Stop() { + <-t.next.C + } +} + func (t *throttler) updateDelay(prescale int) { if t.bitrate <= 0 { t.delay = 0 + t.drain() t.next = nil return } + if t.next != nil { + t.drain() + t.next.Reset(0) + } else { + t.next = time.NewTimer(0) + } + // delay = nanoseconds per byte t.delay = (8 * time.Second) / time.Duration(t.bitrate) - t.next = time.NewTimer(0) // recalculate to the actual expected maximum bitrate t.bitrate = int(8 * time.Second / t.delay) @@ -46,14 +62,11 @@ func (t *throttler) throttle(scale int) { t.next.Reset(t.delay) } -func (t *throttler) setThrottle(q url.Values) error { - if bitrate, ok, err := getSize(q, FieldMaxBitrate); ok || err != nil { - if err != nil { - return err - } +func (t *throttler) setBitrate(bitrate, prescale int) int { + prev := t.bitrate - t.bitrate = bitrate - } + t.bitrate = bitrate + t.updateDelay(prescale) - return nil + return prev } diff --git a/lib/files/socketfiles/udp.go b/lib/files/socketfiles/udp.go index bef2151..bc48eee 100644 --- a/lib/files/socketfiles/udp.go +++ b/lib/files/socketfiles/udp.go @@ -20,13 +20,14 @@ func init() { } type udpWriter struct { + *wrapper.Info + conn *net.UDPConn + mu sync.Mutex closed chan struct{} - conn *net.UDPConn - *wrapper.Info - ipSocket + sock *ipSocket noerrs bool @@ -59,12 +60,23 @@ func (w *udpWriter) SetPacketSize(size int) int { prev := len(w.buf) - w.buf = nil - if size > 0 { - w.buf = make([]byte, size) + switch { + case size <= 0: + w.buf = nil + + case size <= len(w.buf): + w.buf = w.buf[:size] + + default: + w.buf = append(w.buf, make([]byte, size-len(w.buf))...) } - w.updateDelay(len(w.buf)) + if w.off > len(w.buf) { + w.off = len(w.buf) + } + + w.sock.packetSize = len(w.buf) + w.sock.updateDelay(len(w.buf)) return prev } @@ -73,12 +85,7 @@ func (w *udpWriter) SetBitrate(bitrate int) int { w.mu.Lock() defer w.mu.Unlock() - prev := w.bitrate - - w.bitrate = bitrate - w.updateDelay(len(w.buf)) - - return prev + return w.sock.setBitrate(bitrate, len(w.buf)) } func (w *udpWriter) Sync() error { @@ -98,14 +105,18 @@ func (w *udpWriter) sync() error { w.buf[i] = 0 } - w.off = 0 - _, err := w.mustWrite(w.buf) + _, err := w.writeBuffer() return err } -func (w *udpWriter) mustWrite(b []byte) (n int, err error) { +func (w *udpWriter) writeBuffer() (n int, err error) { + w.off = 0 + return w.write(w.buf) +} + +func (w *udpWriter) write(b []byte) (n int, err error) { // We should have already prescaled the delay, so scale=1 here. - w.throttle(1) + w.sock.throttle(1) n, err = w.conn.Write(b) if n != len(b) { @@ -121,14 +132,14 @@ func (w *udpWriter) Close() error { w.mu.Lock() defer w.mu.Unlock() - err := w.sync() - select { case <-w.closed: default: close(w.closed) } + err := w.sync() + if err2 := w.conn.Close(); err == nil { err = err2 } @@ -141,7 +152,7 @@ func (w *udpWriter) Write(b []byte) (n int, err error) { defer w.mu.Unlock() if len(w.buf) < 1 { - w.throttle(len(b)) + w.sock.throttle(len(b)) n, err = w.conn.Write(b) return n, w.err(err) @@ -158,28 +169,24 @@ func (w *udpWriter) Write(b []byte) (n int, err error) { return n, nil } - w.off = 0 - b = b[n:] - - n2, err2 := w.mustWrite(w.buf) + n2, err2 := w.writeBuffer() if err = w.err(err2); err != nil { if n2 > 0 { + // Should we? + // This could cause loss of packet-alignment from writers? w.off = copy(w.buf, w.buf[n2:]) } - /*n -= len(w.buf) - n2 - if n < 0 { - n = 0 - } */ - return n, err } + + b = b[n:] } sz := len(w.buf) for len(b) >= sz { - n2, err2 := w.mustWrite(b[:sz]) + n2, err2 := w.write(b[:sz]) n += n2 if err = w.err(err2); err != nil { @@ -191,33 +198,15 @@ func (w *udpWriter) Write(b []byte) (n int, err error) { } if len(b) > 0 { - n2 := copy(w.buf, b) - w.off += n2 - n += n2 + w.off = copy(w.buf, b) + n += w.off } return n, nil } func (w *udpWriter) uri() *url.URL { - q := w.ipSocket.uriQuery() - - if w.laddr != nil { - laddr := w.laddr.(*net.UDPAddr) - - q.Set(FieldLocalAddress, laddr.IP.String()) - setInt(q, FieldLocalPort, laddr.Port) - } - - if len(w.buf) > 0 { - setInt(q, FieldPacketSize, len(w.buf)) - } - - return &url.URL{ - Scheme: "udp", - Host: w.raddr.String(), - RawQuery: q.Encode(), - } + return w.sock.uri() } func (h *udpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, error) { @@ -225,10 +214,6 @@ func (h *udpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, er return nil, files.PathError("create", uri.String(), errInvalidURL) } - w := &udpWriter{ - closed: make(chan struct{}), - } - raddr, err := net.ResolveUDPAddr("udp", uri.Host) if err != nil { return nil, files.PathError("create", uri.String(), err) @@ -236,32 +221,51 @@ func (h *udpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, er q := uri.Query() - port := q.Get(FieldLocalPort) - addr := q.Get(FieldLocalAddress) - var laddr *net.UDPAddr - if port != "" || addr != "" { - laddr = new(net.UDPAddr) - - laddr.IP, laddr.Port, err = buildAddr(addr, port) + host := q.Get(FieldLocalAddress) + port := q.Get(FieldLocalPort) + if host != "" || port != "" { + laddr, err = net.ResolveUDPAddr("udp", net.JoinHostPort(host, port)) if err != nil { return nil, files.PathError("create", uri.String(), err) } } - dail := func() error { + var conn *net.UDPConn + dial := func() error { var err error - w.conn, err = net.DialUDP("udp", laddr, raddr) + conn, err = net.DialUDP("udp", laddr, raddr) return err } - if err := withContext(ctx, dail); err != nil { + if err := do(ctx, dial); err != nil { + return nil, files.PathError("create", uri.String(), err) + } + + sock, err := ipWriter(conn, q) + if err != nil { + conn.Close() return nil, files.PathError("create", uri.String(), err) } + var buf []byte + if sock.packetSize > 0 { + buf = make([]byte, sock.packetSize) + } + + w := &udpWriter{ + Info: wrapper.NewInfo(sock.uri(), 0, time.Now()), + conn: conn, + + closed: make(chan struct{}), + sock: sock, + + buf: buf, + } + go func() { select { case <-w.closed: @@ -270,23 +274,6 @@ func (h *udpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, er } }() - if err := w.ipSocket.setForWriter(w.conn, q); err != nil { - w.Close() - return nil, files.PathError("create", uri.String(), err) - } - - if pktSize, ok, err := getSize(q, FieldPacketSize); ok || err != nil { - if err != nil { - w.Close() - return nil, files.PathError("create", uri.String(), err) - } - - w.buf = make([]byte, pktSize) - } - - w.updateDelay(len(w.buf)) - w.Info = wrapper.NewInfo(w.uri(), 0, time.Now()) - return w, nil } diff --git a/lib/files/socketfiles/udp_test.go b/lib/files/socketfiles/udp_test.go index e4a5bf5..d7e07bc 100644 --- a/lib/files/socketfiles/udp_test.go +++ b/lib/files/socketfiles/udp_test.go @@ -10,48 +10,67 @@ import ( func TestUDPName(t *testing.T) { w := &udpWriter{ - ipSocket: ipSocket{ + sock: &ipSocket{ laddr: &net.UDPAddr{ IP: []byte{127, 0, 0, 1}, Port: 65535, }, raddr: &net.UDPAddr{ - IP: []byte{127, 0, 0, 1}, + IP: []byte{127, 0, 0, 2}, Port: 80, }, + bufferSize: 1024, - ttl: 100, - tos: 0x80, + + ttl: 100, + tos: 0x80, throttler: throttler{ bitrate: 2048, }, }, - buf: make([]byte, 188), } + w.SetPacketSize(188) + uri := w.uri() - expected := "udp://127.0.0.1:80?buffer_size=1024&localaddr=127.0.0.1&localport=65535&max_bitrate=2048&pkt_size=188&tos=0x80&ttl=100" + expected := "udp://127.0.0.2:80?buffer_size=1024&localaddr=127.0.0.1&localport=65535&max_bitrate=2048&pkt_size=188&tos=0x80&ttl=100" if s := uri.String(); s != expected { t.Errorf("got a bad URI, was expecting, but got:\n\t%v\n\t%v", expected, s) } w = &udpWriter{ - ipSocket: ipSocket{ + sock: &ipSocket{ laddr: &net.UDPAddr{ IP: []byte{127, 0, 0, 1}, Port: 65534, }, raddr: &net.UDPAddr{ - IP: []byte{127, 0, 0, 1}, + IP: []byte{127, 0, 0, 2}, Port: 443, }, }, } uri = w.uri() - expected = "udp://127.0.0.1:443?localaddr=127.0.0.1&localport=65534" + expected = "udp://127.0.0.2:443?localaddr=127.0.0.1&localport=65534" + + if s := uri.String(); s != expected { + t.Errorf("got a bad URI, was expecting, but got:\n\t%v\n\t%v", expected, s) + } + + w = &udpWriter{ + sock: &ipSocket{ + raddr: &net.UDPAddr{ + IP: []byte{127, 0, 0, 2}, + Port: 8080, + }, + }, + } + + uri = w.uri() + expected = "udp://127.0.0.2:8080" if s := uri.String(); s != expected { t.Errorf("got a bad URI, was expecting, but got:\n\t%v\n\t%v", expected, s) diff --git a/lib/files/socketfiles/udpreader.go b/lib/files/socketfiles/udpreader.go index 040e79d..6cf0a1d 100644 --- a/lib/files/socketfiles/udpreader.go +++ b/lib/files/socketfiles/udpreader.go @@ -12,9 +12,9 @@ import ( ) type udpReader struct { - conn *net.UDPConn *wrapper.Info - ipSocket + conn *net.UDPConn + sock *ipSocket } func (r *udpReader) Read(b []byte) (n int, err error) { @@ -29,41 +29,36 @@ func (r *udpReader) Close() error { return r.conn.Close() } -func (r *udpReader) uri() *url.URL { - q := r.ipSocket.uriQuery() - - return &url.URL{ - Scheme: "udp", - Host: r.laddr.String(), - RawQuery: q.Encode(), - } -} - func (h *udpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, error) { if uri.Host == "" { return nil, files.PathError("open", uri.String(), errInvalidURL) } - r := new(udpReader) - laddr, err := net.ResolveUDPAddr("udp", uri.Host) if err != nil { return nil, files.PathError("open", uri.String(), err) } - q := uri.Query() - - r.conn, err = net.ListenUDP("udp", laddr) + conn, err := net.ListenUDP("udp", laddr) if err != nil { return nil, files.PathError("open", uri.String(), err) } - if err := r.ipSocket.setForReader(r.conn, q); err != nil { - r.conn.Close() + sock, err := ipReader(conn, uri.Query()) + if err != nil { + conn.Close() return nil, files.PathError("open", uri.String(), err) } - r.Info = wrapper.NewInfo(r.uri(), 0, time.Now()) + uri = &url.URL{ + Scheme: laddr.Network(), + Host: laddr.String(), + RawQuery: sock.uriQuery().Encode(), + } - return r, nil + return &udpReader{ + Info: wrapper.NewInfo(uri, 0, time.Now()), + conn: conn, + sock: sock, + }, nil } From 1c6e25f49acdb362f023fe7f1e3649d12c3124f6 Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Mon, 30 Mar 2020 17:28:16 +0000 Subject: [PATCH 02/15] also tcpreader has some fixes --- lib/files/socketfiles/tcpreader.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/lib/files/socketfiles/tcpreader.go b/lib/files/socketfiles/tcpreader.go index 971e7db..875810f 100644 --- a/lib/files/socketfiles/tcpreader.go +++ b/lib/files/socketfiles/tcpreader.go @@ -12,11 +12,12 @@ import ( ) type tcpReader struct { - conn *net.TCPConn *wrapper.Info - err error loading <-chan struct{} + + err error + conn *net.TCPConn } func (r *tcpReader) Read(b []byte) (n int, err error) { @@ -59,21 +60,22 @@ func (h *tcpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, erro } // Maybe we asked for an arbitrary port, - // so, we build our own copy of the URL, and use that. - lurl := &url.URL{ - Scheme: "tcp", - Host: l.Addr().String(), + // so, refresh our address to the one we’re actually listening on. + laddr = l.Addr().(*net.TCPAddr) + + uri = &url.URL{ + Scheme: laddr.Network(), + Host: laddr.String(), } loading := make(chan struct{}) r := &tcpReader{ loading: loading, - Info: wrapper.NewInfo(lurl, 0, time.Now()), + Info: wrapper.NewInfo(uri, 0, time.Now()), } go func() { defer close(loading) - defer l.Close() select { case loading <- struct{}{}: @@ -88,6 +90,11 @@ func (h *tcpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, erro return } + // We can close the listener now that we have accepted one, + // this will not close the accepted connection. + l.Close() + + // TODO: make the a configurable option? /* if err := conn.CloseWrite(); err != nil { conn.Close() r.err = err From bd22b003173ae5e52648f1ea3dc1c232d842116c Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Mon, 30 Mar 2020 17:38:05 +0000 Subject: [PATCH 03/15] allow context cancel while waiting on accept --- lib/files/socketfiles/tcpreader.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/lib/files/socketfiles/tcpreader.go b/lib/files/socketfiles/tcpreader.go index 875810f..132ca89 100644 --- a/lib/files/socketfiles/tcpreader.go +++ b/lib/files/socketfiles/tcpreader.go @@ -84,8 +84,16 @@ func (h *tcpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, erro return } - conn, err := l.AcceptTCP() - if err != nil { + var conn *net.TCPConn + accept := func() error { + var err error + + conn, err = l.AcceptTCP() + + return err + } + + if err := do(ctx, accept); err != nil { r.err = files.PathError("accept", uri.String(), err) return } From 195b8bafc64cf27f5311b79f25fe0ae1459aebeb Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Mon, 30 Mar 2020 17:38:50 +0000 Subject: [PATCH 04/15] oops, we need to l.Close in all codepaths --- lib/files/socketfiles/tcpreader.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/files/socketfiles/tcpreader.go b/lib/files/socketfiles/tcpreader.go index 132ca89..e574194 100644 --- a/lib/files/socketfiles/tcpreader.go +++ b/lib/files/socketfiles/tcpreader.go @@ -76,6 +76,7 @@ func (h *tcpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, erro go func() { defer close(loading) + defer l.Close() select { case loading <- struct{}{}: @@ -98,10 +99,6 @@ func (h *tcpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, erro return } - // We can close the listener now that we have accepted one, - // this will not close the accepted connection. - l.Close() - // TODO: make the a configurable option? /* if err := conn.CloseWrite(); err != nil { conn.Close() From 58562e6adc0dbeb778a8d077300d8543c5e8a131 Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Mon, 30 Mar 2020 18:02:16 +0000 Subject: [PATCH 05/15] expose UDPConn directly as an embedded element --- lib/files/socketfiles/udpreader.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/lib/files/socketfiles/udpreader.go b/lib/files/socketfiles/udpreader.go index 6cf0a1d..08bc92d 100644 --- a/lib/files/socketfiles/udpreader.go +++ b/lib/files/socketfiles/udpreader.go @@ -13,22 +13,15 @@ import ( type udpReader struct { *wrapper.Info - conn *net.UDPConn sock *ipSocket -} -func (r *udpReader) Read(b []byte) (n int, err error) { - return r.conn.Read(b) + *net.UDPConn } func (r *udpReader) Seek(offset int64, whence int) (int64, error) { return 0, os.ErrInvalid } -func (r *udpReader) Close() error { - return r.conn.Close() -} - func (h *udpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, error) { if uri.Host == "" { return nil, files.PathError("open", uri.String(), errInvalidURL) @@ -58,7 +51,8 @@ func (h *udpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, erro return &udpReader{ Info: wrapper.NewInfo(uri, 0, time.Now()), - conn: conn, sock: sock, + + UDPConn: conn, }, nil } From c6fe51f5e368b8d4273a4754dd1fdffd68017ce8 Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Wed, 1 Apr 2020 14:14:14 +0000 Subject: [PATCH 06/15] avoid panic on nil-deref on Close with no Accept --- lib/files/socketfiles/tcpreader.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/files/socketfiles/tcpreader.go b/lib/files/socketfiles/tcpreader.go index e574194..2711e9f 100644 --- a/lib/files/socketfiles/tcpreader.go +++ b/lib/files/socketfiles/tcpreader.go @@ -39,6 +39,11 @@ func (r *tcpReader) Close() error { for range r.loading { } + // Never connected, so just return nil. + if r.conn == nil { + return nil + } + // Ignore the r.err, as it is a request-scope error, and not relevant to closing. return r.conn.Close() From 1cea329d97a6d62031160b1a1d25cea4a91bfb34 Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Wed, 1 Apr 2020 14:46:28 +0000 Subject: [PATCH 07/15] update unixsocket as well --- lib/files/unixsocket/unixsocket.go | 53 +++++++++++++++++++++++------- 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/lib/files/unixsocket/unixsocket.go b/lib/files/unixsocket/unixsocket.go index 41ba692..21219f7 100644 --- a/lib/files/unixsocket/unixsocket.go +++ b/lib/files/unixsocket/unixsocket.go @@ -19,8 +19,8 @@ func init() { } type writer struct { - *net.UnixConn *wrapper.Info + *net.UnixConn } func (w *writer) Sync() error { return nil } @@ -41,11 +41,6 @@ func (h *handler) Create(ctx context.Context, uri *url.URL) (files.Writer, error return nil, err } - fixURL := &url.URL{ - Scheme: "unix", - Opaque: raddr.String(), - } - var laddr *net.UnixAddr q := uri.Query() @@ -54,18 +49,35 @@ func (h *handler) Create(ctx context.Context, uri *url.URL) (files.Writer, error if err != nil { return nil, err } + } + + var conn *net.UnixConn + dial := func() error { + var err error + + conn, err = net.DialUnix("unix", laddr, raddr) + + return err + } + + if err := do(ctx, dial); err != nil { + return nil, files.PathError("create", uri.String(), err) + } + + q = make(url.Values) + if laddr != nil { q.Set(FieldLocalAddress, laddr.String()) - fixURL.RawQuery = q.Encode() } - conn, err := net.DialUnix("unix", laddr, raddr) - if err != nil { - return nil, err + uri = &url.URL{ + Scheme: raddr.Network(), + Path: raddr.String(), + RawQuery: q.Encode(), } w := &writer{ + Info: wrapper.NewInfo(uri, 0, time.Now()), UnixConn: conn, - Info: wrapper.NewInfo(fixURL, 0, time.Now()), } return w, nil @@ -74,3 +86,22 @@ func (h *handler) Create(ctx context.Context, uri *url.URL) (files.Writer, error func (h *handler) List(ctx context.Context, uri *url.URL) ([]os.FileInfo, error) { return nil, files.PathError("readdir", uri.String(), os.ErrInvalid) } + +func do(ctx context.Context, fn func() error) error { + done := make(chan struct{}) + + var err error + go func() { + defer close(done) + + err = fn() + }() + + select { + case <-done: + case <-ctx.Done(): + return ctx.Err() + } + + return err +} From 662c6fc5c412ff4091968bdc1cb79833bfc5bdff Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Wed, 1 Apr 2020 14:50:07 +0000 Subject: [PATCH 08/15] update unixsocket/reader.go as well --- lib/files/unixsocket/reader.go | 42 ++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/lib/files/unixsocket/reader.go b/lib/files/unixsocket/reader.go index 778ef27..bdbedc4 100644 --- a/lib/files/unixsocket/reader.go +++ b/lib/files/unixsocket/reader.go @@ -12,11 +12,12 @@ import ( ) type reader struct { - conn *net.UnixConn *wrapper.Info - err error loading <-chan struct{} + + err error + conn *net.UnixConn } func (r *reader) Read(b []byte) (n int, err error) { @@ -38,6 +39,11 @@ func (r *reader) Close() error { for range r.loading { } + // Never connected, so just return nil. + if r.conn == nil { + return nil + } + // Ignore the r.err, as it is a request-scope error, and not relevant to closing. return r.conn.Close() @@ -54,20 +60,23 @@ func (h *handler) Open(ctx context.Context, uri *url.URL) (files.Reader, error) return nil, err } - fixURL := &url.URL{ - Scheme: "unix", - Opaque: laddr.String(), - } - l, err := net.ListenUnix("unix", laddr) if err != nil { return nil, err } + // Make sure we are setting our file name to the actual address we’re listening on. + laddr = l.Addr().(*net.UnixAddr) + + uri = &url.URL{ + Scheme: laddr.Network(), + Path: laddr.String(), + } + loading := make(chan struct{}) r := &reader{ loading: loading, - Info: wrapper.NewInfo(fixURL, 0, time.Now()), + Info: wrapper.NewInfo(uri, 0, time.Now()), } go func() { @@ -77,15 +86,24 @@ func (h *handler) Open(ctx context.Context, uri *url.URL) (files.Reader, error) select { case loading <- struct{}{}: case <-ctx.Done(): - r.err = ctx.Err() + r.err = files.PathError("open", uri.String(), ctx.Err()) return } - conn, err := l.AcceptUnix() - if err != nil { - r.err = err + var conn *net.UnixConn + accept := func() error { + var err error + + conn, err = l.AcceptUnix() + + return err + } + + if err := do(ctx, accept); err != nil { + r.err = files.PathError("accept", uri.String(), err) return } + r.conn = conn }() From a741a48bf9d5c4da41fb3a32bfb850b9ffd3726f Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Wed, 1 Apr 2020 15:29:10 +0000 Subject: [PATCH 09/15] only show local_addr if we specified one --- lib/files/socketfiles/socket.go | 9 +++++++-- lib/files/socketfiles/tcp.go | 2 +- lib/files/socketfiles/udp.go | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/lib/files/socketfiles/socket.go b/lib/files/socketfiles/socket.go index 34b08ff..e65e7a4 100644 --- a/lib/files/socketfiles/socket.go +++ b/lib/files/socketfiles/socket.go @@ -113,7 +113,7 @@ func ipReader(conn net.Conn, q url.Values) (*ipSocket, error) { }, nil } -func ipWriter(conn net.Conn, q url.Values) (*ipSocket, error) { +func ipWriter(conn net.Conn, showLocalAddr bool, q url.Values) (*ipSocket, error) { bufferSize, err := getSize(q, FieldBufferSize) if err != nil { return nil, err @@ -185,8 +185,13 @@ func ipWriter(conn net.Conn, q url.Values) (*ipSocket, error) { ttl, _ = p.TTL() } + var laddr net.Addr + if showLocalAddr { + laddr = conn.LocalAddr() + } + return &ipSocket{ - laddr: conn.LocalAddr(), + laddr: laddr, raddr: conn.RemoteAddr(), bufferSize: bufferSize, diff --git a/lib/files/socketfiles/tcp.go b/lib/files/socketfiles/tcp.go index acd76ef..7b976e2 100644 --- a/lib/files/socketfiles/tcp.go +++ b/lib/files/socketfiles/tcp.go @@ -102,7 +102,7 @@ func (h *tcpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, er return nil, files.PathError("create", uri.String(), err) } - sock, err := ipWriter(conn, q) + sock, err := ipWriter(conn, laddr != nil, q) if err != nil { conn.Close() return nil, files.PathError("create", uri.String(), err) diff --git a/lib/files/socketfiles/udp.go b/lib/files/socketfiles/udp.go index bc48eee..fd31a3a 100644 --- a/lib/files/socketfiles/udp.go +++ b/lib/files/socketfiles/udp.go @@ -245,7 +245,7 @@ func (h *udpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, er return nil, files.PathError("create", uri.String(), err) } - sock, err := ipWriter(conn, q) + sock, err := ipWriter(conn, laddr != nil, q) if err != nil { conn.Close() return nil, files.PathError("create", uri.String(), err) From 30e6f57ed07efa9733815e1e61357c8a9ac5aeef Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Wed, 1 Apr 2020 15:35:34 +0000 Subject: [PATCH 10/15] reinit laddr for udpreader in case asked for port 0 --- lib/files/socketfiles/udpreader.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/files/socketfiles/udpreader.go b/lib/files/socketfiles/udpreader.go index 08bc92d..5ccf2bc 100644 --- a/lib/files/socketfiles/udpreader.go +++ b/lib/files/socketfiles/udpreader.go @@ -37,6 +37,10 @@ func (h *udpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, erro return nil, files.PathError("open", uri.String(), err) } + // Maybe we asked for an arbitrary port, + // so, refresh our address to the one we’re actually listening on. + laddr = conn.LocalAddr().(*net.UDPAddr) + sock, err := ipReader(conn, uri.Query()) if err != nil { conn.Close() From d9335d9cd99690691a2ac5a540c0f8a8377852c8 Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Wed, 1 Apr 2020 23:45:45 +0000 Subject: [PATCH 11/15] restructure into streams/datagrams and move unix sockets into socketfiles --- lib/files/plugins/plugins.go | 1 - lib/files/socketfiles/dgram.go | 243 +++++++++++++++++++++++++++++ lib/files/socketfiles/socket.go | 136 ++++++++++------ lib/files/socketfiles/stream.go | 178 +++++++++++++++++++++ lib/files/socketfiles/tcp.go | 77 ++------- lib/files/socketfiles/tcp_test.go | 69 ++++---- lib/files/socketfiles/tcpreader.go | 118 -------------- lib/files/socketfiles/udp.go | 227 +++------------------------ lib/files/socketfiles/udp_test.go | 73 ++++----- lib/files/socketfiles/udpreader.go | 62 -------- lib/files/socketfiles/unixsock.go | 115 ++++++++++++++ lib/files/unixsocket/reader.go | 111 ------------- lib/files/unixsocket/unixsocket.go | 107 ------------- 13 files changed, 721 insertions(+), 796 deletions(-) create mode 100644 lib/files/socketfiles/dgram.go create mode 100644 lib/files/socketfiles/stream.go delete mode 100644 lib/files/socketfiles/tcpreader.go delete mode 100644 lib/files/socketfiles/udpreader.go create mode 100644 lib/files/socketfiles/unixsock.go delete mode 100644 lib/files/unixsocket/reader.go delete mode 100644 lib/files/unixsocket/unixsocket.go diff --git a/lib/files/plugins/plugins.go b/lib/files/plugins/plugins.go index 6f9b62f..a16e75d 100644 --- a/lib/files/plugins/plugins.go +++ b/lib/files/plugins/plugins.go @@ -10,5 +10,4 @@ import ( _ "github.com/puellanivis/breton/lib/files/home" _ "github.com/puellanivis/breton/lib/files/httpfiles" _ "github.com/puellanivis/breton/lib/files/socketfiles" - _ "github.com/puellanivis/breton/lib/files/unixsocket" ) diff --git a/lib/files/socketfiles/dgram.go b/lib/files/socketfiles/dgram.go new file mode 100644 index 0000000..95c862d --- /dev/null +++ b/lib/files/socketfiles/dgram.go @@ -0,0 +1,243 @@ +package socketfiles + +import ( + "context" + "io" + "net" + "os" + "sync" + "time" + + "github.com/puellanivis/breton/lib/files/wrapper" +) + +type datagramWriter struct { + *wrapper.Info + + mu sync.Mutex + + closed chan struct{} + + sock *socket + + noerrs bool + + off int + buf []byte +} + +func (w *datagramWriter) IgnoreErrors(state bool) bool { + w.mu.Lock() + defer w.mu.Unlock() + + prev := w.noerrs + + w.noerrs = state + + return prev +} + +func (w *datagramWriter) err(err error) error { + if w.noerrs && err != io.ErrShortWrite { + return nil + } + + return err +} + +func (w *datagramWriter) SetPacketSize(size int) int { + w.mu.Lock() + defer w.mu.Unlock() + + prev := len(w.buf) + + switch { + case size <= 0: + w.buf = nil + + case size <= len(w.buf): + w.buf = w.buf[:size] + + default: + w.buf = append(w.buf, make([]byte, size-len(w.buf))...) + } + + if w.off > len(w.buf) { + w.off = len(w.buf) + } + + w.sock.packetSize = len(w.buf) + w.sock.updateDelay(len(w.buf)) + + return prev +} + +func (w *datagramWriter) SetBitrate(bitrate int) int { + w.mu.Lock() + defer w.mu.Unlock() + + return w.sock.setBitrate(bitrate, len(w.buf)) +} + +func (w *datagramWriter) Sync() error { + w.mu.Lock() + defer w.mu.Unlock() + + return w.err(w.sync()) +} + +func (w *datagramWriter) sync() error { + if w.off < 1 { + return nil + } + + // zero out the end of the buffer. + for i := w.off; i < len(w.buf); i++ { + w.buf[i] = 0 + } + + _, err := w.writeBuffer() + return err +} + +func (w *datagramWriter) writeBuffer() (n int, err error) { + w.off = 0 + return w.write(w.buf) +} + +func (w *datagramWriter) write(b []byte) (n int, err error) { + // We should have already prescaled the delay, so scale=1 here. + w.sock.throttle(1) + + n, err = w.sock.conn.Write(b) + if n != len(b) { + if (w.noerrs && n > 0) || err == nil { + err = io.ErrShortWrite + } + } + + return n, err +} + +func (w *datagramWriter) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + select { + case <-w.closed: + default: + close(w.closed) + } + + err := w.sync() + + if err2 := w.sock.conn.Close(); err == nil { + err = err2 + } + + return err +} + +func (w *datagramWriter) Write(b []byte) (n int, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + if len(w.buf) < 1 { + w.sock.throttle(len(b)) + + n, err = w.sock.conn.Write(b) + return n, w.err(err) + } + + if w.off > 0 { + n = copy(w.buf[w.off:], b) + w.off += n + + if w.off < len(w.buf) { + // The full length of b was copied into buffer, + // and we haven’t filled the buffer. + // So, we’re done. + return n, nil + } + + n2, err2 := w.writeBuffer() + if err = w.err(err2); err != nil { + if n2 > 0 { + // Should we? + // This could cause loss of packet-alignment from writers? + w.off = copy(w.buf, w.buf[n2:]) + } + + return n, err + } + + b = b[n:] + } + + sz := len(w.buf) + + for len(b) >= sz { + n2, err2 := w.write(b[:sz]) + n += n2 + + if err = w.err(err2); err != nil { + return n, err + } + + // skip the whole packet size, even on a short write. + b = b[sz:] + } + + if len(b) > 0 { + w.off = copy(w.buf, b) + n += w.off + } + + return n, nil +} + +func newDatagramWriter(ctx context.Context, sock *socket) *datagramWriter { + var buf []byte + if sock.packetSize > 0 { + buf = make([]byte, sock.packetSize) + } + + w := &datagramWriter{ + Info: wrapper.NewInfo(sock.uri(), 0, time.Now()), + + closed: make(chan struct{}), + sock: sock, + + buf: buf, + } + + go func() { + select { + case <-w.closed: + case <-ctx.Done(): + w.Close() + } + }() + + return w +} + +type datagramReader struct { + *wrapper.Info + sock *socket + + net.Conn +} + +func (r *datagramReader) Seek(offset int64, whence int) (int64, error) { + return 0, os.ErrInvalid +} + +func newDatagramReader(ctx context.Context, sock *socket) *datagramReader { + return &datagramReader{ + Info: wrapper.NewInfo(sock.uri(), 0, time.Now()), + sock: sock, + + Conn: sock.conn, + } +} diff --git a/lib/files/socketfiles/socket.go b/lib/files/socketfiles/socket.go index e65e7a4..b288303 100644 --- a/lib/files/socketfiles/socket.go +++ b/lib/files/socketfiles/socket.go @@ -28,8 +28,10 @@ const ( FieldTTL = "ttl" ) -type ipSocket struct { - laddr, raddr net.Addr +type socket struct { + conn net.Conn + + addr, qaddr net.Addr bufferSize int packetSize int @@ -39,27 +41,38 @@ type ipSocket struct { throttler } -func (s *ipSocket) uri() *url.URL { +func (s *socket) uri() *url.URL { q := s.uriQuery() - switch laddr := s.laddr.(type) { + switch qaddr := s.qaddr.(type) { case *net.TCPAddr: - q.Set(FieldLocalAddress, laddr.IP.String()) - q.Set(FieldLocalPort, strconv.Itoa(laddr.Port)) + q.Set(FieldLocalAddress, qaddr.IP.String()) + q.Set(FieldLocalPort, strconv.Itoa(qaddr.Port)) case *net.UDPAddr: - q.Set(FieldLocalAddress, laddr.IP.String()) - q.Set(FieldLocalPort, strconv.Itoa(laddr.Port)) + q.Set(FieldLocalAddress, qaddr.IP.String()) + q.Set(FieldLocalPort, strconv.Itoa(qaddr.Port)) + + case *net.UnixAddr: + q.Set(FieldLocalAddress, qaddr.String()) + } + + host, path := s.addr.String(), "" + + switch s.addr.Network() { + case "unix", "unixgram", "unixpacket": + host, path = "", host } return &url.URL{ - Scheme: s.raddr.Network(), - Host: s.raddr.String(), + Scheme: s.addr.Network(), + Host: host, + Path: path, RawQuery: q.Encode(), } } -func (s *ipSocket) uriQuery() url.Values { +func (s *socket) uriQuery() url.Values { q := make(url.Values) if s.bitrate > 0 { @@ -70,22 +83,30 @@ func (s *ipSocket) uriQuery() url.Values { q.Set(FieldBufferSize, strconv.Itoa(s.bufferSize)) } - if s.packetSize > 0 { - q.Set(FieldPacketSize, strconv.Itoa(s.packetSize)) - } + network := s.addr.Network() - if s.tos > 0 { - q.Set(FieldTOS, "0x"+strconv.FormatInt(int64(s.tos), 16)) + switch network { + case "udp", "udp4", "udp6", "unixgram", "unixpacket": + if s.packetSize > 0 { + q.Set(FieldPacketSize, strconv.Itoa(s.packetSize)) + } } - if s.ttl > 0 { - q.Set(FieldTTL, strconv.Itoa(s.ttl)) + switch network { + case "udp", "udp4", "tcp", "tcp4": + if s.tos > 0 { + q.Set(FieldTOS, "0x"+strconv.FormatInt(int64(s.tos), 16)) + } + + if s.ttl > 0 { + q.Set(FieldTTL, strconv.Itoa(s.ttl)) + } } return q } -func ipReader(conn net.Conn, q url.Values) (*ipSocket, error) { +func sockReader(conn net.Conn, q url.Values) (*socket, error) { bufferSize, err := getSize(q, FieldBufferSize) if err != nil { return nil, err @@ -106,14 +127,18 @@ func ipReader(conn net.Conn, q url.Values) (*ipSocket, error) { } } - return &ipSocket{ - laddr: conn.LocalAddr(), + return &socket{ + conn: conn, + + addr: conn.LocalAddr(), bufferSize: bufferSize, }, nil } -func ipWriter(conn net.Conn, showLocalAddr bool, q url.Values) (*ipSocket, error) { +func sockWriter(conn net.Conn, showLocalAddr bool, q url.Values) (*socket, error) { + raddr := conn.RemoteAddr() + bufferSize, err := getSize(q, FieldBufferSize) if err != nil { return nil, err @@ -134,9 +159,13 @@ func ipWriter(conn net.Conn, showLocalAddr bool, q url.Values) (*ipSocket, error } } - packetSize, err := getSize(q, FieldPacketSize) - if err != nil { - return nil, err + var packetSize int + switch raddr.Network() { + case "udp", "udp4", "udp6", "unixgram", "unixpacket": + packetSize, err = getSize(q, FieldPacketSize) + if err != nil { + return nil, err + } } bitrate, err := getSize(q, FieldMaxBitrate) @@ -149,40 +178,45 @@ func ipWriter(conn net.Conn, showLocalAddr bool, q url.Values) (*ipSocket, error t.setBitrate(bitrate, packetSize) } - var p *ipv4.Conn - - tos, err := getInt(q, FieldTOS) - if err != nil { - return nil, err - } + var tos, ttl int - if tos > 0 { - if p == nil { - p = ipv4.NewConn(conn) - } + switch raddr.Network() { + case "udp", "udp4", "tcp", "tcp4": + var p *ipv4.Conn - if err := p.SetTOS(tos); err != nil { + tos, err = getInt(q, FieldTOS) + if err != nil { return nil, err } - tos, _ = p.TOS() - } + if tos > 0 { + if p == nil { + p = ipv4.NewConn(conn) + } - ttl, err := getInt(q, FieldTTL) - if err != nil { - return nil, err - } + if err := p.SetTOS(tos); err != nil { + return nil, err + } - if ttl > 0 { - if p == nil { - p = ipv4.NewConn(conn) + tos, _ = p.TOS() } - if err := p.SetTTL(ttl); err != nil { + ttl, err = getInt(q, FieldTTL) + if err != nil { return nil, err } - ttl, _ = p.TTL() + if ttl > 0 { + if p == nil { + p = ipv4.NewConn(conn) + } + + if err := p.SetTTL(ttl); err != nil { + return nil, err + } + + ttl, _ = p.TTL() + } } var laddr net.Addr @@ -190,9 +224,11 @@ func ipWriter(conn net.Conn, showLocalAddr bool, q url.Values) (*ipSocket, error laddr = conn.LocalAddr() } - return &ipSocket{ - laddr: laddr, - raddr: conn.RemoteAddr(), + return &socket{ + conn: conn, + + addr: raddr, + qaddr: laddr, bufferSize: bufferSize, packetSize: packetSize, diff --git a/lib/files/socketfiles/stream.go b/lib/files/socketfiles/stream.go new file mode 100644 index 0000000..1b8ccc0 --- /dev/null +++ b/lib/files/socketfiles/stream.go @@ -0,0 +1,178 @@ +package socketfiles + +import ( + "context" + "net" + "net/url" + "os" + "sync" + "time" + + "github.com/puellanivis/breton/lib/files" + "github.com/puellanivis/breton/lib/files/wrapper" +) + +type streamWriter struct { + *wrapper.Info + + mu sync.Mutex + + closed chan struct{} + + sock *socket +} + +func (w *streamWriter) SetBitrate(bitrate int) int { + w.mu.Lock() + defer w.mu.Unlock() + + return w.sock.setBitrate(bitrate, 1) +} + +func (w *streamWriter) Sync() error { + return nil +} + +func (w *streamWriter) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + select { + case <-w.closed: + default: + close(w.closed) + } + + return w.sock.conn.Close() +} + +func (w *streamWriter) Write(b []byte) (n int, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + w.sock.throttle(len(b)) + + return w.sock.conn.Write(b) +} + +func (w *streamWriter) uri() *url.URL { + return w.sock.uri() +} + +func newStreamWriter(ctx context.Context, sock *socket) *streamWriter { + w := &streamWriter{ + Info: wrapper.NewInfo(sock.uri(), 0, time.Now()), + + closed: make(chan struct{}), + sock: sock, + } + + go func() { + select { + case <-w.closed: + case <-ctx.Done(): + w.Close() + } + }() + + return w +} + +type streamReader struct { + *wrapper.Info + + loading <-chan struct{} + + err error + conn net.Conn +} + +func (r *streamReader) Read(b []byte) (n int, err error) { + for range r.loading { + } + + if r.err != nil { + return 0, r.err + } + + return r.conn.Read(b) +} + +func (r *streamReader) Seek(offset int64, whence int) (int64, error) { + return 0, os.ErrInvalid +} + +func (r *streamReader) Close() error { + for range r.loading { + } + + // Never connected, so just return nil. + if r.conn == nil { + return nil + } + + // Ignore the r.err, as it is a request-scope error, and not relevant to closing. + + return r.conn.Close() +} + +func newStreamReader(ctx context.Context, l net.Listener) (*streamReader, error) { + // Maybe we asked for an arbitrary port, + // so, refresh our address to the one we’re actually listening on. + laddr := l.Addr() + + host, path := laddr.String(), "" + switch laddr.Network() { + case "unix": + host, path = "", host + } + + uri := &url.URL{ + Scheme: laddr.Network(), + Host: host, + Path: path, + } + + loading := make(chan struct{}) + r := &streamReader{ + loading: loading, + Info: wrapper.NewInfo(uri, 0, time.Now()), + } + + go func() { + defer close(loading) + defer l.Close() + + select { + case loading <- struct{}{}: + case <-ctx.Done(): + r.err = files.PathError("open", uri.String(), ctx.Err()) + return + } + + var conn net.Conn + accept := func() error { + var err error + + conn, err = l.Accept() + + return err + } + + if err := do(ctx, accept); err != nil { + r.err = files.PathError("accept", uri.String(), err) + return + } + + // TODO: make the a configurable option? + /* if err := conn.CloseWrite(); err != nil { + conn.Close() + r.err = err + return + } */ + + r.conn = conn + }() + + return r, nil +} diff --git a/lib/files/socketfiles/tcp.go b/lib/files/socketfiles/tcp.go index 7b976e2..83cb8f4 100644 --- a/lib/files/socketfiles/tcp.go +++ b/lib/files/socketfiles/tcp.go @@ -5,11 +5,8 @@ import ( "net" "net/url" "os" - "sync" - "time" "github.com/puellanivis/breton/lib/files" - "github.com/puellanivis/breton/lib/files/wrapper" ) type tcpHandler struct{} @@ -18,52 +15,22 @@ func init() { files.RegisterScheme(&tcpHandler{}, "tcp") } -type tcpWriter struct { - *wrapper.Info - conn *net.TCPConn - - mu sync.Mutex - - closed chan struct{} - - sock *ipSocket -} - -func (w *tcpWriter) SetBitrate(bitrate int) int { - w.mu.Lock() - defer w.mu.Unlock() - - return w.sock.setBitrate(bitrate, 1) -} - -func (w *tcpWriter) Sync() error { - return nil -} - -func (w *tcpWriter) Close() error { - w.mu.Lock() - defer w.mu.Unlock() - - select { - case <-w.closed: - default: - close(w.closed) +func (h *tcpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, error) { + if uri.Host == "" { + return nil, files.PathError("open", uri.String(), errInvalidURL) } - return w.conn.Close() -} - -func (w *tcpWriter) Write(b []byte) (n int, err error) { - w.mu.Lock() - defer w.mu.Unlock() - - w.sock.throttle(len(b)) + laddr, err := net.ResolveTCPAddr("tcp", uri.Host) + if err != nil { + return nil, files.PathError("open", uri.String(), err) + } - return w.conn.Write(b) -} + l, err := net.ListenTCP("tcp", laddr) + if err != nil { + return nil, files.PathError("open", uri.String(), err) + } -func (w *tcpWriter) uri() *url.URL { - return w.sock.uri() + return newStreamReader(ctx, l) } func (h *tcpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, error) { @@ -102,29 +69,13 @@ func (h *tcpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, er return nil, files.PathError("create", uri.String(), err) } - sock, err := ipWriter(conn, laddr != nil, q) + sock, err := sockWriter(conn, laddr != nil, q) if err != nil { conn.Close() return nil, files.PathError("create", uri.String(), err) } - w := &tcpWriter{ - Info: wrapper.NewInfo(sock.uri(), 0, time.Now()), - conn: conn, - - closed: make(chan struct{}), - sock: sock, - } - - go func() { - select { - case <-w.closed: - case <-ctx.Done(): - w.Close() - } - }() - - return w, nil + return newStreamWriter(ctx, sock), nil } func (h *tcpHandler) List(ctx context.Context, uri *url.URL) ([]os.FileInfo, error) { diff --git a/lib/files/socketfiles/tcp_test.go b/lib/files/socketfiles/tcp_test.go index 2557ddf..63f18f3 100644 --- a/lib/files/socketfiles/tcp_test.go +++ b/lib/files/socketfiles/tcp_test.go @@ -9,63 +9,60 @@ import ( ) func TestTCPName(t *testing.T) { - w := &tcpWriter{ - sock: &ipSocket{ - laddr: &net.TCPAddr{ - IP: []byte{127, 0, 0, 1}, - Port: 65535, - }, - raddr: &net.TCPAddr{ - IP: []byte{127, 0, 0, 2}, - Port: 80, - }, - bufferSize: 1024, - ttl: 100, - tos: 0x80, - - throttler: throttler{ - bitrate: 2048, - }, + sock := &socket{ + qaddr: &net.TCPAddr{ + IP: []byte{127, 0, 0, 1}, + Port: 65535, + }, + addr: &net.TCPAddr{ + IP: []byte{127, 0, 0, 2}, + Port: 80, + }, + + packetSize: 188, // should not show up + bufferSize: 1024, + + ttl: 100, + tos: 0x80, + + throttler: throttler{ + bitrate: 2048, }, } - uri := w.uri() + uri := sock.uri() expected := "tcp://127.0.0.2:80?buffer_size=1024&localaddr=127.0.0.1&localport=65535&max_bitrate=2048&tos=0x80&ttl=100" if s := uri.String(); s != expected { t.Errorf("got a bad URI, was expecting, but got:\n\t%v\n\t%v", expected, s) } - w = &tcpWriter{ - sock: &ipSocket{ - laddr: &net.TCPAddr{ - IP: []byte{127, 0, 0, 1}, - Port: 65534, - }, - raddr: &net.TCPAddr{ - IP: []byte{127, 0, 0, 2}, - Port: 443, - }, + sock = &socket{ + qaddr: &net.TCPAddr{ + IP: []byte{127, 0, 0, 1}, + Port: 65534, + }, + addr: &net.TCPAddr{ + IP: []byte{127, 0, 0, 2}, + Port: 443, }, } - uri = w.uri() + uri = sock.uri() expected = "tcp://127.0.0.2:443?localaddr=127.0.0.1&localport=65534" if s := uri.String(); s != expected { t.Errorf("got a bad URI, was expecting, but got:\n\t%v\n\t%v", expected, s) } - w = &tcpWriter{ - sock: &ipSocket{ - raddr: &net.TCPAddr{ - IP: []byte{127, 0, 0, 2}, - Port: 8080, - }, + sock = &socket{ + addr: &net.TCPAddr{ + IP: []byte{127, 0, 0, 2}, + Port: 8080, }, } - uri = w.uri() + uri = sock.uri() expected = "tcp://127.0.0.2:8080" if s := uri.String(); s != expected { diff --git a/lib/files/socketfiles/tcpreader.go b/lib/files/socketfiles/tcpreader.go deleted file mode 100644 index 2711e9f..0000000 --- a/lib/files/socketfiles/tcpreader.go +++ /dev/null @@ -1,118 +0,0 @@ -package socketfiles - -import ( - "context" - "net" - "net/url" - "os" - "time" - - "github.com/puellanivis/breton/lib/files" - "github.com/puellanivis/breton/lib/files/wrapper" -) - -type tcpReader struct { - *wrapper.Info - - loading <-chan struct{} - - err error - conn *net.TCPConn -} - -func (r *tcpReader) Read(b []byte) (n int, err error) { - for range r.loading { - } - - if r.err != nil { - return 0, r.err - } - - return r.conn.Read(b) -} - -func (r *tcpReader) Seek(offset int64, whence int) (int64, error) { - return 0, os.ErrInvalid -} - -func (r *tcpReader) Close() error { - for range r.loading { - } - - // Never connected, so just return nil. - if r.conn == nil { - return nil - } - - // Ignore the r.err, as it is a request-scope error, and not relevant to closing. - - return r.conn.Close() -} - -func (h *tcpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, error) { - if uri.Host == "" { - return nil, files.PathError("open", uri.String(), errInvalidURL) - } - - laddr, err := net.ResolveTCPAddr("tcp", uri.Host) - if err != nil { - return nil, files.PathError("open", uri.String(), err) - } - - l, err := net.ListenTCP("tcp", laddr) - if err != nil { - return nil, files.PathError("open", uri.String(), err) - } - - // Maybe we asked for an arbitrary port, - // so, refresh our address to the one we’re actually listening on. - laddr = l.Addr().(*net.TCPAddr) - - uri = &url.URL{ - Scheme: laddr.Network(), - Host: laddr.String(), - } - - loading := make(chan struct{}) - r := &tcpReader{ - loading: loading, - Info: wrapper.NewInfo(uri, 0, time.Now()), - } - - go func() { - defer close(loading) - defer l.Close() - - select { - case loading <- struct{}{}: - case <-ctx.Done(): - r.err = files.PathError("open", uri.String(), ctx.Err()) - return - } - - var conn *net.TCPConn - accept := func() error { - var err error - - conn, err = l.AcceptTCP() - - return err - } - - if err := do(ctx, accept); err != nil { - r.err = files.PathError("accept", uri.String(), err) - return - } - - // TODO: make the a configurable option? - /* if err := conn.CloseWrite(); err != nil { - conn.Close() - r.err = err - return - } */ - - r.conn = conn - }() - - return r, nil -} diff --git a/lib/files/socketfiles/udp.go b/lib/files/socketfiles/udp.go index fd31a3a..47e0e54 100644 --- a/lib/files/socketfiles/udp.go +++ b/lib/files/socketfiles/udp.go @@ -2,15 +2,11 @@ package socketfiles import ( "context" - "io" "net" "net/url" "os" - "sync" - "time" "github.com/puellanivis/breton/lib/files" - "github.com/puellanivis/breton/lib/files/wrapper" ) type udpHandler struct{} @@ -19,194 +15,32 @@ func init() { files.RegisterScheme(&udpHandler{}, "udp") } -type udpWriter struct { - *wrapper.Info - conn *net.UDPConn - - mu sync.Mutex - - closed chan struct{} - - sock *ipSocket - - noerrs bool - - off int - buf []byte -} - -func (w *udpWriter) IgnoreErrors(state bool) bool { - w.mu.Lock() - defer w.mu.Unlock() - - prev := w.noerrs - - w.noerrs = state - - return prev -} - -func (w *udpWriter) err(err error) error { - if w.noerrs && err != io.ErrShortWrite { - return nil - } - - return err -} - -func (w *udpWriter) SetPacketSize(size int) int { - w.mu.Lock() - defer w.mu.Unlock() - - prev := len(w.buf) - - switch { - case size <= 0: - w.buf = nil - - case size <= len(w.buf): - w.buf = w.buf[:size] - - default: - w.buf = append(w.buf, make([]byte, size-len(w.buf))...) - } - - if w.off > len(w.buf) { - w.off = len(w.buf) - } - - w.sock.packetSize = len(w.buf) - w.sock.updateDelay(len(w.buf)) - - return prev -} - -func (w *udpWriter) SetBitrate(bitrate int) int { - w.mu.Lock() - defer w.mu.Unlock() - - return w.sock.setBitrate(bitrate, len(w.buf)) -} - -func (w *udpWriter) Sync() error { - w.mu.Lock() - defer w.mu.Unlock() - - return w.err(w.sync()) -} - -func (w *udpWriter) sync() error { - if w.off < 1 { - return nil - } - - // zero out the end of the buffer. - for i := w.off; i < len(w.buf); i++ { - w.buf[i] = 0 - } - - _, err := w.writeBuffer() - return err -} - -func (w *udpWriter) writeBuffer() (n int, err error) { - w.off = 0 - return w.write(w.buf) -} - -func (w *udpWriter) write(b []byte) (n int, err error) { - // We should have already prescaled the delay, so scale=1 here. - w.sock.throttle(1) - - n, err = w.conn.Write(b) - if n != len(b) { - if (w.noerrs && n > 0) || err == nil { - err = io.ErrShortWrite - } - } - - return n, err -} - -func (w *udpWriter) Close() error { - w.mu.Lock() - defer w.mu.Unlock() - - select { - case <-w.closed: - default: - close(w.closed) - } - - err := w.sync() - - if err2 := w.conn.Close(); err == nil { - err = err2 +func (h *udpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, error) { + if uri.Host == "" { + return nil, files.PathError("open", uri.String(), errInvalidURL) } - return err -} - -func (w *udpWriter) Write(b []byte) (n int, err error) { - w.mu.Lock() - defer w.mu.Unlock() - - if len(w.buf) < 1 { - w.sock.throttle(len(b)) - - n, err = w.conn.Write(b) - return n, w.err(err) + laddr, err := net.ResolveUDPAddr("udp", uri.Host) + if err != nil { + return nil, files.PathError("open", uri.String(), err) } - if w.off > 0 { - n = copy(w.buf[w.off:], b) - w.off += n - - if w.off < len(w.buf) { - // The full length of b was copied into buffer, - // and we haven’t filled the buffer. - // So, we’re done. - return n, nil - } - - n2, err2 := w.writeBuffer() - if err = w.err(err2); err != nil { - if n2 > 0 { - // Should we? - // This could cause loss of packet-alignment from writers? - w.off = copy(w.buf, w.buf[n2:]) - } - - return n, err - } - - b = b[n:] + conn, err := net.ListenUDP("udp", laddr) + if err != nil { + return nil, files.PathError("open", uri.String(), err) } - sz := len(w.buf) - - for len(b) >= sz { - n2, err2 := w.write(b[:sz]) - n += n2 - - if err = w.err(err2); err != nil { - return n, err - } - - // skip the whole packet size, even on a short write. - b = b[sz:] - } + // Maybe we asked for an arbitrary port, + // so, refresh our address to the one we’re actually listening on. + laddr = conn.LocalAddr().(*net.UDPAddr) - if len(b) > 0 { - w.off = copy(w.buf, b) - n += w.off + sock, err := sockReader(conn, uri.Query()) + if err != nil { + conn.Close() + return nil, files.PathError("open", uri.String(), err) } - return n, nil -} - -func (w *udpWriter) uri() *url.URL { - return w.sock.uri() + return newDatagramReader(ctx, sock), nil } func (h *udpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, error) { @@ -245,36 +79,13 @@ func (h *udpHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, er return nil, files.PathError("create", uri.String(), err) } - sock, err := ipWriter(conn, laddr != nil, q) + sock, err := sockWriter(conn, laddr != nil, q) if err != nil { conn.Close() return nil, files.PathError("create", uri.String(), err) } - var buf []byte - if sock.packetSize > 0 { - buf = make([]byte, sock.packetSize) - } - - w := &udpWriter{ - Info: wrapper.NewInfo(sock.uri(), 0, time.Now()), - conn: conn, - - closed: make(chan struct{}), - sock: sock, - - buf: buf, - } - - go func() { - select { - case <-w.closed: - case <-ctx.Done(): - w.Close() - } - }() - - return w, nil + return newDatagramWriter(ctx, sock), nil } func (h *udpHandler) List(ctx context.Context, uri *url.URL) ([]os.FileInfo, error) { diff --git a/lib/files/socketfiles/udp_test.go b/lib/files/socketfiles/udp_test.go index d7e07bc..cc8c7ed 100644 --- a/lib/files/socketfiles/udp_test.go +++ b/lib/files/socketfiles/udp_test.go @@ -9,67 +9,60 @@ import ( ) func TestUDPName(t *testing.T) { - w := &udpWriter{ - sock: &ipSocket{ - laddr: &net.UDPAddr{ - IP: []byte{127, 0, 0, 1}, - Port: 65535, - }, - raddr: &net.UDPAddr{ - IP: []byte{127, 0, 0, 2}, - Port: 80, - }, - - bufferSize: 1024, - - ttl: 100, - tos: 0x80, - - throttler: throttler{ - bitrate: 2048, - }, + sock := &socket{ + qaddr: &net.UDPAddr{ + IP: []byte{127, 0, 0, 1}, + Port: 65535, + }, + addr: &net.UDPAddr{ + IP: []byte{127, 0, 0, 2}, + Port: 80, }, - } - w.SetPacketSize(188) + packetSize: 188, + bufferSize: 1024, - uri := w.uri() + ttl: 100, + tos: 0x80, + + throttler: throttler{ + bitrate: 2048, + }, + } + + uri := sock.uri() expected := "udp://127.0.0.2:80?buffer_size=1024&localaddr=127.0.0.1&localport=65535&max_bitrate=2048&pkt_size=188&tos=0x80&ttl=100" if s := uri.String(); s != expected { t.Errorf("got a bad URI, was expecting, but got:\n\t%v\n\t%v", expected, s) } - w = &udpWriter{ - sock: &ipSocket{ - laddr: &net.UDPAddr{ - IP: []byte{127, 0, 0, 1}, - Port: 65534, - }, - raddr: &net.UDPAddr{ - IP: []byte{127, 0, 0, 2}, - Port: 443, - }, + sock = &socket{ + qaddr: &net.UDPAddr{ + IP: []byte{127, 0, 0, 1}, + Port: 65534, + }, + addr: &net.UDPAddr{ + IP: []byte{127, 0, 0, 2}, + Port: 443, }, } - uri = w.uri() + uri = sock.uri() expected = "udp://127.0.0.2:443?localaddr=127.0.0.1&localport=65534" if s := uri.String(); s != expected { t.Errorf("got a bad URI, was expecting, but got:\n\t%v\n\t%v", expected, s) } - w = &udpWriter{ - sock: &ipSocket{ - raddr: &net.UDPAddr{ - IP: []byte{127, 0, 0, 2}, - Port: 8080, - }, + sock = &socket{ + addr: &net.UDPAddr{ + IP: []byte{127, 0, 0, 2}, + Port: 8080, }, } - uri = w.uri() + uri = sock.uri() expected = "udp://127.0.0.2:8080" if s := uri.String(); s != expected { diff --git a/lib/files/socketfiles/udpreader.go b/lib/files/socketfiles/udpreader.go deleted file mode 100644 index 5ccf2bc..0000000 --- a/lib/files/socketfiles/udpreader.go +++ /dev/null @@ -1,62 +0,0 @@ -package socketfiles - -import ( - "context" - "net" - "net/url" - "os" - "time" - - "github.com/puellanivis/breton/lib/files" - "github.com/puellanivis/breton/lib/files/wrapper" -) - -type udpReader struct { - *wrapper.Info - sock *ipSocket - - *net.UDPConn -} - -func (r *udpReader) Seek(offset int64, whence int) (int64, error) { - return 0, os.ErrInvalid -} - -func (h *udpHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, error) { - if uri.Host == "" { - return nil, files.PathError("open", uri.String(), errInvalidURL) - } - - laddr, err := net.ResolveUDPAddr("udp", uri.Host) - if err != nil { - return nil, files.PathError("open", uri.String(), err) - } - - conn, err := net.ListenUDP("udp", laddr) - if err != nil { - return nil, files.PathError("open", uri.String(), err) - } - - // Maybe we asked for an arbitrary port, - // so, refresh our address to the one we’re actually listening on. - laddr = conn.LocalAddr().(*net.UDPAddr) - - sock, err := ipReader(conn, uri.Query()) - if err != nil { - conn.Close() - return nil, files.PathError("open", uri.String(), err) - } - - uri = &url.URL{ - Scheme: laddr.Network(), - Host: laddr.String(), - RawQuery: sock.uriQuery().Encode(), - } - - return &udpReader{ - Info: wrapper.NewInfo(uri, 0, time.Now()), - sock: sock, - - UDPConn: conn, - }, nil -} diff --git a/lib/files/socketfiles/unixsock.go b/lib/files/socketfiles/unixsock.go new file mode 100644 index 0000000..8f3aedb --- /dev/null +++ b/lib/files/socketfiles/unixsock.go @@ -0,0 +1,115 @@ +package socketfiles + +import ( + "context" + "errors" + "net" + "net/url" + "os" + + "github.com/puellanivis/breton/lib/files" +) + +type unixHandler struct{} + +func init() { + files.RegisterScheme(&unixHandler{}, "unix", "unixgram") +} + +func (h *unixHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, error) { + path := uri.Path + if path == "" { + path = uri.Opaque + } + network := uri.Scheme + + laddr, err := net.ResolveUnixAddr(network, path) + if err != nil { + return nil, files.PathError("open", uri.String(), err) + } + + switch laddr.Network() { + case "unixgram": + conn, err := net.ListenUnixgram(network, laddr) + if err != nil { + return nil, files.PathError("open", uri.String(), err) + } + + sock, err := sockReader(conn, uri.Query()) + if err != nil { + conn.Close() + return nil, files.PathError("open", uri.String(), err) + } + + return newDatagramReader(ctx, sock), nil + + case "unix", "unixpacket": + l, err := net.ListenUnix(network, laddr) + if err != nil { + return nil, files.PathError("open", uri.String(), err) + } + + return newStreamReader(ctx, l) + } + + return nil, files.PathError("create", uri.String(), errors.New("unknown unix socket type")) +} + +func (h *unixHandler) Create(ctx context.Context, uri *url.URL) (files.Writer, error) { + path := uri.Path + if path == "" { + path = uri.Opaque + } + network := uri.Scheme + + raddr, err := net.ResolveUnixAddr(network, path) + if err != nil { + return nil, err + } + + q := uri.Query() + + var laddr *net.UnixAddr + + addr := q.Get(FieldLocalAddress) + if addr != "" { + laddr, err = net.ResolveUnixAddr(network, addr) + if err != nil { + return nil, files.PathError("create", uri.String(), err) + } + } + + var conn *net.UnixConn + dial := func() error { + var err error + + conn, err = net.DialUnix(network, laddr, raddr) + + return err + } + + if err := do(ctx, dial); err != nil { + return nil, files.PathError("create", uri.String(), err) + } + + sock, err := sockWriter(conn, laddr != nil, q) + if err != nil { + conn.Close() + return nil, files.PathError("create", uri.String(), err) + } + + switch network { + case "unix": + return newStreamWriter(ctx, sock), nil + + case "unixgram", "unixpacket": + return newDatagramWriter(ctx, sock), nil + } + + conn.Close() + return nil, files.PathError("create", uri.String(), errors.New("unknown unix socket type")) +} + +func (h *unixHandler) List(ctx context.Context, uri *url.URL) ([]os.FileInfo, error) { + return nil, files.PathError("readdir", uri.String(), os.ErrInvalid) +} diff --git a/lib/files/unixsocket/reader.go b/lib/files/unixsocket/reader.go deleted file mode 100644 index bdbedc4..0000000 --- a/lib/files/unixsocket/reader.go +++ /dev/null @@ -1,111 +0,0 @@ -package unixsocket - -import ( - "context" - "net" - "net/url" - "os" - "time" - - "github.com/puellanivis/breton/lib/files" - "github.com/puellanivis/breton/lib/files/wrapper" -) - -type reader struct { - *wrapper.Info - - loading <-chan struct{} - - err error - conn *net.UnixConn -} - -func (r *reader) Read(b []byte) (n int, err error) { - for range r.loading { - } - - if r.err != nil { - return 0, r.err - } - - return r.conn.Read(b) -} - -func (r *reader) Seek(offset int64, whence int) (int64, error) { - return 0, os.ErrInvalid -} - -func (r *reader) Close() error { - for range r.loading { - } - - // Never connected, so just return nil. - if r.conn == nil { - return nil - } - - // Ignore the r.err, as it is a request-scope error, and not relevant to closing. - - return r.conn.Close() -} - -func (h *handler) Open(ctx context.Context, uri *url.URL) (files.Reader, error) { - path := uri.Path - if path == "" { - path = uri.Opaque - } - - laddr, err := net.ResolveUnixAddr("unix", path) - if err != nil { - return nil, err - } - - l, err := net.ListenUnix("unix", laddr) - if err != nil { - return nil, err - } - - // Make sure we are setting our file name to the actual address we’re listening on. - laddr = l.Addr().(*net.UnixAddr) - - uri = &url.URL{ - Scheme: laddr.Network(), - Path: laddr.String(), - } - - loading := make(chan struct{}) - r := &reader{ - loading: loading, - Info: wrapper.NewInfo(uri, 0, time.Now()), - } - - go func() { - defer close(loading) - defer l.Close() - - select { - case loading <- struct{}{}: - case <-ctx.Done(): - r.err = files.PathError("open", uri.String(), ctx.Err()) - return - } - - var conn *net.UnixConn - accept := func() error { - var err error - - conn, err = l.AcceptUnix() - - return err - } - - if err := do(ctx, accept); err != nil { - r.err = files.PathError("accept", uri.String(), err) - return - } - - r.conn = conn - }() - - return r, nil -} diff --git a/lib/files/unixsocket/unixsocket.go b/lib/files/unixsocket/unixsocket.go deleted file mode 100644 index 21219f7..0000000 --- a/lib/files/unixsocket/unixsocket.go +++ /dev/null @@ -1,107 +0,0 @@ -// Package unixsocket implements the "unix:" URL scheme, by reading/writing to a raw unix socket. -package unixsocket - -import ( - "context" - "net" - "net/url" - "os" - "time" - - "github.com/puellanivis/breton/lib/files" - "github.com/puellanivis/breton/lib/files/wrapper" -) - -type handler struct{} - -func init() { - files.RegisterScheme(&handler{}, "unix") -} - -type writer struct { - *wrapper.Info - *net.UnixConn -} - -func (w *writer) Sync() error { return nil } - -// URL query field keys. -const ( - FieldLocalAddress = "local_addr" -) - -func (h *handler) Create(ctx context.Context, uri *url.URL) (files.Writer, error) { - path := uri.Path - if path == "" { - path = uri.Opaque - } - - raddr, err := net.ResolveUnixAddr("unix", path) - if err != nil { - return nil, err - } - - var laddr *net.UnixAddr - - q := uri.Query() - if addr := q.Get(FieldLocalAddress); addr != "" { - laddr, err = net.ResolveUnixAddr("unix", addr) - if err != nil { - return nil, err - } - } - - var conn *net.UnixConn - dial := func() error { - var err error - - conn, err = net.DialUnix("unix", laddr, raddr) - - return err - } - - if err := do(ctx, dial); err != nil { - return nil, files.PathError("create", uri.String(), err) - } - - q = make(url.Values) - if laddr != nil { - q.Set(FieldLocalAddress, laddr.String()) - } - - uri = &url.URL{ - Scheme: raddr.Network(), - Path: raddr.String(), - RawQuery: q.Encode(), - } - - w := &writer{ - Info: wrapper.NewInfo(uri, 0, time.Now()), - UnixConn: conn, - } - - return w, nil -} - -func (h *handler) List(ctx context.Context, uri *url.URL) ([]os.FileInfo, error) { - return nil, files.PathError("readdir", uri.String(), os.ErrInvalid) -} - -func do(ctx context.Context, fn func() error) error { - done := make(chan struct{}) - - var err error - go func() { - defer close(done) - - err = fn() - }() - - select { - case <-done: - case <-ctx.Done(): - return ctx.Err() - } - - return err -} From 44d200ea5842abd2c850f52eb923f18b22d2a5df Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Fri, 24 Apr 2020 14:36:29 +0000 Subject: [PATCH 12/15] remove unused function --- lib/files/socketfiles/socket.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/lib/files/socketfiles/socket.go b/lib/files/socketfiles/socket.go index b288303..d5d1e15 100644 --- a/lib/files/socketfiles/socket.go +++ b/lib/files/socketfiles/socket.go @@ -285,26 +285,6 @@ func getInt(q url.Values, field string) (val int, err error) { return int(i), nil } -func buildAddr(addr, portString string) (ip net.IP, port int, err error) { - if addr != "" { - ip = net.ParseIP(addr) - if ip == nil { - return nil, 0, errInvalidIP - } - } - - if portString != "" { - p, err := strconv.ParseInt(portString, 10, strconv.IntSize) - if err != nil { - return nil, 0, err - } - - port = int(p) - } - - return ip, port, nil -} - func do(ctx context.Context, fn func() error) error { done := make(chan struct{}) From 91550d6752039475972c123adc84a0f3f3f99ff8 Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Fri, 24 Apr 2020 14:37:16 +0000 Subject: [PATCH 13/15] polish --- lib/files/socketfiles/dgram.go | 45 +++++++++++----------------------- 1 file changed, 14 insertions(+), 31 deletions(-) diff --git a/lib/files/socketfiles/dgram.go b/lib/files/socketfiles/dgram.go index 95c862d..78ffc1c 100644 --- a/lib/files/socketfiles/dgram.go +++ b/lib/files/socketfiles/dgram.go @@ -15,15 +15,13 @@ type datagramWriter struct { *wrapper.Info mu sync.Mutex - closed chan struct{} - sock *socket - noerrs bool - off int buf []byte + + sock *socket } func (w *datagramWriter) IgnoreErrors(state bool) bool { @@ -83,24 +81,21 @@ func (w *datagramWriter) Sync() error { w.mu.Lock() defer w.mu.Unlock() - return w.err(w.sync()) + _, err := w.sync() + return w.err(err) } -func (w *datagramWriter) sync() error { +func (w *datagramWriter) sync() (n int, err error) { if w.off < 1 { - return nil + return 0, nil } // zero out the end of the buffer. - for i := w.off; i < len(w.buf); i++ { - w.buf[i] = 0 + b := w.buf[w.off:] + for i := range b { + b[i] = 0 } - _, err := w.writeBuffer() - return err -} - -func (w *datagramWriter) writeBuffer() (n int, err error) { w.off = 0 return w.write(w.buf) } @@ -129,7 +124,7 @@ func (w *datagramWriter) Close() error { close(w.closed) } - err := w.sync() + _, err := w.sync() if err2 := w.sock.conn.Close(); err == nil { err = err2 @@ -160,14 +155,8 @@ func (w *datagramWriter) Write(b []byte) (n int, err error) { return n, nil } - n2, err2 := w.writeBuffer() + _, err2 := w.sync() if err = w.err(err2); err != nil { - if n2 > 0 { - // Should we? - // This could cause loss of packet-alignment from writers? - w.off = copy(w.buf, w.buf[n2:]) - } - return n, err } @@ -175,7 +164,6 @@ func (w *datagramWriter) Write(b []byte) (n int, err error) { } sz := len(w.buf) - for len(b) >= sz { n2, err2 := w.write(b[:sz]) n += n2 @@ -184,7 +172,7 @@ func (w *datagramWriter) Write(b []byte) (n int, err error) { return n, err } - // skip the whole packet size, even on a short write. + // skip the whole packet size, even if n2 < sz b = b[sz:] } @@ -204,11 +192,10 @@ func newDatagramWriter(ctx context.Context, sock *socket) *datagramWriter { w := &datagramWriter{ Info: wrapper.NewInfo(sock.uri(), 0, time.Now()), + sock: sock, closed: make(chan struct{}), - sock: sock, - - buf: buf, + buf: buf, } go func() { @@ -224,8 +211,6 @@ func newDatagramWriter(ctx context.Context, sock *socket) *datagramWriter { type datagramReader struct { *wrapper.Info - sock *socket - net.Conn } @@ -236,8 +221,6 @@ func (r *datagramReader) Seek(offset int64, whence int) (int64, error) { func newDatagramReader(ctx context.Context, sock *socket) *datagramReader { return &datagramReader{ Info: wrapper.NewInfo(sock.uri(), 0, time.Now()), - sock: sock, - Conn: sock.conn, } } From 00270b6aef81fa7a8d61ffe6cdc765e79e8f8899 Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Fri, 24 Apr 2020 14:37:48 +0000 Subject: [PATCH 14/15] reorg fields --- lib/files/socketfiles/stream.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/files/socketfiles/stream.go b/lib/files/socketfiles/stream.go index 1b8ccc0..01c5bdf 100644 --- a/lib/files/socketfiles/stream.go +++ b/lib/files/socketfiles/stream.go @@ -16,7 +16,6 @@ type streamWriter struct { *wrapper.Info mu sync.Mutex - closed chan struct{} sock *socket @@ -62,9 +61,9 @@ func (w *streamWriter) uri() *url.URL { func newStreamWriter(ctx context.Context, sock *socket) *streamWriter { w := &streamWriter{ Info: wrapper.NewInfo(sock.uri(), 0, time.Now()), + sock: sock, closed: make(chan struct{}), - sock: sock, } go func() { @@ -135,8 +134,9 @@ func newStreamReader(ctx context.Context, l net.Listener) (*streamReader, error) loading := make(chan struct{}) r := &streamReader{ + Info: wrapper.NewInfo(uri, 0, time.Now()), + loading: loading, - Info: wrapper.NewInfo(uri, 0, time.Now()), } go func() { From 7baac1e004c274af3aefdb97c02b96f26c9ae05c Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Fri, 24 Apr 2020 14:38:15 +0000 Subject: [PATCH 15/15] do not have until we actually support it --- lib/files/socketfiles/unixsock.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/files/socketfiles/unixsock.go b/lib/files/socketfiles/unixsock.go index 8f3aedb..ea1c630 100644 --- a/lib/files/socketfiles/unixsock.go +++ b/lib/files/socketfiles/unixsock.go @@ -43,7 +43,7 @@ func (h *unixHandler) Open(ctx context.Context, uri *url.URL) (files.Reader, err return newDatagramReader(ctx, sock), nil - case "unix", "unixpacket": + case "unix": l, err := net.ListenUnix(network, laddr) if err != nil { return nil, files.PathError("open", uri.String(), err)