Skip to content

Commit

Permalink
THRIFT-5240: Do connectivity check in Go server
Browse files Browse the repository at this point in the history
Client: go

In compiler generated TProcessorFunction implementations, add a
goroutine after read the request to do connectivity check on the input
transport. If the transport is no longer open, cancel the context object
passed into the handler implementation.

Also define ErrAbandonRequest error, to help TSimpleServer closing
client connections that's already closed on the other end.
  • Loading branch information
fishy committed Jul 1, 2020
1 parent 5dc1d26 commit 4db7a0a
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
- [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - Add ClientMiddleware function type and WrapClient function to support wrapping a TClient with middleware functions.
- [THRIFT-5164](https://issues.apache.org/jira/browse/THRIFT-5164) - Add ProcessorMiddleware function type and WrapProcessor function to support wrapping a TProcessor with middleware functions.
- [THRIFT-5233](https://issues.apache.org/jira/browse/THRIFT-5233) - Add context deadline check to ReadMessageBegin in TBinaryProtocol, TCompactProtocol, and THeaderProtocol.
- [THRIFT-5240](https://issues.apache.org/jira/browse/THRIFT-5240) - The context passed into server handler implementations will be canceled when we detected that the client closed the connection.

## 0.13.0

Expand Down
69 changes: 65 additions & 4 deletions compiler/cpp/src/thrift/generate/t_go_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,7 @@ string t_go_generator::go_imports_begin(bool consts) {
system_packages.push_back("errors");
}
system_packages.push_back("fmt");
system_packages.push_back("time");
system_packages.push_back(gen_thrift_import_);
return "import(\n" + render_system_packages(system_packages);
}
Expand All @@ -937,6 +938,7 @@ string t_go_generator::go_imports_end() {
"var _ = fmt.Printf\n"
"var _ = context.Background\n"
"var _ = reflect.DeepEqual\n"
"var _ = time.Now\n"
"var _ = bytes.Equal\n\n");
}

Expand Down Expand Up @@ -2778,15 +2780,66 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
f_types_ << indent() << " oprot.Flush(ctx)" << endl;
}
f_types_ << indent() << " return false, err" << endl;
f_types_ << indent() << "}" << endl << endl;
f_types_ << indent() << "iprot.ReadMessageEnd(ctx)" << endl;
f_types_ << indent() << "}" << endl;
f_types_ << indent() << "iprot.ReadMessageEnd(ctx)" << endl << endl;

// Even though we never create the goroutine in oneway handlers,
// always have (nop) tickerCancel defined makes the writing part of code
// generating easier and less error-prone.
f_types_ << indent() << "tickerCancel := func() {}" << endl;
// Only create the goroutine for non-oneways.
if (!tfunction->is_oneway()) {
f_types_ << indent() << "// Start a goroutine to do server side connectivity check." << endl;
f_types_ << indent() << "if thrift.ServerConnectivityCheckInterval > 0 {" << endl;

indent_up();
f_types_ << indent() << "var cancel context.CancelFunc" << endl;
f_types_ << indent() << "ctx, cancel = context.WithCancel(ctx)" << endl;
f_types_ << indent() << "defer cancel()" << endl;
f_types_ << indent() << "var tickerCtx context.Context" << endl;
f_types_ << indent() << "tickerCtx, tickerCancel = context.WithCancel(context.Background())" << endl;
f_types_ << indent() << "defer tickerCancel()" << endl;
f_types_ << indent() << "go func(ctx context.Context, cancel context.CancelFunc) {" << endl;

indent_up();
f_types_ << indent() << "ticker := time.NewTicker(thrift.ServerConnectivityCheckInterval)" << endl;
f_types_ << indent() << "defer ticker.Stop()" << endl;
f_types_ << indent() << "for {" << endl;

indent_up();
f_types_ << indent() << "select {" << endl;
f_types_ << indent() << "case <-ctx.Done():" << endl;
indent_up();
f_types_ << indent() << "return" << endl;
indent_down();
f_types_ << indent() << "case <-ticker.C:" << endl;

indent_up();
f_types_ << indent() << "if !iprot.Transport().IsOpen() {" << endl;
indent_up();
f_types_ << indent() << "cancel()" << endl;
f_types_ << indent() << "return" << endl;
indent_down();
f_types_ << indent() << "}" << endl;
indent_down();
f_types_ << indent() << "}" << endl;
indent_down();
f_types_ << indent() << "}" << endl;
indent_down();
f_types_ << indent() << "}(tickerCtx, cancel)" << endl;
indent_down();
f_types_ << indent() << "}" << endl << endl;
} else {
// Make sure we don't get the defined but unused compiling error.
f_types_ << indent() << "_ = tickerCancel" << endl << endl;
}

if (!tfunction->is_oneway()) {
f_types_ << indent() << "result := " << resultname << "{}" << endl;
}
bool need_reference = type_need_reference(tfunction->get_returntype());
if (!tfunction->is_oneway() && !tfunction->get_returntype()->is_void()) {
f_types_ << "var retval " << type_to_go_type(tfunction->get_returntype()) << endl;
f_types_ << indent() << "var retval " << type_to_go_type(tfunction->get_returntype()) << endl;
}

f_types_ << indent() << "var err2 error" << endl;
Expand Down Expand Up @@ -2818,6 +2871,7 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
}

