Skip to content

Commit

Permalink
accounts,core,eth/filters,les,light,internal/ethapi: Add from/to filt…
Browse files Browse the repository at this point in the history
…ering of ReturnData
  • Loading branch information
reductionista committed May 24, 2018
1 parent 929ebc0 commit a92c854
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 111 deletions.
2 changes: 1 addition & 1 deletion accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscr
return fb.bc.SubscribeLogsEvent(ch)
}

func (fb *filterBackend) SubscribeTransactionEvent(ch chan<- core.TransactionEvent) event.Subscription {
func (fb *filterBackend) SubscribeTransactionEvent(ch chan<- []*core.TransactionEvent) event.Subscription {
return fb.bc.SubscribeTransactionEvent(ch)
}

Expand Down
43 changes: 30 additions & 13 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,10 +1168,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
coalescedLogs = append(coalescedLogs, logs...)
blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainEvent{block, block.Hash(), logs})
for _, txPostEvent := range txPostEvents {
events = append(events, txPostEvent)
}
lastCanon = block
events = append(events, txPostEvents)

// Only count canonical blocks for GC processing time
bc.gcproc += proctime
Expand Down Expand Up @@ -1249,11 +1246,12 @@ func countTransactions(chain []*types.Block) (c int) {
// event about them
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
var (
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block
deletedTxs types.Transactions
deletedLogs []*types.Log
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block
deletedTxs types.Transactions
txPostEvents []*TransactionEvent
deletedLogs []*types.Log
// collectLogs collects the logs that were generated during the
// processing of the block that corresponds with the given hash.
// These logs are later announced as deleted.
Expand Down Expand Up @@ -1296,6 +1294,8 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
return fmt.Errorf("Invalid new chain")
}

blockLookup := make(map[common.Hash]*big.Int)

for {
if oldBlock.Hash() == newBlock.Hash() {
commonBlock = oldBlock
Expand All @@ -1306,6 +1306,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
newChain = append(newChain, newBlock)
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
collectLogs(oldBlock.Hash())
// save block # of each deleted transaction; will need this to deduce signer for TransactionEvent
for _, tx := range oldBlock.Transactions() {
blockLookup[tx.Hash()] = oldBlock.Number()
}

oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
if oldBlock == nil {
Expand Down Expand Up @@ -1341,9 +1345,22 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
// When transactions get deleted from the database that means the
// receipts that were created in the fork must also be deleted
rawdb.DeleteTxLookupEntry(bc.db, tx.Hash())
// Let ReturnData subscribers know when a transaction is removed from canonical chain
bc.txPostFeed.Send(TransactionEvent{TxHash: tx.Hash(), RetData: &types.ReturnData{TxHash: tx.Hash(), Removed: true}})

// Construct TransactionEvent for subscribers of txPostFeed
from, _ := types.Sender(types.MakeSigner(bc.chainConfig, blockLookup[tx.Hash()]), tx)

data := types.ReturnData{TxHash: tx.Hash(), Removed: true}
txEvent := TransactionEvent{TxHash: tx.Hash(),
From: &from,
To: tx.To(),
RetData: &data,
}
txPostEvents = append(txPostEvents, &txEvent)
}

// Let subscribers know when a transaction is removed from canonical chain
bc.txPostFeed.Send(txPostEvents)

if len(deletedLogs) > 0 {
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
}
Expand Down Expand Up @@ -1377,7 +1394,7 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
case ChainSideEvent:
bc.chainSideFeed.Send(ev)

case TransactionEvent:
case []*TransactionEvent:
bc.txPostFeed.Send(ev)
}
}
Expand Down Expand Up @@ -1567,7 +1584,7 @@ func (bc *BlockChain) SubscribeChainSideEvent(ch chan<- ChainSideEvent) event.Su
}

