Skip to content

Commit

Permalink
feat: wire the faucet with the message pool
Browse files Browse the repository at this point in the history
  • Loading branch information
amimart committed Aug 10, 2022
1 parent c881f68 commit ea3da76
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 87 deletions.
157 changes: 77 additions & 80 deletions pkg/client/faucet.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,21 @@ import (
type Faucet interface {
Start()
GetConfig() pkg.Config
GetFromAddr() types.AccAddress
SubmitTx(ctx context.Context) (*types.TxResponse, error)
Send(addr string) error
Subscribe(addr string) (<-chan *types.TxResponse, error)
Close() error
}

type faucet struct {
config pkg.Config
grpcConn *grpc.ClientConn
fromAddr types.AccAddress
fromPrivKey crypto.PrivKey
txConfig client.TxConfig
triggerTx <-chan bool
buffer []types.Msg
txResponseChans []chan *types.TxResponse
}

func (f *faucet) GetFromAddr() types.AccAddress {
return f.fromAddr
}

func (f *faucet) GetConfig() pkg.Config {
return f.config
config pkg.Config
fromAddress types.AccAddress
amount int64
denom string
accountPrefix string
grpcConn *grpc.ClientConn
triggerTx <-chan bool
pool *MessagePool
}

func NewFaucet(config pkg.Config, triggerTxChan <-chan bool) (Faucet, error) {
Expand All @@ -65,116 +56,122 @@ func NewFaucet(config pkg.Config, triggerTxChan <-chan bool) (Faucet, error) {
return nil, err
}

fromAddr := types.AccAddress(fromPrivKey.PubKey().Address())
fromAddress := types.AccAddress(fromPrivKey.PubKey().Address())

return &faucet{
config: config,
grpcConn: grpcConn,
fromAddr: fromAddr,
fromPrivKey: fromPrivKey,
txConfig: simapp.MakeTestEncodingConfig().TxConfig,
triggerTx: triggerTxChan,
config: config,
fromAddress: fromAddress,
amount: config.AmountSend,
denom: config.Denom,
accountPrefix: config.Prefix,
grpcConn: grpcConn,
triggerTx: triggerTxChan,
pool: NewMessagePool(
WithTxSubmitter(
makeTxSubmitter(config, simapp.MakeTestEncodingConfig().TxConfig, grpcConn, fromPrivKey, fromAddress),
),
),
}, nil
}

func (f *faucet) Start() {
go func() {
log.Info().Msg("Starting submit routine")
for range f.triggerTx {
msgCount := len(f.buffer)
msgCount := f.pool.Size()

resp, err := f.SubmitTx(context.Background())
if err != nil {
log.Err(err).Int("msgCount", msgCount).Msg("Could not submit transaction")
for _, txResponseChan := range f.txResponseChans {
close(txResponseChan)
}
f.txResponseChans = f.txResponseChans[:0]
} else if resp != nil {
log.Info().
Int("messageCount", msgCount).
Str("txHash", resp.TxHash).
Uint32("txCode", resp.Code).
Msg("Successfully submit transaction")
for _, txResponseChan := range f.txResponseChans {
txResponseChan <- resp
close(txResponseChan)
}
f.txResponseChans = f.txResponseChans[:0]
} else {
log.Info().Msg("No message to submit")
}
}

log.Info().Msg("Stopping submit routine")
}()
}

func (f *faucet) SubmitTx(ctx context.Context) (*types.TxResponse, error) {
if len(f.buffer) == 0 {
return nil, nil
}
func (f *faucet) GetConfig() pkg.Config {
return f.config
}

defer func() {
f.buffer = f.buffer[:0]
}()
func (f *faucet) SubmitTx(ctx context.Context) (*types.TxResponse, error) {
return f.pool.Submit()
}

txBuilder, err := cosmos.BuildUnsignedTx(f.config, f.txConfig, f.buffer)
func (f *faucet) Send(addr string) error {
msgSend, err := f.makeSendMsg(addr)
if err != nil {
return nil, err
return err
}

account, err := cosmos.GetAccount(ctx, f.grpcConn, f.fromAddr.String())
f.pool.RegisterMsg(msgSend)
return nil
}

func (f *faucet) Subscribe(addr string) (<-chan *types.TxResponse, error) {
msgSend, err := f.makeSendMsg(addr)
if err != nil {
return nil, err
}

signerData := signing.SignerData{
ChainID: f.config.ChainID,
AccountNumber: account.GetAccountNumber(),
Sequence: account.GetSequence(),
}
return f.pool.SubscribeMsg(msgSend), nil
}

err = cosmos.SignTx(f.fromPrivKey, signerData, f.txConfig, txBuilder)
if err != nil {
return nil, err
}
func (f *faucet) Close() error {
return f.grpcConn.Close()
}

txBytes, err := f.txConfig.TxEncoder()(txBuilder.GetTx())
func (f *faucet) makeSendMsg(addr string) (types.Msg, error) {
toAddr, err := types.GetFromBech32(addr, f.accountPrefix)
if err != nil {
return nil, err
}

return cosmos.BroadcastTx(ctx, f.grpcConn, txBytes)
return banktypes.NewMsgSend(
f.fromAddress,
toAddr,
types.NewCoins(types.NewInt64Coin(f.denom, f.amount)),
), nil
}

func (f *faucet) Send(addr string) error {
toAddr, err := types.GetFromBech32(addr, f.config.Prefix)
if err != nil {
return err
}
func makeTxSubmitter(config pkg.Config, txConfig client.TxConfig, grpcConn *grpc.ClientConn, privKey crypto.PrivKey, addr types.AccAddress) TxSubmitter {
return func(msgs []types.Msg) (*types.TxResponse, error) {
txBuilder, err := cosmos.BuildUnsignedTx(config, txConfig, msgs)
if err != nil {
return nil, err
}

f.buffer = append(
f.buffer,
banktypes.NewMsgSend(
f.fromAddr,
toAddr,
types.NewCoins(types.NewInt64Coin(f.config.Denom, f.config.AmountSend)),
),
)
return nil
}
account, err := cosmos.GetAccount(context.Background(), grpcConn, addr.String())
if err != nil {
return nil, err
}

func (f *faucet) Subscribe(addr string) (<-chan *types.TxResponse, error) {
if err := f.Send(addr); err != nil {
return nil, err
}
txResponseChan := make(chan *types.TxResponse)
f.txResponseChans = append(f.txResponseChans, txResponseChan)
signerData := signing.SignerData{
ChainID: config.ChainID,
AccountNumber: account.GetAccountNumber(),
Sequence: account.GetSequence(),
}

return txResponseChan, nil
}
err = cosmos.SignTx(privKey, signerData, txConfig, txBuilder)
if err != nil {
return nil, err
}

func (f *faucet) Close() error {
return f.grpcConn.Close()
txBytes, err := txConfig.TxEncoder()(txBuilder.GetTx())
if err != nil {
return nil, err
}

return cosmos.BroadcastTx(context.Background(), grpcConn, txBytes)
}
}

func getTransportCredentials(config pkg.Config) credentials.TransportCredentials {
Expand Down
18 changes: 11 additions & 7 deletions pkg/client/message_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
type MessagePool struct {
mut *sync.Mutex
submitterFunc TxSubmitter
msgs []*types.Msg
msgs []types.Msg
subscribers []chan *types.TxResponse
}

// TxSubmitter shall implement all the logic to build, sign and submit a transaction containing all the messages of the
// pool.
type TxSubmitter func([]*types.Msg) (*types.TxResponse, error)
type TxSubmitter func([]types.Msg) (*types.TxResponse, error)

// MessagePoolOption allow to configure a MessagePool.
type MessagePoolOption func(pool *MessagePool)
Expand All @@ -41,8 +41,12 @@ func WithTxSubmitter(submitterFunc TxSubmitter) MessagePoolOption {
}
}

func (pool *MessagePool) Size() int {
return len(pool.msgs)
}

// RegisterMsg atomically add the message in the pool.
func (pool *MessagePool) RegisterMsg(msg *types.Msg) {
func (pool *MessagePool) RegisterMsg(msg types.Msg) {
pool.lock()
defer pool.unlock()

Expand All @@ -51,7 +55,7 @@ func (pool *MessagePool) RegisterMsg(msg *types.Msg) {

// SubscribeMsg atomically add the message in the pool, it also returns a channel on which will be sent the
// corresponding transaction response, see Submit for more information on how the channel is managed.
func (pool *MessagePool) SubscribeMsg(msg *types.Msg) <-chan *types.TxResponse {
func (pool *MessagePool) SubscribeMsg(msg types.Msg) <-chan *types.TxResponse {
pool.lock()
defer pool.unlock()

Expand All @@ -71,7 +75,7 @@ func (pool *MessagePool) SubscribeMsg(msg *types.Msg) <-chan *types.TxResponse {
//
// Warning: To avoid locking the MessagePool, the channels are closed in separate routines which can lead to goroutine
// leak if they are not consumed.
func (pool *MessagePool) Submit() error {
func (pool *MessagePool) Submit() (*types.TxResponse, error) {
pool.lock()
defer func() {
pool.flush()
Expand All @@ -80,14 +84,14 @@ func (pool *MessagePool) Submit() error {

resp, err := pool.submitterFunc(pool.msgs)
if err != nil {
return err
return nil, err
}

for _, subscriber := range pool.subscribers {
subscriber <- resp
}

return nil
return resp, nil
}

func (pool *MessagePool) lock() {
Expand Down

0 comments on commit ea3da76

Please sign in to comment.