Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Name to Events, Name to Subscription #71

Merged
merged 7 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/kldbind/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type HexBigInt = hexutil.Big
// HexUint64 models and serializes uint64
type HexUint64 = hexutil.Uint64

// HexUint models and serializes uint
type HexUint = hexutil.Uint

// ABIEvent is an event on the ABI
type ABIEvent = abi.Event

Expand Down
5 changes: 4 additions & 1 deletion internal/kldcontracts/rest2eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,10 @@ func (r *rest2eth) subscribeEvent(res http.ResponseWriter, req *http.Request, ad
address := common.HexToAddress(addrStr)
addr = &address
}
sub, err := r.subMgr.AddSubscription(req.Context(), addr, abiEvent, streamID, fromBlock)
// if the end user provided a name for the subscription, use it
// If not provided, it will be set to a system-generated summary
name := r.fromBodyOrForm(req, body, "name")
sub, err := r.subMgr.AddSubscription(req.Context(), addr, abiEvent, streamID, fromBlock, name)
if err != nil {
r.restErrReply(res, req, err, 400)
return
Expand Down
5 changes: 3 additions & 2 deletions internal/kldcontracts/rest2eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (m *mockSubMgr) ResumeStream(ctx context.Context, id string) error {
return m.err
}
func (m *mockSubMgr) DeleteStream(ctx context.Context, id string) error { return m.err }
func (m *mockSubMgr) AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock string) (*kldevents.SubscriptionInfo, error) {
func (m *mockSubMgr) AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock, name string) (*kldevents.SubscriptionInfo, error) {
m.capturedAddr = addr
return m.sub, m.err
}
Expand Down Expand Up @@ -1247,7 +1247,7 @@ func TestSubscribeNoAddressSuccess(t *testing.T) {
dispatcher := &mockREST2EthDispatcher{}
r, _, router := newTestREST2Eth(dispatcher)
sm := &mockSubMgr{
sub: &kldevents.SubscriptionInfo{ID: "sub1"},
sub: &kldevents.SubscriptionInfo{ID: "sub1", Name: "stream-without-address"},
}
r.subMgr = sm
bodyBytes, _ := json.Marshal(&map[string]string{
Expand All @@ -1262,6 +1262,7 @@ func TestSubscribeNoAddressSuccess(t *testing.T) {
err := json.NewDecoder(res.Result().Body).Decode(&reply)
assert.NoError(err)
assert.Equal("sub1", reply.ID)
assert.Equal("stream-without-address", reply.Name)
assert.Nil(sm.capturedAddr)
}

Expand Down
13 changes: 8 additions & 5 deletions internal/kldcontracts/smartcontractgw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1594,7 +1594,7 @@ func TestAddStreamNoSubMgr(t *testing.T) {

func TestAddStreamOK(t *testing.T) {
assert := assert.New(t)
spec := &kldevents.StreamInfo{Type: "webhook"}
spec := &kldevents.StreamInfo{Type: "webhook", Name: "stream-1"}
b, _ := json.Marshal(spec)
req := httptest.NewRequest("POST", kldevents.StreamPathPrefix, bytes.NewReader(b))
res := httptest.NewRecorder()
Expand All @@ -1607,6 +1607,7 @@ func TestAddStreamOK(t *testing.T) {
json.NewDecoder(res.Body).Decode(&newSpec)
assert.Equal(200, res.Result().StatusCode)
assert.Equal("webhook", newSpec.Type)
assert.Equal("stream-1", newSpec.Name)
s.Shutdown()
}

Expand Down Expand Up @@ -1653,15 +1654,15 @@ func TestListStreams(t *testing.T) {

mockSubMgr := &mockSubMgr{
streams: []*kldevents.StreamInfo{
&kldevents.StreamInfo{
{
TimeSorted: kldmessages.TimeSorted{
CreatedISO8601: time.Now().UTC().Format(time.RFC3339),
}, ID: "earlier",
}, ID: "earlier", Name: "stream-1",
},
&kldevents.StreamInfo{
{
TimeSorted: kldmessages.TimeSorted{
CreatedISO8601: time.Now().UTC().Add(1 * time.Hour).Format(time.RFC3339),
}, ID: "later",
}, ID: "later", Name: "stream-2",
},
},
}
Expand All @@ -1670,7 +1671,9 @@ func TestListStreams(t *testing.T) {
assert.Equal(200, res.Result().StatusCode)
assert.Equal(2, len(results))
assert.Equal("later", results[0].ID)
assert.Equal("stream-2", results[0].Name)
assert.Equal("earlier", results[1].ID)
assert.Equal("stream-1", results[1].Name)
}

func TestListSubs(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions internal/kldeth/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (r *testRPCClient) CallContext(ctx context.Context, result interface{}, met
}

const (
simpleStorage = "pragma solidity >=0.4.22 <0.6.0;\n\ncontract simplestorage {\nuint public storedData;\n\nconstructor(uint initVal) public {\nstoredData = initVal;\n}\n\nfunction set(uint x) public {\nstoredData = x;\n}\n\nfunction get() public view returns (uint retVal) {\nreturn storedData;\n}\n}"
twoContracts = "pragma solidity >=0.4.22 <0.6.0;\n\ncontract contract1 {function f1() public pure returns (uint retVal) {\nreturn 1;\n}\n}\n\ncontract contract2 {function f2() public pure returns (uint retVal) {\nreturn 2;\n}\n}"
simpleStorage = "pragma solidity >=0.4.22 <0.6.9;\n\ncontract simplestorage {\nuint public storedData;\n\nconstructor(uint initVal) public {\nstoredData = initVal;\n}\n\nfunction set(uint x) public {\nstoredData = x;\n}\n\nfunction get() public view returns (uint retVal) {\nreturn storedData;\n}\n}"
twoContracts = "pragma solidity >=0.4.22 <0.6.9;\n\ncontract contract1 {function f1() public pure returns (uint retVal) {\nreturn 1;\n}\n}\n\ncontract contract2 {function f2() public pure returns (uint retVal) {\nreturn 2;\n}\n}"
)

func TestNewContractDeployTxnSimpleStorage(t *testing.T) {
Expand Down Expand Up @@ -446,7 +446,7 @@ func testComplexParam(t *testing.T, solidityType string, val interface{}, expect
assert := assert.New(t)

var msg kldmessages.DeployContract
msg.Solidity = "pragma solidity >=0.4.22 <0.6.0; contract test {constructor(" + solidityType + " p1) public {}}"
msg.Solidity = "pragma solidity >=0.4.22 <0.6.9; contract test {constructor(" + solidityType + " p1) public {}}"
msg.Parameters = []interface{}{val}
msg.From = "0xAA983AD2a0e0eD8ac639277F37be42F2A5d2618c"
msg.Nonce = "123"
Expand Down
3 changes: 2 additions & 1 deletion internal/kldevents/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
type StreamInfo struct {
kldmessages.TimeSorted
ID string `json:"id"`
Name string `json:"name,omitempty"`
Path string `json:"path"`
Suspended bool `json:"suspended"`
Type string `json:"type,omitempty"`
Expand Down Expand Up @@ -422,7 +423,7 @@ func (a *eventStream) performActionWithRetry(batchNumber uint64, events []*event
complete := false
for !a.suspendOrStop() && !complete {
if attempt > 0 {
log.Infof("%s: Watiting %.2fs before re-attempting batch %d", a.spec.ID, delay.Seconds(), batchNumber)
log.Infof("%s: Waiting %.2fs before re-attempting batch %d", a.spec.ID, delay.Seconds(), batchNumber)
time.Sleep(delay)
delay = time.Duration(float64(delay) * a.backoffFactor)
}
Expand Down
18 changes: 17 additions & 1 deletion internal/kldevents/eventstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,21 @@ func TestBatchSizeCap(t *testing.T) {
defer stream.stop()

assert.Equal(uint64(MaxBatchSize), stream.spec.BatchSize)
assert.Equal("", stream.spec.Name)
}

func TestStreamName(t *testing.T) {
assert := assert.New(t)
_, stream, svr, eventStream := newTestStreamForBatching(
&StreamInfo{
Name: "testStream",
Webhook: &webhookAction{},
}, 200)
defer close(eventStream)
defer svr.Close()
defer stream.stop()

assert.Equal("testStream", stream.spec.Name)
}

func TestBlockingBehavior(t *testing.T) {
Expand Down Expand Up @@ -405,7 +420,8 @@ func setupTestSubscription(assert *assert.Assertions, sm *subscriptionMGR, strea
}
addr := kldbind.HexToAddress("0x167f57a13a9c35ff92f0649d2be0e52b4f8ac3ca")
ctx := context.Background()
s, _ := sm.AddSubscription(ctx, &addr, event, stream.spec.ID, "")
subscriptionName := "testSub"
s, _ := sm.AddSubscription(ctx, &addr, event, stream.spec.ID, "", subscriptionName)
return s
}

Expand Down
3 changes: 1 addition & 2 deletions internal/kldevents/logprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package kldevents

import (
"encoding/json"
"math/big"
"strconv"
"strings"
Expand All @@ -32,7 +31,7 @@ import (
type logEntry struct {
Address kldbind.Address `json:"address"`
BlockNumber kldbind.HexBigInt `json:"blockNumber"`
TransactionIndex json.Number `json:"transactionIndex"`
TransactionIndex kldbind.HexUint `json:"transactionIndex"`
TransactionHash kldbind.Hash `json:"transactionHash"`
Data string `json:"data"`
Topics []*kldbind.Hash `json:"topics"`
Expand Down
4 changes: 2 additions & 2 deletions internal/kldevents/submanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type SubscriptionManager interface {
SuspendStream(ctx context.Context, id string) error
ResumeStream(ctx context.Context, id string) error
DeleteStream(ctx context.Context, id string) error
AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock string) (*SubscriptionInfo, error)
AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock, name string) (*SubscriptionInfo, error)
Subscriptions(ctx context.Context) []*SubscriptionInfo
SubscriptionByID(ctx context.Context, id string) (*SubscriptionInfo, error)
DeleteSubscription(ctx context.Context, id string) error
Expand Down Expand Up @@ -125,7 +125,7 @@ func (s *subscriptionMGR) Subscriptions(ctx context.Context) []*SubscriptionInfo
}

// AddSubscription adds a new subscription
func (s *subscriptionMGR) AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock string) (*SubscriptionInfo, error) {
func (s *subscriptionMGR) AddSubscription(ctx context.Context, addr *kldbind.Address, event *kldbind.ABIEvent, streamID, initialBlock, name string) (*SubscriptionInfo, error) {
i := &SubscriptionInfo{
TimeSorted: kldmessages.TimeSorted{
CreatedISO8601: time.Now().UTC().Format(time.RFC3339),
Expand Down
11 changes: 6 additions & 5 deletions internal/kldevents/submanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func TestInitLevelDBFail(t *testing.T) {
func TestActionAndSubscriptionLifecyle(t *testing.T) {
assert := assert.New(t)
dir := tempdir(t)
subscriptionName := "testSub"
defer cleanup(t, dir)
sm := newTestSubscriptionManager()
sm.rpc = kldeth.NewMockRPCClientForSync(nil, nil)
Expand All @@ -123,7 +124,7 @@ func TestActionAndSubscriptionLifecyle(t *testing.T) {
})
assert.NoError(err)

sub, err := sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "ping"}, stream.ID, "")
sub, err := sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "ping"}, stream.ID, "", subscriptionName)
assert.NoError(err)
assert.Equal(stream.ID, sub.Stream)

Expand Down Expand Up @@ -195,7 +196,7 @@ func TestActionChildCleanup(t *testing.T) {
})
assert.NoError(err)

_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "ping"}, stream.ID, "12345")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "ping"}, stream.ID, "12345", "")
err = sm.DeleteStream(ctx, stream.ID)
assert.NoError(err)

Expand Down Expand Up @@ -231,11 +232,11 @@ func TestStreamAndSubscriptionErrors(t *testing.T) {
err = sm.DeleteStream(ctx, "teststream")
assert.EqualError(err, "pop")

_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "nope", "")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "nope", "", "")
assert.EqualError(err, "Stream with ID 'nope' not found")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "teststream", "")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "teststream", "", "test")
assert.EqualError(err, "Failed to store subscription: pop")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "teststream", "!bad integer")
_, err = sm.AddSubscription(ctx, nil, &kldbind.ABIEvent{Name: "any"}, "teststream", "!bad integer", "")
assert.EqualError(err, "FromBlock cannot be parsed as a BigInt")
sm.subscriptions["testsub"] = &subscription{info: &SubscriptionInfo{}, rpc: sm.rpc}
err = sm.DeleteSubscription(ctx, "nope")
Expand Down
14 changes: 10 additions & 4 deletions internal/kldevents/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ type SubscriptionInfo struct {
kldmessages.TimeSorted
ID string `json:"id,omitempty"`
Path string `json:"path"`
Name string `json:"name"`
Summary string `json:"-"` // System generated name for the subscription
Name string `json:"name"` // User provided name for the subscription, set to Summary if missing
Stream string `json:"stream"`
Filter persistedFilter `json:"filter"`
Event kldbind.MarshalledABIEvent `json:"event"`
Expand Down Expand Up @@ -83,13 +84,18 @@ func newSubscription(sm subscriptionManager, rpc kldeth.RPCClient, addr *kldbind
addrStr = addr.String()
}
event := &i.Event.E
i.Name = addrStr + ":" + event.Sig()
i.Summary = addrStr + ":" + event.Sig()
// If a name was not provided by the end user, set it to the system generated summary
if i.Name == "" {
log.Debugf("No name provided for subscription, using auto-generated summary:%s", i.Summary)
i.Name = i.Summary
}
if event == nil || event.Name == "" {
return nil, klderrors.Errorf(klderrors.EventStreamsSubscribeNoEvent)
}
// For now we only support filtering on the event type
f.Topics = [][]kldbind.Hash{[]kldbind.Hash{event.ID()}}
log.Infof("Created subscription %s %s topic:%s", i.ID, i.Name, event.ID().String())
f.Topics = [][]kldbind.Hash{{event.ID()}}
log.Infof("Created subscription ID:%s name:%s topic:%s", i.ID, i.Name, event.ID().String())
return s, nil
}

Expand Down
14 changes: 9 additions & 5 deletions internal/kldevents/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ func TestCreateWebhookSub(t *testing.T) {
Name: "glastonbury",
RawName: "glastonbury",
Inputs: []kldbind.ABIArgument{
kldbind.ABIArgument{
{
Name: "field",
Type: kldbind.ABITypeKnown("address"),
},
kldbind.ABIArgument{
{
Name: "tents",
Type: kldbind.ABITypeKnown("uint256"),
},
kldbind.ABIArgument{
{
Name: "mud",
Type: kldbind.ABITypeKnown("bool"),
},
Expand All @@ -105,6 +105,7 @@ func TestCreateWebhookSub(t *testing.T) {

assert.Equal(s.info.ID, s1.info.ID)
assert.Equal("*:glastonbury(address,uint256,bool)", s1.info.Name)
assert.Equal("*:glastonbury(address,uint256,bool)", s1.info.Summary)
assert.Equal(event.ID(), s.info.Filter.Topics[0][0])
}

Expand All @@ -120,11 +121,14 @@ func TestCreateWebhookSubWithAddr(t *testing.T) {
}

addr := kldbind.HexToAddress("0x0123456789abcDEF0123456789abCDef01234567")
s, err := newSubscription(m, rpc, &addr, testSubInfo(event))
subInfo := testSubInfo(event)
subInfo.Name = "mySubscription"
s, err := newSubscription(m, rpc, &addr, subInfo)
assert.NoError(err)
assert.NotEmpty(s.info.ID)
assert.Equal(event.ID(), s.info.Filter.Topics[0][0])
assert.Equal("0x0123456789abcDEF0123456789abCDef01234567:devcon()", s.info.Name)
assert.Equal("0x0123456789abcDEF0123456789abCDef01234567:devcon()", s.info.Summary)
assert.Equal("mySubscription", s.info.Name)
}

func TestCreateSubscriptionNoEvent(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/kldrest/webhookskafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func TestWebhookHandlerYAMLDeployContract(t *testing.T) {
" type: DeployContract\n" +
"from: '0x4b098809E68C88e26442491c57866b7D4852216c'\n" +
"solidity: |-\n" +
" pragma solidity >=0.4.22 <0.6.0;\n" +
" pragma solidity >=0.4.22 <0.6.9;\n" +
" \n" +
" contract simplestorage {\n" +
" uint public storedData;\n" +
Expand Down
10 changes: 5 additions & 5 deletions internal/kldtx/txnprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ const testFromAddr = "0x83dBC8e329b38cBA0Fc4ed99b1Ce9c2a390ABdC1"

var goodDeployTxnJSON = "{" +
" \"headers\":{\"type\": \"DeployContract\"}," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.0; contract t {constructor() public {}}\"," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.9; contract t {constructor() public {}}\"," +
" \"from\":\"" + testFromAddr + "\"," +
" \"nonce\":\"123\"," +
" \"gas\":\"123\"" +
"}"

var goodHDWalletDeployTxnJSON = "{" +
" \"headers\":{\"type\": \"DeployContract\"}," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.0; contract t {constructor() public {}}\"," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.9; contract t {constructor() public {}}\"," +
" \"from\":\"hd-testinst-testwallet-1234\"," +
" \"nonce\":\"123\"," +
" \"gas\":\"123\"" +
Expand All @@ -101,7 +101,7 @@ var goodSendTxnJSON = "{" +

var goodDeployTxnPrivateJSON = "{" +
" \"headers\":{\"type\": \"DeployContract\"}," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.0; contract t {constructor() public {}}\"," +
" \"solidity\":\"pragma solidity >=0.4.22 <0.6.9; contract t {constructor() public {}}\"," +
" \"from\":\"" + testFromAddr + "\"," +
" \"nonce\":\"123\"," +
" \"gas\":\"123\"," +
Expand Down Expand Up @@ -608,7 +608,7 @@ func TestOnSendTransactionMessageBadNonce(t *testing.T) {
testTxnContext.jsonMsg = "{" +
" \"headers\":{\"type\": \"SendTransaction\"}," +
" \"from\":\"0x83dBC8e329b38cBA0Fc4ed99b1Ce9c2a390ABdC1\"," +
" \"nonce\":\"abc\"" +
" \"nonce\":\"123.4\"" +
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guessing this is a go version behavior change: using abc for nonce throws an error in parsing the json in the test file: "json: invalid number literal, trying to unmarshal ""abc"" into Number" instead of throwing an error in txn processing logic

"}"
txnProcessor.OnMessage(testTxnContext)
for len(testTxnContext.errorReplies) == 0 {
Expand All @@ -630,7 +630,7 @@ func TestOnSendTransactionMessageBadMsg(t *testing.T) {
" \"headers\":{\"type\": \"SendTransaction\"}," +
" \"from\":\"0x83dBC8e329b38cBA0Fc4ed99b1Ce9c2a390ABdC1\"," +
" \"nonce\":\"123\"," +
" \"value\":\"abc\"," +
" \"value\":\"123.456\"," +
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guessing this is a go version behavior change: using abc for nonce throws an error in parsing the json in the test file: "json: invalid number literal, trying to unmarshal ""abc"" into Number" instead of throwing an error in txn processing logic

" \"method\":{\"name\":\"test\"}" +
"}"
txnProcessor.OnMessage(testTxnContext)
Expand Down
4 changes: 2 additions & 2 deletions test/simpleevents.sol
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pragma solidity >=0.5.2 <0.6.0;
pragma solidity >=0.5.2 <0.6.9;
/**
* @title Simple Storage with events
* @dev Read and write values to the chain
Expand Down Expand Up @@ -42,4 +42,4 @@ contract SimpleEvents {
return (storedI, storedS);
}

}
}