Skip to content

Commit

Permalink
Initial porting attempt using fuweid's idea
Browse files Browse the repository at this point in the history
Very ugly PoC for now

Reference:
- etcd-io#17985 (comment)

Signed-off-by: Chun-Hung Tseng <henrybear327@gmail.com>
  • Loading branch information
henrybear327 committed May 15, 2024
1 parent 03b69ed commit c37801f
Showing 1 changed file with 123 additions and 12 deletions.
135 changes: 123 additions & 12 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"log"
"math/bits"
mrand "math/rand"
"net"
Expand Down Expand Up @@ -211,6 +212,8 @@ type server struct {

blackholePeerMap map[int]uint8 // port number, blackhole type
blackholePeerMapMu sync.RWMutex

tmp *http.Server
}

// NewServer returns a proxy implementation with no iptables/tc dependencies.
Expand Down Expand Up @@ -278,25 +281,129 @@ func NewServer(cfg ServerConfig) Server {
addr = s.from.Host
}

var ln net.Listener
var err error
// var ln net.Listener
// var err error
// if !s.tlsInfo.Empty() {
// ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo)
// } else {
// ln, err = net.Listen(s.from.Scheme, addr)
// }
// if err != nil {
// s.errc <- err
// s.Close()
// return s
// }
// s.listener = ln

s.closeWg.Add(1)
// go s.listenAndServe()

handler := &serverHandler{
closeWg: &s.closeWg,
s: s,
}

if !s.tlsInfo.Empty() {
ln, err = transport.NewListener(addr, s.from.Scheme, &s.tlsInfo)
go func() {
defer s.closeWg.Done()

close(s.readyc)
http.ListenAndServeTLS(addr, s.tlsInfo.CertFile, s.tlsInfo.KeyFile, handler)
}()
} else {
ln, err = net.Listen(s.from.Scheme, addr)
s.tmp = startHttpServer(&s.closeWg, addr, handler)
close(s.readyc)
}

s.lg.Info("started proxying", zap.String("from", s.From()), zap.String("to", s.To()))
return s
}

type serverHandler struct {
closeWg *sync.WaitGroup

s *server
}

func startHttpServer(wg *sync.WaitGroup, addr string, handler *serverHandler) *http.Server {
srv := &http.Server{Addr: addr}
srv.Handler = handler

go func() {
defer wg.Done() // let main know we are done cleaning up

// always returns error. ErrServerClosed on graceful close
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
// unexpected error. port in use?
log.Fatalf("ListenAndServe(): %v", err)
}
}()

// returning reference so caller can call Shutdown()
return srv
}

func (s *serverHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
hijacker, _ := resp.(http.Hijacker)
conn, _, err := hijacker.Hijack()
if err != nil {
s.errc <- err
s.Close()
return s
// write error back to conn
return
}
s.listener = ln

s.closeWg.Add(1)
go s.listenAndServe()
if req.Method == "CONNECT" {
// for CONNECT, we need to send 200 response back first
conn.Write([]byte("HTTP/1.0 200 Connection established\r\n\r\n"))
} // else {
// for plain http, just act as agent to forward request to target host
// }

s.lg.Info("started proxying", zap.String("from", s.From()), zap.String("to", s.To()))
return s
// dial to target host
targetConn, err := net.Dial("tcp", req.URL.Host)
if err != nil {
// write error back to conn
return
}

// perform normal operation
// go io.Copy(targetConn, conn)
// go io.Copy(conn, targetConn)

var dstPort int
dstPort, err = getPort(targetConn.RemoteAddr())
if err != nil {
select {
case s.s.errc <- err:
select {
case <-s.s.donec:
return
default:
}
case <-s.s.donec:
return
}
s.s.lg.Debug("failed to parse port in transmit", zap.Error(err))
return
}

out := targetConn
in := conn

s.closeWg.Add(2)
go func() {
defer s.closeWg.Done()
// read incoming bytes from listener, dispatch to outgoing connection
s.s.transmit(out, in, dstPort)
out.Close()
in.Close()
}()
go func() {
defer s.closeWg.Done()
// read response from outgoing connection, write back to listener
s.s.receive(in, out, dstPort)
in.Close()
out.Close()
}()
}

func (s *server) From() string {
Expand Down Expand Up @@ -762,6 +869,10 @@ func (s *server) Ready() <-chan struct{} { return s.readyc }
func (s *server) Done() <-chan struct{} { return s.donec }
func (s *server) Error() <-chan error { return s.errc }
func (s *server) Close() (err error) {
if err := s.tmp.Shutdown(context.TODO()); err != nil {
panic(err) // failure/timeout shutting down the server gracefully
}

s.closeOnce.Do(func() {
close(s.donec)
s.listenerMu.Lock()
Expand Down

0 comments on commit c37801f

Please sign in to comment.