// SubscribeTransactionEvent registers a subscription of SubscribeTransactionEvent.
func (bc *BlockChain) SubscribeTransactionEvent(ch chan<- TransactionEvent) event.Subscription {
func (bc *BlockChain) SubscribeTransactionEvent(ch chan<- []*TransactionEvent) event.Subscription {
return bc.scope.Track(bc.txPostFeed.Subscribe(ch))
}

Expand Down
2 changes: 2 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type NewMinedBlockEvent struct{ Block *types.Block }
// TransactionEvent is posted when a transaction completes execution
type TransactionEvent struct {
TxHash common.Hash
From *common.Address
To *common.Address
RetData *types.ReturnData
}

Expand Down
8 changes: 5 additions & 3 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,16 @@ func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consen
// Process returns the receipts and logs accumulated during the process and
// returns the amount of gas that was used in the process. If any of the
// transactions failed to execute due to insufficient gas it will return an error.
func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, []TransactionEvent, error) {
func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, []*TransactionEvent, error) {
var (
receipts types.Receipts
usedGas = new(uint64)
header = block.Header()
allLogs []*types.Log
gp = new(GasPool).AddGas(block.GasLimit())
retData *types.ReturnData
txPostEvents []TransactionEvent
txPostEvents []*TransactionEvent
signer = types.MakeSigner(p.config, block.Number())
)
// Mutate the the block and state according to any hard-fork specs
if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
Expand All @@ -77,7 +78,8 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
retData = &types.ReturnData{TxHash: tx.Hash(), Data: data}
txPostEvents = append(txPostEvents, TransactionEvent{TxHash: tx.Hash(), RetData: retData})
from, _ := types.Sender(signer, tx)
txPostEvents = append(txPostEvents, &TransactionEvent{TxHash: tx.Hash(), From: &from, To: tx.To(), RetData: retData})
}
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), receipts)
Expand Down
2 changes: 1 addition & 1 deletion core/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ type Validator interface {
// of gas used in the process and return an error if any of the internal rules
// failed.
type Processor interface {
Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, []TransactionEvent, error)
Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, []*TransactionEvent, error)
}
2 changes: 1 addition & 1 deletion eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (b *EthAPIBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) e
return b.eth.BlockChain().SubscribeChainSideEvent(ch)
}

func (b *EthAPIBackend) SubscribeTransactionEvent(ch chan<- core.TransactionEvent) event.Subscription {
func (b *EthAPIBackend) SubscribeTransactionEvent(ch chan<- []*core.TransactionEvent) event.Subscription {
return b.eth.BlockChain().SubscribeTransactionEvent(ch)
}

Expand Down
138 changes: 101 additions & 37 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package filters
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"reflect"
Expand Down Expand Up @@ -232,10 +231,10 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
return rpcSub, nil
}

