diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 9d2ca22696e8..976a791dc0f9 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -450,6 +450,7 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error defaultMux("/debug/pprof/"), corehttp.MetricsScrapingOption("/debug/metrics/prometheus"), corehttp.LogOption(), + corehttp.ProxyOption(), } if len(cfg.Gateway.RootRedirect) > 0 { diff --git a/p2p/local.go b/p2p/local.go index 381156691373..ba5acc3d729a 100644 --- a/p2p/local.go +++ b/p2p/local.go @@ -57,7 +57,7 @@ func (l *localListener) dial(ctx context.Context) (net.Stream, error) { cctx, cancel := context.WithTimeout(ctx, time.Second*30) //TODO: configurable? defer cancel() - return l.p2p.peerHost.NewStream(cctx, l.peer, l.proto) + return l.p2p.PeerHost.NewStream(cctx, l.peer, l.proto) } func (l *localListener) acceptConns() { diff --git a/p2p/p2p.go b/p2p/p2p.go index 237c891e0ea6..15900d40d034 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -16,23 +16,23 @@ type P2P struct { Streams *StreamRegistry identity peer.ID - peerHost p2phost.Host + PeerHost p2phost.Host peerstore pstore.Peerstore } // NewP2P creates new P2P struct -func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) *P2P { +func NewP2P(identity peer.ID, PeerHost p2phost.Host, peerstore pstore.Peerstore) *P2P { return &P2P{ identity: identity, - peerHost: peerHost, + PeerHost: PeerHost, peerstore: peerstore, ListenersLocal: newListenersLocal(), - ListenersP2P: newListenersP2P(peerHost), + ListenersP2P: newListenersP2P(PeerHost), Streams: &StreamRegistry{ Streams: map[uint64]*Stream{}, - ConnManager: peerHost.ConnManager(), + ConnManager: PeerHost.ConnManager(), conns: map[peer.ID]int{}, }, } @@ -41,7 +41,7 @@ func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) // CheckProtoExists checks whether a proto handler is registered to // mux handler func (p2p *P2P) CheckProtoExists(proto string) bool { - protos := p2p.peerHost.Mux().Protocols() + protos := p2p.PeerHost.Mux().Protocols() for _, p := range protos { if p != proto { diff --git a/p2p/proxy.go b/p2p/proxy.go new file mode 100644 index 000000000000..a247600920ab --- /dev/null +++ b/p2p/proxy.go @@ -0,0 +1,92 @@ +package p2p + +import ( + "bufio" + "fmt" + "net" + "net/http" + "strings" + + core "github.com/ipfs/go-ipfs/core" + protocol "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol" + peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" +) + +func ProxyOption() ServeOption { + return func(ipfsNode *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) { + mux.HandleFunc("/proxy/", func(w http.ResponseWriter, request *http.Request) { + // parse request + parsedRequest, err := parseRequest(request) + if err != nil { + handleError(w, "Failed to parse request", err, 400) + return + } + + // open connect to peer + stream, err := ipfsNode.P2P.PeerHost.NewStream(ipfsNode.Context(), parsedRequest.target, protocol.ID("/x/"+parsedRequest.name)) + if err != nil { + msg := fmt.Sprintf("Failed to open stream '%v' to target peer '%v'", parsedRequest.name, parsedRequest.target) + handleError(w, msg, err, 500) + return + } + + // send request to peer + proxyReq, err := http.NewRequest(request.Method, parsedRequest.httpPath, request.Body) + + if err != nil { + handleError(w, "Failed to format proxy request", err, 500) + return + } + + proxyReq.Write(stream) + + s := bufio.NewReader(stream) + proxyResponse, err := http.ReadResponse(s, proxyReq) + defer func() { proxyResponse.Body.Close() }() + if err != nil { + msg := fmt.Sprintf("Failed to send request to stream '%v' to peer '%v'", parsedRequest.name, parsedRequest.target) + handleError(w, msg, err, 500) + return + } + // send client response + proxyResponse.Write(w) + }) + return mux, nil + } +} + +type proxyRequest struct { + target peer.ID + name string + httpPath string // path to send to the proxy-host +} + +// from the url path parse the peer-ID, name and http path +// /http/$peer_id/$name/$http_path +func parseRequest(request *http.Request) (*proxyRequest, error) { + path := request.URL.Path + + split := strings.SplitN(path, "/", 6) + if split[2] != "http" { + return nil, fmt.Errorf("Invalid proxy request protocol '%s'", path) + } + + if len(split) < 6 { + return nil, fmt.Errorf("Invalid request path '%s'", path) + } + + peerID, err := peer.IDB58Decode(split[3]) + + if err != nil { + return nil, err + } + + return &proxyRequest{peerID, split[4], split[5]}, nil +} + +// log error and send response to client +func handleError(w http.ResponseWriter, msg string, err error, code int) { + w.WriteHeader(code) + fmt.Fprintf(w, "%s: %s\n", msg, err) + log.Warningf("server error: %s: %s", err) +} diff --git a/p2p/proxy_test.go b/p2p/proxy_test.go new file mode 100644 index 000000000000..b818ce707738 --- /dev/null +++ b/p2p/proxy_test.go @@ -0,0 +1,21 @@ +package p2p + +import ( + "github.com/ipfs/go-ipfs/thirdparty/assert" + "net/http" + "strings" + "testing" +) + +func TestParseRequest(t *testing.T) { + url := "http://localhost:5001/proxy/http/QmT8JtU54XSmC38xSb1XHFSMm775VuTeajg7LWWWTAwzxT/test-name/path/to/index.txt" + req, _ := http.NewRequest("GET", url, strings.NewReader("")) + + parsed, err := parseRequest(req) + if err != nil { + t.Error(err) + } + assert.True(parsed.httpPath == "path/to/index.txt", t, "proxy request path") + assert.True(parsed.name == "test-name", t, "proxy request name") + assert.True(parsed.target.Pretty() == "QmT8JtU54XSmC38xSb1XHFSMm775VuTeajg7LWWWTAwzxT", t, "proxy request peer-id") +}