Skip to content

Commit

Permalink
gateway: eth_subscribe support
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Jan 31, 2023
1 parent 6491bec commit 1286d76
Show file tree
Hide file tree
Showing 22 changed files with 255 additions and 94 deletions.
3 changes: 1 addition & 2 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"fmt"
"time"

"github.com/filecoin-project/go-jsonrpc"

"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
Expand All @@ -18,6 +16,7 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin/v8/paych"
Expand Down
2 changes: 1 addition & 1 deletion api/api_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,6 @@ type Gateway interface {
EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (<-chan ethtypes.EthSubscriptionResponse, error)
EthSubscribe(ctx context.Context, eventType string, params *ethtypes.EthSubscriptionParams) (ethtypes.EthSubscriptionID, error)
EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionID) (bool, error)
}
4 changes: 2 additions & 2 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func NewFullNodeRPCV0(ctx context.Context, addr string, requestHeader http.Heade
}

// NewFullNodeRPCV1 creates a new http jsonrpc client.
func NewFullNodeRPCV1(ctx context.Context, addr string, requestHeader http.Header) (api.FullNode, jsonrpc.ClientCloser, error) {
func NewFullNodeRPCV1(ctx context.Context, addr string, requestHeader http.Header, opts ...jsonrpc.Option) (api.FullNode, jsonrpc.ClientCloser, error) {
var res v1api.FullNodeStruct
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
api.GetInternalStructs(&res), requestHeader, jsonrpc.WithErrors(api.RPCErrors))
api.GetInternalStructs(&res), requestHeader, append([]jsonrpc.Option{jsonrpc.WithErrors(api.RPCErrors)}, opts...)...)

return &res, closer, err
}
Expand Down
4 changes: 2 additions & 2 deletions api/mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 15 additions & 13 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions api/v0api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/gateway.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
24 changes: 23 additions & 1 deletion cli/util/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,33 @@ func GetFullNodeAPIV1Single(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientClo
return v1API, closer, nil
}

func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, error) {
type GetFullNodeOptions struct {
ethSubHandler api.EthSubscriber
}

type GetFullNodeOption func(*GetFullNodeOptions)

func FullNodeWithEthSubscribtionHandler(sh api.EthSubscriber) GetFullNodeOption {
return func(opts *GetFullNodeOptions) {
opts.ethSubHandler = sh
}
}

func GetFullNodeAPIV1(ctx *cli.Context, opts ...GetFullNodeOption) (v1api.FullNode, jsonrpc.ClientCloser, error) {
if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
return tn.(v1api.FullNode), func() {}, nil
}

var options GetFullNodeOptions
for _, opt := range opts {
opt(&options)
}

var rpcOpts []jsonrpc.Option
if options.ethSubHandler != nil {
rpcOpts = append(rpcOpts, jsonrpc.WithClientHandler("Filecoin", options.ethSubHandler), jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"))
}

heads, err := GetRawAPIMulti(ctx, repo.FullNode, "v1")
if err != nil {
return nil, nil, err
Expand Down
6 changes: 4 additions & 2 deletions cmd/lotus-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,9 @@ var runCmd = &cli.Command{
log.Fatalf("Cannot register the view: %v", err)
}

api, closer, err := lcli.GetFullNodeAPIV1(cctx)
subHnd := gateway.NewEthSubHandler()

api, closer, err := lcli.GetFullNodeAPIV1(cctx, cliutil.FullNodeWithEthSubscribtionHandler(subHnd))
if err != nil {
return err
}
Expand Down Expand Up @@ -195,7 +197,7 @@ var runCmd = &cli.Command{
return xerrors.Errorf("failed to convert endpoint address to multiaddr: %w", err)
}

gwapi := gateway.NewNode(api, lookbackCap, waitLookback, rateLimit, rateLimitTimeout)
gwapi := gateway.NewNode(api, subHnd, lookbackCap, waitLookback, rateLimit, rateLimitTimeout)
h, err := gateway.Handler(gwapi, api, perConnRateLimit, connPerMinute, serverOptions...)
if err != nil {
return xerrors.Errorf("failed to set up gateway HTTP handler")
Expand Down
71 changes: 34 additions & 37 deletions documentation/en/api-v1-unstable-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -3073,43 +3073,40 @@ Inputs:

Response:
```json
{
"subscription": [
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
],
"result": {}
}
[
55,
105,
12,
254,
198,
193,
191,
76,
59,
146,
136,
199,
165,
215,
131,
233,
135,
49,
233,
11,
10,
76,
23,
124,
42,
55,
76,
122,
148,
39,
53,
94
]
```

### EthUninstallFilter
Expand Down
70 changes: 70 additions & 0 deletions gateway/eth_sub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package gateway

import (
"context"
"sync"

"github.com/filecoin-project/go-jsonrpc"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
)

type EthSubHandler struct {
queued map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse
sinks map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error

lk sync.Mutex
}

func NewEthSubHandler() *EthSubHandler {
return &EthSubHandler{
queued: make(map[ethtypes.EthSubscriptionID][]ethtypes.EthSubscriptionResponse),
sinks: make(map[ethtypes.EthSubscriptionID]func(context.Context, *ethtypes.EthSubscriptionResponse) error),
}
}

func (e *EthSubHandler) addSub(ctx context.Context, id ethtypes.EthSubscriptionID, sink func(context.Context, *ethtypes.EthSubscriptionResponse) error) error {
e.lk.Lock()
defer e.lk.Unlock()

for _, p := range e.queued[id] {
if err := sink(ctx, &p); err != nil {
return err
}
}
delete(e.queued, id)
e.sinks[id] = sink
return nil
}

func (e *EthSubHandler) removeSub(id ethtypes.EthSubscriptionID) {
e.lk.Lock()
defer e.lk.Unlock()

delete(e.sinks, id)
delete(e.queued, id)
}

func (e *EthSubHandler) EthSubscription(ctx context.Context, r jsonrpc.RawParams) error {
p, err := jsonrpc.DecodeParams[ethtypes.EthSubscriptionResponse](r)
if err != nil {
return err
}

e.lk.Lock()

sink := e.sinks[p.SubscriptionID]

if sink == nil {
e.queued[p.SubscriptionID] = append(e.queued[p.SubscriptionID], p)
e.lk.Unlock()
return nil
}

e.lk.Unlock()

return sink(ctx, &p) // todo track errors and auto-unsubscribe on rpc conn close?
}

var _ api.EthSubscriber = (*EthSubHandler)(nil)
4 changes: 2 additions & 2 deletions gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const perConnLimiterKey perConnLimiterKeyType = "limiter"

type filterTrackerKeyType string

const filterTrackerKey filterTrackerKeyType = "filterTracker"
const statefulCallTrackerKey filterTrackerKeyType = "statefulCallTracker"

// Handler returns a gateway http.Handler, to be mounted as-is on the server.
func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinute int64, opts ...jsonrpc.ServerOption) (http.Handler, error) {
Expand Down Expand Up @@ -90,7 +90,7 @@ func (h RateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(context.WithValue(r.Context(), perConnLimiterKey, h.limiter))

// also add a filter tracker to the context
r = r.WithContext(context.WithValue(r.Context(), filterTrackerKey, newFilterTracker()))
r = r.WithContext(context.WithValue(r.Context(), statefulCallTrackerKey, newStatefulCallTracker()))

h.handler.ServeHTTP(w, r)
}
Expand Down
Loading

0 comments on commit 1286d76

Please sign in to comment.