Skip to content

Commit

Permalink
pkg/rpc: Re-import from Go stdlib.
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaq committed Aug 16, 2024
1 parent 84cb700 commit 2364b99
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 60 deletions.
16 changes: 8 additions & 8 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ type Client struct {

// A ClientCodec implements writing of RPC requests and
// reading of RPC responses for the client side of an RPC session.
// The client calls WriteRequest to write a request to the connection
// and calls ReadResponseHeader and ReadResponseBody in pairs
// to read responses. The client calls Close when finished with the
// The client calls [ClientCodec.WriteRequest] to write a request to the connection
// and calls [ClientCodec.ReadResponseHeader] and [ClientCodec.ReadResponseBody] in pairs
// to read responses. The client calls [ClientCodec.Close] when finished with the
// connection. ReadResponseBody may be called with a nil
// argument to force the body of the response to be read and then
// discarded.
// See NewClient's comment for information about concurrent access.
// See [NewClient]'s comment for information about concurrent access.
type ClientCodec interface {
WriteRequest(*Request, any) error
ReadResponseHeader(*Response) error
Expand Down Expand Up @@ -180,7 +180,7 @@ func (call *Call) done() {
}
}

// NewClient returns a new Client to handle requests to the
// NewClient returns a new [Client] to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
Expand All @@ -195,7 +195,7 @@ func NewClient(conn io.ReadWriteCloser) *Client {
return NewClientWithCodec(client)
}

// NewClientWithCodec is like NewClient but uses the specified
// NewClientWithCodec is like [NewClient] but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
Expand Down Expand Up @@ -245,7 +245,7 @@ func Dial(network, address string) (*Client, error) {
}

// Close calls the underlying codec's Close method. If the connection is already
// shutting down, ErrShutdown is returned.
// shutting down, [ErrShutdown] is returned.
func (client *Client) Close() error {
client.mutex.Lock()
if client.closing {
Expand All @@ -257,7 +257,7 @@ func (client *Client) Close() error {
return client.codec.Close()
}

// Go invokes the function asynchronously. It returns the Call structure representing
// Go invokes the function asynchronously. It returns the [Call] structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
Expand Down
107 changes: 55 additions & 52 deletions pkg/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// license that can be found in the LICENSE file.

/*
Package rpc is a trimmed down version of net/rpc in the standard library.
Original doc:
Package rpc is a trimmed down version of net/rpc in the standard library,
without the dependency on net/http or html/template. Original doc:
Package rpc provides access to the exported methods of an object across a
network or other I/O connection. A server registers an object, making it visible
Expand Down Expand Up @@ -33,25 +33,25 @@ These requirements apply even if a different codec is used.
The method's first argument represents the arguments provided by the caller; the
second argument represents the result parameters to be returned to the caller.
The method's return value, if non-nil, is passed back as a string that the client
sees as if created by errors.New. If an error is returned, the reply parameter
sees as if created by [errors.New]. If an error is returned, the reply parameter
will not be sent back to the client.
The server may handle requests on a single connection by calling ServeConn. More
typically it will create a network listener and call Accept or, for an HTTP
listener, HandleHTTP and http.Serve.
The server may handle requests on a single connection by calling [ServeConn]. More
typically it will create a network listener and call [Accept] or, for an HTTP
listener, [HandleHTTP] and [http.Serve].
A client wishing to use the service establishes a connection and then invokes
NewClient on the connection. The convenience function Dial (DialHTTP) performs
[NewClient] on the connection. The convenience function [Dial] ([DialHTTP]) performs
both steps for a raw network connection (an HTTP connection). The resulting
Client object has two methods, Call and Go, that specify the service and method to
[Client] object has two methods, [Call] and Go, that specify the service and method to
call, a pointer containing the arguments, and a pointer to receive the result
parameters.
The Call method waits for the remote call to complete while the Go method
launches the call asynchronously and signals completion using the Call
structure's Done channel.
Unless an explicit codec is set up, package encoding/gob is used to
Unless an explicit codec is set up, package [encoding/gob] is used to
transport the data.
Here is a simple example. A server wishes to export an object of type Arith:
Expand Down Expand Up @@ -89,9 +89,9 @@ The server calls (for HTTP service):
arith := new(Arith)
rpc.Register(arith)
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
l, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("listen error:", err)
}
go http.Serve(l, nil)
Expand Down Expand Up @@ -142,9 +142,8 @@ import (
"sync"
)

// Precompute the reflect type for error. Can't use error directly
// because Typeof takes an empty interface value. This is annoying.
var typeOfError = reflect.TypeOf((*error)(nil)).Elem()
// Precompute the reflect type for error.
var typeOfError = reflect.TypeFor[error]()

type methodType struct {
sync.Mutex // protects counters
Expand Down Expand Up @@ -189,17 +188,17 @@ type Server struct {
freeResp *Response
}

// NewServer returns a new Server.
// NewServer returns a new [Server].
func NewServer() *Server {
return &Server{}
}

// DefaultServer is the default instance of *Server.
// DefaultServer is the default instance of [*Server].
var DefaultServer = NewServer()

// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
for t.Kind() == reflect.Pointer {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
Expand All @@ -222,34 +221,38 @@ func (server *Server) Register(rcvr any) error {
return server.register(rcvr, "", false)
}

// RegisterName is like Register but uses the provided name for the type
// RegisterName is like [Register] but uses the provided name for the type
// instead of the receiver's concrete type.
func (server *Server) RegisterName(name string, rcvr any) error {
return server.register(rcvr, name, true)
}

// logRegisterError specifies whether to log problems during method registration.
// To debug registration, recompile the package with this set to true.
const logRegisterError = false

func (server *Server) register(rcvr any, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
if useName {
sname = name
sname := name
if !useName {
sname = reflect.Indirect(s.rcvr).Type().Name()
}
if sname == "" {
s := "rpc.Register: no service name for type " + s.typ.String()
log.Print(s)
return errors.New(s)
}
if !token.IsExported(sname) && !useName {
if !useName && !token.IsExported(sname) {
s := "rpc.Register: type " + sname + " is not exported"
log.Print(s)
return errors.New(s)
}
s.name = sname

// Install the methods
s.method = suitableMethods(s.typ, true)
s.method = suitableMethods(s.typ, logRegisterError)

if len(s.method) == 0 {
str := ""
Expand All @@ -271,58 +274,58 @@ func (server *Server) register(rcvr any, name string, useName bool) error {
return nil
}

// suitableMethods returns suitable Rpc methods of typ, it will report
// error using log if reportErr is true.
func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
// suitableMethods returns suitable Rpc methods of typ. It will log
// errors if logErr is true.
func suitableMethods(typ reflect.Type, logErr bool) map[string]*methodType {
methods := make(map[string]*methodType)
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := method.Name
// Method must be exported.
if method.PkgPath != "" {
if !method.IsExported() {
continue
}
// Method needs three ins: receiver, *args, *reply.
if mtype.NumIn() != 3 {
if reportErr {
if logErr {
log.Printf("rpc.Register: method %q has %d input parameters; needs exactly three\n", mname, mtype.NumIn())
}
continue
}
// First arg need not be a pointer.
argType := mtype.In(1)
if !isExportedOrBuiltinType(argType) {
if reportErr {
if logErr {
log.Printf("rpc.Register: argument type of method %q is not exported: %q\n", mname, argType)
}
continue
}
// Second arg must be a pointer.
replyType := mtype.In(2)
if replyType.Kind() != reflect.Ptr {
if reportErr {
if replyType.Kind() != reflect.Pointer {
if logErr {
log.Printf("rpc.Register: reply type of method %q is not a pointer: %q\n", mname, replyType)
}
continue
}
// Reply type must be exported.
if !isExportedOrBuiltinType(replyType) {
if reportErr {
if logErr {
log.Printf("rpc.Register: reply type of method %q is not exported: %q\n", mname, replyType)
}
continue
}
// Method needs one out.
if mtype.NumOut() != 1 {
if reportErr {
if logErr {
log.Printf("rpc.Register: method %q has %d output parameters; needs exactly one\n", mname, mtype.NumOut())
}
continue
}
// The return type of the method must be error.
if returnType := mtype.Out(0); returnType != typeOfError {
if reportErr {
if logErr {
log.Printf("rpc.Register: return type of method %q is %q, must be error\n", mname, returnType)
}
continue
Expand Down Expand Up @@ -433,8 +436,8 @@ func (c *gobServerCodec) Close() error {
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
// connection. To use an alternate codec, use [ServeCodec].
// See [NewClient]'s comment for information about concurrent access.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
Expand All @@ -446,7 +449,7 @@ func (server *Server) ServeConn(conn io.ReadWriteCloser) {
server.ServeCodec(srv)
}

// ServeCodec is like ServeConn but uses the specified codec to
// ServeCodec is like [ServeConn] but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
Expand Down Expand Up @@ -476,7 +479,7 @@ func (server *Server) ServeCodec(codec ServerCodec) {
codec.Close()
}

// ServeRequest is like ServeCodec but synchronously serves a single request.
// ServeRequest is like [ServeCodec] but synchronously serves a single request.
// It does not close the codec upon completion.
func (server *Server) ServeRequest(codec ServerCodec) error {
sending := new(sync.Mutex)
Expand Down Expand Up @@ -549,7 +552,7 @@ func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *m

// Decode the argument value.
argIsValue := false // if true, need to indirect before calling.
if mtype.ArgType.Kind() == reflect.Ptr {
if mtype.ArgType.Kind() == reflect.Pointer {
argv = reflect.New(mtype.ArgType.Elem())
} else {
argv = reflect.New(mtype.ArgType)
Expand Down Expand Up @@ -628,23 +631,23 @@ func (server *Server) Accept(lis net.Listener) {
}
}

// Register publishes the receiver's methods in the DefaultServer.
// Register publishes the receiver's methods in the [DefaultServer].
func Register(rcvr any) error { return DefaultServer.Register(rcvr) }

// RegisterName is like Register but uses the provided name for the type
// RegisterName is like [Register] but uses the provided name for the type
// instead of the receiver's concrete type.
func RegisterName(name string, rcvr any) error {
return DefaultServer.RegisterName(name, rcvr)
}

// A ServerCodec implements reading of RPC requests and writing of
// RPC responses for the server side of an RPC session.
// The server calls ReadRequestHeader and ReadRequestBody in pairs
// to read requests from the connection, and it calls WriteResponse to
// write a response back. The server calls Close when finished with the
// The server calls [ServerCodec.ReadRequestHeader] and [ServerCodec.ReadRequestBody] in pairs
// to read requests from the connection, and it calls [ServerCodec.WriteResponse] to
// write a response back. The server calls [ServerCodec.Close] when finished with the
// connection. ReadRequestBody may be called with a nil
// argument to force the body of the request to be read and discarded.
// See NewClient's comment for information about concurrent access.
// See [NewClient]'s comment for information about concurrent access.
type ServerCodec interface {
ReadRequestHeader(*Request) error
ReadRequestBody(any) error
Expand All @@ -654,29 +657,29 @@ type ServerCodec interface {
Close() error
}

// ServeConn runs the DefaultServer on a single connection.
// ServeConn runs the [DefaultServer] on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
// connection. To use an alternate codec, use [ServeCodec].
// See [NewClient]'s comment for information about concurrent access.
func ServeConn(conn io.ReadWriteCloser) {
DefaultServer.ServeConn(conn)
}

// ServeCodec is like ServeConn but uses the specified codec to
// ServeCodec is like [ServeConn] but uses the specified codec to
// decode requests and encode responses.
func ServeCodec(codec ServerCodec) {
DefaultServer.ServeCodec(codec)
}

// ServeRequest is like ServeCodec but synchronously serves a single request.
// ServeRequest is like [ServeCodec] but synchronously serves a single request.
// It does not close the codec upon completion.
func ServeRequest(codec ServerCodec) error {
return DefaultServer.ServeRequest(codec)
}

// Accept accepts connections on the listener and serves requests
// to DefaultServer for each incoming connection.
// to [DefaultServer] for each incoming connection.
// Accept blocks; the caller typically invokes it in a go statement.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }

0 comments on commit 2364b99

Please sign in to comment.