f_types_ << "); err2 != nil {" << endl;
f_types_ << indent() << " tickerCancel()" << endl;

t_struct* exceptions = tfunction->get_xceptions();
const vector<t_field*>& x_fields = exceptions->get_members();
Expand All @@ -2836,6 +2890,11 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
}

if (!tfunction->is_oneway()) {
// Avoid writing the error to the wire if it's ErrAbandonRequest
f_types_ << indent() << " if err2 == thrift.ErrAbandonRequest {" << endl;
f_types_ << indent() << " return false, err2" << endl;
f_types_ << indent() << " }" << endl;

f_types_ << indent() << " x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "
"\"Internal error processing " << escape_string(tfunction->get_name())
<< ": \" + err2.Error())" << endl;
Expand Down Expand Up @@ -2864,10 +2923,11 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
}
f_types_ << "retval" << endl;
indent_down();
f_types_ << "}" << endl;
f_types_ << indent() << "}" << endl;
} else {
f_types_ << endl;
}
f_types_ << indent() << "tickerCancel()" << endl;
f_types_ << indent() << "if err2 = oprot.WriteMessageBegin(ctx, \""
<< escape_string(tfunction->get_name()) << "\", thrift.REPLY, seqId); err2 != nil {"
<< endl;
Expand All @@ -2889,6 +2949,7 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
f_types_ << indent() << "return true, err" << endl;
} else {
f_types_ << endl;
f_types_ << indent() << "tickerCancel()" << endl;
f_types_ << indent() << "return true, nil" << endl;
}
indent_down();
Expand Down
27 changes: 27 additions & 0 deletions lib/go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,30 @@ which will generate:
type Foo struct {
Bar string `thrift:"bar,1,required" some_tag:"some_tag_value"`
}

A note about server handler implementations
===========================================

The context object passed into the server handler function will be canceled when
the client closes the connection (this is a best effort check, not a guarantee
-- there's no guarantee that the context object is always canceled when client
closes the connection, but when it's canceled you can always assume the client
closed the connection). When implementing Go Thrift server, you can take
advantage of that to abandon requests that's no longer needed:

func MyEndpoint(ctx context.Context, req *thriftRequestType) (*thriftResponseType, error) {
...
if ctx.Err() == context.Canceled {
return nil, thrift.ErrAbandonRequest
}
...
}

This feature would add roughly 1 millisecond of latency overhead to the server
handlers (along with roughly 2 goroutines per request).
If that is unacceptable, it can be disabled by having this line early in your
main function:

thrift.ServerConnectivityCheckInterval = 0

This feature is also only enabled on non-oneway endpoints.
25 changes: 25 additions & 0 deletions lib/go/thrift/simple_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,34 @@
package thrift

import (
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
)

// ErrAbandonRequest is a special error server handler implementations can
// return to indicate that the request has been abandoned.
//
// TSimpleServer will check for this error, and close the client connection
// instead of writing the response/error back to the client.
//
// It shall only be used when the server handler implementation know that the
// client already abandoned the request (by checking that the passed in context
// is already canceled, for example).
var ErrAbandonRequest = errors.New("request abandoned")

// ServerConnectivityCheckInterval defines the ticker interval used by
// connectivity check in thrift compiled TProcessorFunc implementations.
//
// It's defined as a variable instead of constant, so that thrift server
// implementations can change its value to control the behavior.
//
// If it's changed to <=0, the feature will be disabled.
var ServerConnectivityCheckInterval = time.Millisecond

/*
* This is not a typical TSimpleServer as it is not blocked after accept a socket.
* It is more like a TThreadedServer that can handle different connections in different goroutines.
Expand Down Expand Up @@ -293,6 +315,9 @@ func (p *TSimpleServer) processRequests(client TTransport) (err error) {
}

ok, err := processor.Process(ctx, inputProtocol, outputProtocol)
if err == ErrAbandonRequest {
return client.Close()
}
if _, ok := err.(TTransportException); ok && err != nil {
return err
}
Expand Down

0 comments on commit 4db7a0a

Please sign in to comment.