-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rpcenc: Support reader redirect #6952
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ package rpcenc | |
import ( | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"fmt" | ||
"io" | ||
"io/ioutil" | ||
|
@@ -40,7 +41,63 @@ type ReaderStream struct { | |
Info string | ||
} | ||
|
||
var client = func() *http.Client { | ||
c := *http.DefaultClient | ||
c.CheckRedirect = func(req *http.Request, via []*http.Request) error { | ||
return http.ErrUseLastResponse | ||
} | ||
return &c | ||
}() | ||
|
||
/* | ||
|
||
Example rpc function: | ||
Push(context.Context, io.Reader) error | ||
|
||
Request flow: | ||
1. Client invokes a method with an io.Reader param | ||
2. go-jsonrpc invokes `ReaderParamEncoder` for the client-provided io.Reader | ||
3. `ReaderParamEncoder` transforms the reader into a `ReaderStream` which can | ||
be serialized as JSON, and sent as jsonrpc request parameter | ||
3.1. If the reader is of type `*sealing.NullReader`, the resulting object | ||
is `ReaderStream{ Type: "null", Info: "[base 10 number of bytes]" }` | ||
3.2. If the reader is of type `*RpcReader`, and it wasn't read from, we | ||
notify that RpcReader to go a different push endpoint, and return | ||
a `ReaderStream` object like in 3.4. | ||
Comment on lines
+62
to
+66
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When do each of these cases happen? |
||
3.3. In remaining cases we start a goroutine which: | ||
3.3.1. Makes a HEAD request to the server push endpoint | ||
3.3.2. If the HEAD request is redirected, it follows the redirect | ||
3.3.3. If the request succeeds, it starts a POST request to the | ||
endpoint to which the last HEAD request was sent with the | ||
reader set as request body. | ||
3.4. We return a `ReaderStream` indicating the uuid of push request, ex: | ||
`ReaderStream{ Type: "push", Info: "[UUID string]" }` | ||
4. If the reader wasn't a NullReader, the server will receive a HEAD (or | ||
POST in case of older clients) request to the push endpoint. | ||
4.1. The server gets or registers an `*RpcReader` in the `readers` map. | ||
4.2. It waits for a request to a matching push endpoint to be opened | ||
4.3. After the request is opened, it returns the `*RpcReader` to | ||
go-jsonrpc, which will pass it as the io.Reader parameter to the | ||
rpc method implementation | ||
4.4. If the first request made to the push endpoint was a POST, the | ||
returned `*RpcReader` acts as a simple reader reading the POST | ||
request body | ||
4.5. If the first request made to the push endpoint was a HEAD | ||
4.5.1. On the first call to Read or Close the server responds with | ||
a 200 OK header, the client starts a POST request to the same | ||
push URL, and the reader starts passing through the POST request | ||
body | ||
4.5.2. If the reader is passed to another (now client) RPC method as a | ||
reader parameter, the server for the first request responds to the | ||
HEAD request with http 302 Found, instructing the first client to | ||
go to the push endpoint of the second RPC server | ||
5. If the reader was a NullReader (ReaderStream.Type=="null"), we instantiate | ||
it, and provide to the method implementation | ||
|
||
*/ | ||
|
||
func ReaderParamEncoder(addr string) jsonrpc.Option { | ||
// Client side parameter encoder. Runs on the rpc client side. io.Reader -> ReaderStream{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hoist to godoc for this method. |
||
return jsonrpc.WithParamEncoder(new(io.Reader), func(value reflect.Value) (reflect.Value, error) { | ||
r := value.Interface().(io.Reader) | ||
|
||
|
@@ -55,62 +112,215 @@ func ReaderParamEncoder(addr string) jsonrpc.Option { | |
} | ||
u.Path = path.Join(u.Path, reqID.String()) | ||
|
||
go func() { | ||
// TODO: figure out errors here | ||
rpcReader, redir := r.(*RpcReader) | ||
if redir { | ||
// if we have an rpc stream, redirect instead of proxying all the data | ||
redir = rpcReader.redirect(u.String()) | ||
} | ||
|
||
resp, err := http.Post(u.String(), "application/octet-stream", r) | ||
if err != nil { | ||
log.Errorf("sending reader param: %+v", err) | ||
return | ||
} | ||
if !redir { | ||
go func() { | ||
// TODO: figure out errors here | ||
for { | ||
req, err := http.NewRequest("HEAD", u.String(), nil) | ||
if err != nil { | ||
log.Errorf("sending HEAD request for the reder param: %+v", err) | ||
return | ||
} | ||
req.Header.Set("Content-Type", "application/octet-stream") | ||
resp, err := client.Do(req) | ||
if err != nil { | ||
log.Errorf("sending reader param: %+v", err) | ||
return | ||
} | ||
// todo do we need to close the body for a head request? | ||
|
||
if resp.StatusCode == http.StatusFound { | ||
nextStr := resp.Header.Get("Location") | ||
u, err = url.Parse(nextStr) | ||
if err != nil { | ||
log.Errorf("sending HEAD request for the reder param, parsing next url (%s): %+v", nextStr, err) | ||
return | ||
} | ||
|
||
continue | ||
} | ||
|
||
if resp.StatusCode == http.StatusNoContent { // reader closed before reading anything | ||
// todo just return?? | ||
return | ||
} | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
b, _ := ioutil.ReadAll(resp.Body) | ||
log.Errorf("sending reader param (%s): non-200 status: %s, msg: '%s'", u.String(), resp.Status, string(b)) | ||
return | ||
} | ||
|
||
break | ||
} | ||
|
||
// now actually send the data | ||
req, err := http.NewRequest("POST", u.String(), r) | ||
if err != nil { | ||
log.Errorf("sending reader param: %+v", err) | ||
return | ||
} | ||
req.Header.Set("Content-Type", "application/octet-stream") | ||
resp, err := client.Do(req) | ||
if err != nil { | ||
log.Errorf("sending reader param: %+v", err) | ||
return | ||
} | ||
|
||
defer resp.Body.Close() //nolint | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
b, _ := ioutil.ReadAll(resp.Body) | ||
log.Errorf("sending reader param (%s): non-200 status: %s, msg: '%s'", u.String(), resp.Status, string(b)) | ||
return | ||
} | ||
}() | ||
} | ||
|
||
defer resp.Body.Close() //nolint:errcheck | ||
return reflect.ValueOf(ReaderStream{Type: PushStream, Info: reqID.String()}), nil | ||
}) | ||
} | ||
|
||
if resp.StatusCode != 200 { | ||
b, _ := ioutil.ReadAll(resp.Body) | ||
log.Errorf("sending reader param (%s): non-200 status: %s, msg: '%s'", u.String(), resp.Status, string(b)) | ||
return | ||
} | ||
type resType int | ||
|
||
}() | ||
const ( | ||
resStart resType = iota // send on first read after HEAD | ||
resRedirect // send on redirect before first read after HEAD | ||
resError | ||
// done/closed = close res channel | ||
) | ||
|
||
return reflect.ValueOf(ReaderStream{Type: PushStream, Info: reqID.String()}), nil | ||
}) | ||
type readRes struct { | ||
rt resType | ||
meta string | ||
} | ||
|
||
// watchReadCloser watches the ReadCloser and closes the watch channel when | ||
// RpcReader watches the ReadCloser and closes the res channel when | ||
// either: (1) the ReaderCloser fails on Read (including with a benign error | ||
// like EOF), or (2) when Close is called. | ||
// | ||
// Use it be notified of terminal states, in situations where a Read failure (or | ||
// EOF) is considered a terminal state too (besides Close). | ||
type watchReadCloser struct { | ||
io.ReadCloser | ||
watch chan struct{} | ||
type RpcReader struct { | ||
postBody io.ReadCloser // nil on initial head request | ||
next chan *RpcReader // on head will get us the postBody after sending resStart | ||
mustRedirect bool | ||
|
||
res chan readRes | ||
beginOnce *sync.Once | ||
closeOnce sync.Once | ||
} | ||
|
||
func (w *watchReadCloser) Read(p []byte) (int, error) { | ||
n, err := w.ReadCloser.Read(p) | ||
var ErrHasBody = errors.New("RPCReader has body, either already read from or from a client with no redirect support") | ||
var ErrMustRedirect = errors.New("reader can't be read directly; marked as MustRedirect") | ||
|
||
// MustRedirect marks the reader as required to be redirected. Will make local | ||
// calls Read fail. MUST be called before this reader is used in any goroutine. | ||
// If the reader can't be redirected will return ErrHasBody | ||
func (w *RpcReader) MustRedirect() error { | ||
if w.postBody != nil { | ||
w.closeOnce.Do(func() { | ||
w.res <- readRes{ | ||
rt: resError, | ||
} | ||
close(w.res) | ||
}) | ||
|
||
return ErrHasBody | ||
} | ||
|
||
w.mustRedirect = true | ||
return nil | ||
} | ||
|
||
func (w *RpcReader) beginPost() { | ||
if w.mustRedirect { | ||
w.res <- readRes{ | ||
rt: resError, | ||
} | ||
close(w.res) | ||
return | ||
} | ||
|
||
if w.postBody == nil { | ||
w.res <- readRes{ | ||
rt: resStart, | ||
} | ||
|
||
nr := <-w.next | ||
|
||
w.postBody = nr.postBody | ||
w.res = nr.res | ||
w.beginOnce = nr.beginOnce | ||
} | ||
} | ||
|
||
func (w *RpcReader) Read(p []byte) (int, error) { | ||
w.beginOnce.Do(func() { | ||
w.beginPost() | ||
}) | ||
|
||
if w.mustRedirect { | ||
return 0, ErrMustRedirect | ||
} | ||
|
||
if w.postBody == nil { | ||
return 0, xerrors.Errorf("reader already closed or redirected") | ||
} | ||
|
||
n, err := w.postBody.Read(p) | ||
if err != nil { | ||
w.closeOnce.Do(func() { | ||
close(w.watch) | ||
close(w.res) | ||
}) | ||
} | ||
return n, err | ||
} | ||
|
||
func (w *watchReadCloser) Close() error { | ||
func (w *RpcReader) Close() error { | ||
w.beginOnce.Do(func() {}) | ||
w.closeOnce.Do(func() { | ||
close(w.watch) | ||
close(w.res) | ||
}) | ||
return w.ReadCloser.Close() | ||
if w.postBody == nil { | ||
return nil | ||
} | ||
return w.postBody.Close() | ||
} | ||
|
||
func (w *RpcReader) redirect(to string) bool { | ||
if w.postBody != nil { | ||
return false | ||
} | ||
|
||
done := false | ||
|
||
w.beginOnce.Do(func() { | ||
w.closeOnce.Do(func() { | ||
w.res <- readRes{ | ||
rt: resRedirect, | ||
meta: to, | ||
} | ||
|
||
done = true | ||
close(w.res) | ||
}) | ||
}) | ||
|
||
return done | ||
} | ||
|
||
func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { | ||
var readersLk sync.Mutex | ||
readers := map[uuid.UUID]chan *watchReadCloser{} | ||
readers := map[uuid.UUID]chan *RpcReader{} | ||
|
||
// runs on the rpc server side, called by the client before making the jsonrpc request | ||
hnd := func(resp http.ResponseWriter, req *http.Request) { | ||
strId := path.Base(req.URL.Path) | ||
u, err := uuid.Parse(strId) | ||
|
@@ -122,14 +332,24 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { | |
readersLk.Lock() | ||
ch, found := readers[u] | ||
if !found { | ||
ch = make(chan *watchReadCloser) | ||
ch = make(chan *RpcReader) | ||
readers[u] = ch | ||
} | ||
readersLk.Unlock() | ||
|
||
wr := &watchReadCloser{ | ||
ReadCloser: req.Body, | ||
watch: make(chan struct{}), | ||
wr := &RpcReader{ | ||
res: make(chan readRes), | ||
next: ch, | ||
beginOnce: &sync.Once{}, | ||
} | ||
|
||
switch req.Method { | ||
case http.MethodHead: | ||
// leave body nil | ||
case http.MethodPost: | ||
wr.postBody = req.Body | ||
default: | ||
http.Error(resp, "unsupported method", http.StatusMethodNotAllowed) | ||
} | ||
|
||
tctx, cancel := context.WithTimeout(req.Context(), Timeout) | ||
|
@@ -145,18 +365,39 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { | |
} | ||
|
||
select { | ||
case <-wr.watch: | ||
case res, ok := <-wr.res: | ||
if !ok { | ||
if req.Method == http.MethodHead { | ||
resp.WriteHeader(http.StatusNoContent) | ||
} else { | ||
resp.WriteHeader(http.StatusOK) | ||
} | ||
return | ||
} | ||
// TODO should we check if we failed the Read, and if so | ||
// return an HTTP 500? i.e. turn watch into a chan error? | ||
// return an HTTP 500? i.e. turn res into a chan error? | ||
|
||
switch res.rt { | ||
case resRedirect: | ||
http.Redirect(resp, req, res.meta, http.StatusFound) | ||
case resStart: // responding to HEAD, request POST with reader data | ||
resp.WriteHeader(http.StatusOK) | ||
case resError: | ||
resp.WriteHeader(500) | ||
default: | ||
log.Errorf("unknown res.rt") | ||
resp.WriteHeader(500) | ||
} | ||
|
||
return | ||
case <-req.Context().Done(): | ||
log.Errorf("context error in reader stream handler (2): %v", req.Context().Err()) | ||
resp.WriteHeader(500) | ||
return | ||
} | ||
|
||
resp.WriteHeader(200) | ||
} | ||
|
||
// Server side reader decoder. runs on the rpc server side, invoked when decoding client request parameters. json(ReaderStream{}) -> io.Reader | ||
dec := jsonrpc.WithParamDecoder(new(io.Reader), func(ctx context.Context, b []byte) (reflect.Value, error) { | ||
var rs ReaderStream | ||
if err := json.Unmarshal(b, &rs); err != nil { | ||
|
@@ -180,7 +421,7 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { | |
readersLk.Lock() | ||
ch, found := readers[u] | ||
if !found { | ||
ch = make(chan *watchReadCloser) | ||
ch = make(chan *RpcReader) | ||
readers[u] = ch | ||
} | ||
readersLk.Unlock() | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this interrupt the redirect flow on the first occurrence? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's meant to, we want to handle the redirect ourselves in https://github.com/filecoin-project/lotus/pull/6952/files#diff-309dd201d9384284851cf021bce98a146c76f5db8a982949b87f1e4b3d63cb26R138 (we need to save the last URL form the HEAD requests for the final POST. Unfortunately we can't just do a POST and have Go handle the redirects, because Go's http client will send the request body with the first request, evan if it then gets the redirect response - which is why we do this whole HEAD request dance here)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(and we have to use this weird CheckRedirect to be able to save the last redirect
Location
header, as otherwise if you just let the http client handle the redirect, there is no way to get that URL)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that this error has a very special meaning, perhaps not obvious: