Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

THRIFT-5833: Add ProcessorError #3063

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions compiler/cpp/src/thrift/generate/t_go_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2988,7 +2988,7 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
string write_err;
if (!tfunction->is_oneway()) {
write_err = tmp("_write_err");
f_types_ << indent() << "var " << write_err << " error" << '\n';
f_types_ << indent() << "var " << write_err << " thrift.TException" << '\n';
}
f_types_ << indent() << "args := " << argsname << "{}" << '\n';
f_types_ << indent() << "if err2 := args." << read_method_name_ << "(ctx, iprot); err2 != nil {" << '\n';
Expand Down Expand Up @@ -3120,14 +3120,24 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
// Avoid writing the error to the wire if it's ErrAbandonRequest
f_types_ << indent() << "if errors.Is(err2, thrift.ErrAbandonRequest) {" << '\n';
indent_up();
f_types_ << indent() << "return false, thrift.WrapTException(err2)" << '\n';
f_types_ << indent() << "return false, &thrift.ProcessorError{" << '\n';
indent_up();
f_types_ << indent() << "WriteError: thrift.WrapTException(err2)," << '\n';
f_types_ << indent() << "EndpointError: err," << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
f_types_ << indent() << "if errors.Is(err2, context.Canceled) {" << '\n';
indent_up();
f_types_ << indent() << "if err := context.Cause(ctx); errors.Is(err, thrift.ErrAbandonRequest) {" << '\n';
f_types_ << indent() << "if err3 := context.Cause(ctx); errors.Is(err3, thrift.ErrAbandonRequest) {" << '\n';
indent_up();
f_types_ << indent() << "return false, thrift.WrapTException(err)" << '\n';
f_types_ << indent() << "return false, &thrift.ProcessorError{" << '\n';
indent_up();
f_types_ << indent() << "WriteError: thrift.WrapTException(err3)," << '\n';
f_types_ << indent() << "EndpointError: err," << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
indent_down();
Expand Down Expand Up @@ -3168,7 +3178,12 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*

f_types_ << indent() << "if " << write_err << " != nil {" << '\n';
indent_up();
f_types_ << indent() << "return false, thrift.WrapTException(" << write_err << ")" << '\n';
f_types_ << indent() << "return false, &thrift.ProcessorError{" << '\n';
indent_up();
f_types_ << indent() << "WriteError: " << write_err << "," << '\n';
f_types_ << indent() << "EndpointError: err," << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';

Expand Down Expand Up @@ -3230,7 +3245,12 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*

f_types_ << indent() << "if " << write_err << " != nil {" << '\n';
indent_up();
f_types_ << indent() << "return false, thrift.WrapTException(" << write_err << ")" << '\n';
f_types_ << indent() << "return false, &thrift.ProcessorError{" << '\n';
indent_up();
f_types_ << indent() << "WriteError: " << write_err << "," << '\n';
f_types_ << indent() << "EndpointError: err," << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';

Expand Down
32 changes: 25 additions & 7 deletions lib/go/test/tests/processor_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ import (

const errorMessage = "foo error"

type serviceImpl struct{}
type serviceImpl struct {
sleepTime time.Duration
}

func (serviceImpl) Ping(_ context.Context) (err error) {
func (s serviceImpl) Ping(_ context.Context) (err error) {
time.Sleep(s.sleepTime)
return &processormiddlewaretest.Error{
Foo: thrift.StringPtr(errorMessage),
}
Expand Down Expand Up @@ -67,9 +70,14 @@ func checkError(tb testing.TB, err error) {
}

func TestProcessorMiddleware(t *testing.T) {
const timeout = time.Second
const (
sleepTime = 10 * time.Millisecond
timeout = sleepTime / 5
)

processor := processormiddlewaretest.NewServiceProcessor(&serviceImpl{})
processor := processormiddlewaretest.NewServiceProcessor(&serviceImpl{
sleepTime: sleepTime,
})
serverTransport, err := thrift.NewTServerSocket("127.0.0.1:0")
if err != nil {
t.Fatalf("Could not find available server port: %v", err)
Expand All @@ -80,7 +88,9 @@ func TestProcessorMiddleware(t *testing.T) {
thrift.NewTHeaderTransportFactoryConf(nil, nil),
thrift.NewTHeaderProtocolFactoryConf(nil),
)
defer server.Stop()
t.Cleanup(func() {
server.Stop()
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand All @@ -103,6 +113,14 @@ func TestProcessorMiddleware(t *testing.T) {

client := processormiddlewaretest.NewServiceClient(thrift.NewTStandardClient(protocol, protocol))

err = client.Ping(context.Background())
checkError(t, err)
for label, timeout := range map[string]time.Duration{
"enough-time": sleepTime * 10,
"not-enough-time": sleepTime / 2,
} {
t.Run(label, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)
client.Ping(ctx)
})
}
}
52 changes: 51 additions & 1 deletion lib/go/thrift/processor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

package thrift

import "context"
import (
"context"
"fmt"
"strings"
)

// A processor is a generic object which operates upon an input stream and
// writes to some output stream.
Expand Down Expand Up @@ -78,3 +82,49 @@ func NewTProcessorFunctionFactory(p TProcessorFunction) TProcessorFunctionFactor
func (p *tProcessorFunctionFactory) GetProcessorFunction(trans TTransport) TProcessorFunction {
return p.processor
}

// ProcessorError is the combined original error returned by the endpoint
// implementation, and I/O error when writing the response back to the client.
//
// This type will be returned by Process function if there's an error happened
// during writing the response back to the client. ProcessorMiddlewares can
// check for this type (use errors.As) to get the underlying write and endpoint
// errors.
type ProcessorError struct {
// WriteError is the error happened during writing the response to the
// client, always set.
WriteError TException

// EndpointError is the original error returned by the endpoint
// implementation, might be nil.
EndpointError TException
}

func (pe *ProcessorError) Unwrap() []error {
if pe.EndpointError != nil {
return []error{
pe.WriteError,
pe.EndpointError,
}
}
return []error{pe.WriteError}
}

func (pe *ProcessorError) Error() string {
var sb strings.Builder
sb.WriteString("thrift.ProcessorError: ")
sb.WriteString(fmt.Sprintf("write response to client: %v", pe.WriteError))
if pe.EndpointError != nil {
sb.WriteString(fmt.Sprintf("; original error from endpoint: %v", pe.EndpointError))
}
return sb.String()
}

func (pe *ProcessorError) TExceptionType() TExceptionType {
return pe.WriteError.TExceptionType()
}

var (
_ error = (*ProcessorError)(nil)
_ TException = (*ProcessorError)(nil)
)
1 change: 1 addition & 0 deletions lib/go/thrift/simple_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func TestErrAbandonRequest(t *testing.T) {
if !errors.Is(ErrAbandonRequest, context.Canceled) {
t.Error("errors.Is(ErrAbandonRequest, context.Canceled) returned false")
}
//lint:ignore SA1032 Intentional order for this test.
if errors.Is(context.Canceled, ErrAbandonRequest) {
t.Error("errors.Is(context.Canceled, ErrAbandonRequest) returned true")
}
Expand Down
Loading