func (api *PublicFilterAPI) NewReturnDataFilter() rpc.ID {
func (api *PublicFilterAPI) NewReturnDataFilter(crit TxFilterCriteria) rpc.ID {
var (
retCh = make(chan *types.ReturnData)
retSub = api.events.SubscribeReturnData(retCh)
retCh = make(chan []*types.ReturnData)
retSub = api.events.SubscribeReturnData(retCh, ethereum.TxFilterQuery(crit))
)

api.filtersMu.Lock()
Expand All @@ -246,11 +245,16 @@ func (api *PublicFilterAPI) NewReturnDataFilter() rpc.ID {
for {
select {
case retData := <-retCh:
api.filtersMu.Lock()
if f, found := api.filters[retSub.ID]; found {
f.retData = append(f.retData, *retData)
if len(retData) > 0 {
api.filtersMu.Lock()
f, found := api.filters[retSub.ID]
api.filtersMu.Unlock()
if found {
for _, data := range retData {
f.retData = append(f.retData, *data)
}
}
}
api.filtersMu.Unlock()
case <-retSub.Err():
api.filtersMu.Lock()
delete(api.filters, retSub.ID)
Expand All @@ -263,7 +267,7 @@ func (api *PublicFilterAPI) NewReturnDataFilter() rpc.ID {
return retSub.ID
}

func (api *PublicFilterAPI) ReturnData(ctx context.Context) (*rpc.Subscription, error) {
func (api *PublicFilterAPI) ReturnData(ctx context.Context, crit TxFilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
Expand All @@ -275,8 +279,8 @@ func (api *PublicFilterAPI) ReturnData(ctx context.Context) (*rpc.Subscription,
rpcSub.SetType(rpc.ReturnDataSubscription)

go func() {
retCh := make(chan *types.ReturnData)
retSub := api.events.SubscribeReturnData(retCh)
retCh := make(chan []*types.ReturnData)
retSub := api.events.SubscribeReturnData(retCh, ethereum.TxFilterQuery(crit))

for {
select {
Expand All @@ -289,9 +293,13 @@ func (api *PublicFilterAPI) ReturnData(ctx context.Context) (*rpc.Subscription,
log.Warn(fmt.Sprintf("Received update msg of invalid type %s for ReturnData subscription", reflect.TypeOf(msg).String()))
}
case retData := <-retCh:
if _, has := txListen[retData.TxHash]; has {
// tx from client just completed execution--send back return data
notifier.Notify(rpcSub.ID, retData)
if len(retData) > 0 {
for _, data := range retData {
if _, has := txListen[data.TxHash]; has {
// tx from client just completed execution--send back return data
notifier.Notify(rpcSub.ID, retData)
}
}
}
case <-rpcSub.Err():
retSub.Unsubscribe()
Expand Down Expand Up @@ -344,10 +352,14 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
return rpcSub, nil
}

// FilterCriteria represents a request to create a new filter.
// FilterCriteria represents a request to create a new log filter.
// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
type FilterCriteria ethereum.FilterQuery

// TxFilterCriteria represents a request to create a new transaction filter.
// Adds UnmarshallJSON() method to ethereum.TxFilterQuery
type TxFilterCriteria ethereum.TxFilterQuery

// NewFilter creates a new filter and returns the filter id. It can be
// used to retrieve logs when the state changes. This method cannot be
// used to fetch logs that are already stored in the state.
Expand Down Expand Up @@ -534,6 +546,7 @@ func returnRetData(rdata []types.ReturnData) []types.ReturnData {

// UnmarshalJSON sets *args fields with given data.
func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
var err error
type input struct {
From *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
Expand All @@ -542,7 +555,7 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
}

var raw input
if err := json.Unmarshal(data, &raw); err != nil {
if err = json.Unmarshal(data, &raw); err != nil {
return err
}

Expand All @@ -558,27 +571,9 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {

if raw.Addresses != nil {
// raw.Address can contain a single address or an array of addresses
switch rawAddr := raw.Addresses.(type) {
case []interface{}:
for i, addr := range rawAddr {
if strAddr, ok := addr.(string); ok {
addr, err := decodeAddress(strAddr)
if err != nil {
return fmt.Errorf("invalid address at index %d: %v", i, err)
}
args.Addresses = append(args.Addresses, addr)
} else {
return fmt.Errorf("non-string address at index %d", i)
}
}
case string:
addr, err := decodeAddress(rawAddr)
if err != nil {
return fmt.Errorf("invalid address: %v", err)
}
args.Addresses = []common.Address{addr}
default:
return errors.New("invalid addresses in query")
args.Addresses, err = decodeAddresses(raw.Addresses, "query")
if err != nil {
return err
}
}

Expand Down Expand Up @@ -626,6 +621,75 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
return nil
}

// UnmarshalJSON sets *args fields with given data.
func (args *TxFilterCriteria) UnmarshalJSON(data []byte) error {
var err error
type input struct {
From interface{} `json:"from"`
To interface{} `json:"to"`
}

var raw input
if err = json.Unmarshal(data, &raw); err != nil {
return err
}

args.From = nil
args.To = nil

if raw.From != nil {
args.From, err = decodeAddresses(raw.From, "from")
if err != nil {
return err
}
}

if raw.To != nil {
args.To, err = decodeAddresses(raw.To, "to")
if err != nil {
return err
}
}

return nil
}

// Decodes address field from JSON, which could be either
// a single common.Address or an array []common.Address.
// second arg is the name of the field being decoded, for error reporting.
func decodeAddresses(addresses interface{}, field string) ([]common.Address, error) {
decoded := []common.Address{}

if addresses == nil {
return decoded, nil
}

switch rawAddr := addresses.(type) {
case []interface{}:
for i, addr := range rawAddr {
if strAddr, ok := addr.(string); ok {
addr, err := decodeAddress(strAddr)
if err != nil {
return decoded, fmt.Errorf("invalid address at index %d: %v", i, err)
}
decoded = append(decoded, addr)
} else {
return decoded, fmt.Errorf("non-string address at index %d", i)
}
}
case string:
addr, err := decodeAddress(rawAddr)
if err != nil {
return decoded, fmt.Errorf("invalid address: %v", err)
}
decoded = []common.Address{addr}
default:
return decoded, fmt.Errorf("invalid addresses in %s", field)
}

return decoded, nil
}

func decodeAddress(s string) (common.Address, error) {
b, err := hexutil.Decode(s)
if err == nil && len(b) != common.AddressLength {
Expand Down
Loading

0 comments on commit a92c854

Please sign in to comment.