Skip to content

Commit

Permalink
feat: write separation
Browse files Browse the repository at this point in the history
  • Loading branch information
dickens7 committed Sep 15, 2024
1 parent 8914b35 commit 04390ab
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 29 deletions.
5 changes: 1 addition & 4 deletions server/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,7 @@ func (ctx *Context) Write(v interface{}) error {

var err error
if ctx.async {
go func() {
_, err = ctx.conn.Write(*respData)
protocol.PutData(respData)
}()
return wirteResp(ctx.ctx, res)
} else {
_, err = ctx.conn.Write(*respData)
protocol.PutData(respData)
Expand Down
7 changes: 7 additions & 0 deletions server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,10 @@ func WithAsyncWrite() OptionFn {
s.AsyncWrite = true
}
}

// AsyncOutgoing sets AsyncWrite outgoing queue
func AsyncOutgoing(limit int) OptionFn {
return func(s *Server) {
s.AsyncOutgoing = limit
}
}
93 changes: 68 additions & 25 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ var (
TagContextKey = &contextKey{"service-tag"}
// HttpConnContextKey is used to store http connection.
HttpConnContextKey = &contextKey{"http-conn"}
// AsyncWriteCh
AsyncWriteCh = &contextKey{"async-write-ch"}
)

type Handler func(ctx *Context) error
Expand All @@ -87,6 +89,7 @@ type Server struct {
DisableHTTPGateway bool // disable http invoke or not.
DisableJSONRPC bool // disable json rpc or not.
AsyncWrite bool // set true if your server only serves few clients
AsyncOutgoing int // write message conn outgoing queue max
pool WorkerPool

serviceMapMu sync.RWMutex
Expand Down Expand Up @@ -133,14 +136,15 @@ type Server struct {
// NewServer returns a server.
func NewServer(options ...OptionFn) *Server {
s := &Server{
Plugins: &pluginContainer{},
options: make(map[string]interface{}),
activeConn: make(map[net.Conn]struct{}),
doneChan: make(chan struct{}),
serviceMap: make(map[string]*service),
router: make(map[string]Handler),
AsyncWrite: false, // ้™ค้žไฝ ๆƒณๅš่ฟ›ไธ€ๆญฅ็š„ไผ˜ๅŒ–ๆต‹่ฏ•๏ผŒๅฆๅˆ™ๅปบ่ฎฎไฝ ่ฎพ็ฝฎไธบfalse
Started: make(chan struct{}),
Plugins: &pluginContainer{},
options: make(map[string]interface{}),
activeConn: make(map[net.Conn]struct{}),
doneChan: make(chan struct{}),
serviceMap: make(map[string]*service),
router: make(map[string]Handler),
AsyncWrite: false, // ้™ค้žไฝ ๆƒณๅš่ฟ›ไธ€ๆญฅ็š„ไผ˜ๅŒ–ๆต‹่ฏ•๏ผŒๅฆๅˆ™ๅปบ่ฎฎไฝ ่ฎพ็ฝฎไธบfalse
AsyncOutgoing: 100000, //
Started: make(chan struct{}),
}

for _, op := range options {
Expand Down Expand Up @@ -364,23 +368,11 @@ func (s *Server) sendResponse(ctx *share.Context, conn net.Conn, err error, req,

data := res.EncodeSlicePointer()
if s.AsyncWrite {
if s.pool != nil {
s.pool.Submit(func() {
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
}
conn.Write(*data)
protocol.PutData(data)
})
} else {
go func() {
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
}
conn.Write(*data)
protocol.PutData(data)
}()
err := wirteResp(ctx, res)
if err != nil {
log.Errorf(err.Error())
}
return
} else {
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
Expand Down Expand Up @@ -433,9 +425,14 @@ func (s *Server) serveConn(conn net.Conn) {
return
}
}
var asyncWriteCh chan *protocol.Message
// async write
if s.AsyncWrite {
asyncWriteCh = make(chan *protocol.Message, s.AsyncOutgoing)
go s.asyncWrite(conn, asyncWriteCh)
}

r := bufio.NewReaderSize(conn, ReaderBuffsize)

// read requests and handle it
for {
if s.isShutdown() {
Expand All @@ -449,6 +446,9 @@ func (s *Server) serveConn(conn net.Conn) {

// create a rpcx Context
ctx := share.WithValue(context.Background(), RemoteConnContextKey, conn)
if s.AsyncWrite {
ctx = share.WithValue(ctx, AsyncWriteCh, asyncWriteCh)
}

// read a request from the underlying connection
req, err := s.readRequest(ctx, r)
Expand Down Expand Up @@ -522,6 +522,36 @@ func (s *Server) serveConn(conn net.Conn) {
}
}

// syncWrite
func (s *Server) asyncWrite(conn net.Conn, asyncWriteCh chan *protocol.Message) {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
ss := runtime.Stack(buf, false)
if ss > size {
ss = size
}
buf = buf[:ss]
log.Errorf("serving %s panic error: %s, stack:\n %s", conn.RemoteAddr(), err, string(buf))
}
for {
if s.isShutdown() {
return
}
select {
case <-s.doneChan:
return
case res := <-asyncWriteCh:
data := res.EncodeSlicePointer()
if s.writeTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
}
conn.Write(*data)
protocol.PutData(data)
}
}
}

func (s *Server) processOneRequest(ctx *share.Context, req *protocol.Message, conn net.Conn) {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -1073,6 +1103,19 @@ func (s *Server) closeDoneChanLocked() {
}
}

func wirteResp(ctx context.Context, res *protocol.Message) error {
ch, ok := ctx.Value(AsyncWriteCh).(chan *protocol.Message)
if !ok {
return fmt.Errorf("async write chan closed")
}
select {
case ch <- res:
default:
return fmt.Errorf("could not write message, conn outgoing queue full")
}
return nil
}

var ip4Reg = regexp.MustCompile(`^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$`)

func validIP4(ipAddress string) bool {
Expand Down

0 comments on commit 04390ab

Please sign in to comment.