Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for out of order nonce #2710

Merged
merged 7 commits into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -688,8 +688,6 @@ github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded h1:ZQlvR5RB4nfT+cO
github.com/livepeer/joy4 v0.1.2-0.20191121080656-b2fea45cbded/go.mod h1:xkDdm+akniYxVT9KW1Y2Y7Hso6aW+rZObz3nrA9yTHw=
github.com/livepeer/livepeer-data v0.4.11 h1:Sv+ss8e4vcscnMWLxcRJ2g3sNIHyQ3RzCtgEelfGPzw=
github.com/livepeer/livepeer-data v0.4.11/go.mod h1:VIbJRdyH2Tas8EgLVkP79IPMepFDOv0dgHYLEZsCaf4=
github.com/livepeer/lpms v0.0.0-20221123192553-7cef5fc8c1d2 h1:q9DU5UATq+3zzpG8MmuGkLQKAalqo/ivXpbbimzuk4U=
github.com/livepeer/lpms v0.0.0-20221123192553-7cef5fc8c1d2/go.mod h1:Hr/JhxxPDipOVd4ZrGYWrdJfpVF8/SEI0nNr2ctAlkM=
github.com/livepeer/lpms v0.0.0-20230106125835-78ae44836422 h1:AIm6737jFdSrUbzUFneCZuxy8OBXllez6k6vRzp93JE=
github.com/livepeer/lpms v0.0.0-20230106125835-78ae44836422/go.mod h1:Hr/JhxxPDipOVd4ZrGYWrdJfpVF8/SEI0nNr2ctAlkM=
github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU=
Expand Down
33 changes: 22 additions & 11 deletions pm/recipient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ var errInsufficientSenderReserve = errors.New("insufficient sender reserve")
// maxWinProb = 2^256 - 1
var maxWinProb = new(big.Int).Sub(new(big.Int).Lsh(big.NewInt(1), 256), big.NewInt(1))

// max number of sender nonces for a given recipient random hash
var maxSenderNonces = 50

var paramsExpirationBlock = big.NewInt(10)
var paramsExpiryBuffer = int64(1)

Expand Down Expand Up @@ -86,7 +89,7 @@ type recipient struct {
maxfacevalue *big.Int

senderNonces map[string]*struct {
nonce uint32
nonceSeen map[uint32]byte
Copy link
Member

@yondonfu yondonfu Jan 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Is there an advantage of using byte here instead of a bool?

IIUC a bool should be 1 byte which would be the same size as a byte [1] so I don't think there are any storage benefits of using byte. If this is the case, I suggest using a bool as I think using true/false values will be clearer for code readability than using 0/1.

[1] Can verify using unsafe.Sizeof() on a bool var and a byte var.

expirationBlock *big.Int
}
senderNoncesLock sync.Mutex
Expand Down Expand Up @@ -124,7 +127,7 @@ func NewRecipientWithSecret(addr ethcommon.Address, broker Broker, val Validator
secret: secret,
maxfacevalue: big.NewInt(0),
senderNonces: make(map[string]*struct {
nonce uint32
nonceSeen map[uint32]byte
expirationBlock *big.Int
}),
cfg: cfg,
Expand Down Expand Up @@ -363,16 +366,24 @@ func (r *recipient) updateSenderNonce(rand *big.Int, ticket *Ticket) error {
defer r.senderNoncesLock.Unlock()

randStr := rand.String()
sn, ok := r.senderNonces[randStr]
if ok && ticket.SenderNonce <= sn.nonce {
return errors.Errorf("invalid ticket senderNonce sender=%v nonce=%v highest=%v", ticket.Sender.Hex(), ticket.SenderNonce, sn.nonce)
senderNonce, randKeySeen := r.senderNonces[randStr]
Copy link
Member

@yondonfu yondonfu Jan 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
senderNonce, randKeySeen := r.senderNonces[randStr]
senderNonces, randKeySeen := r.senderNonces[randStr]

Nit: The plural feels a bit clearer here since the map is tracking a collection of nonces as opposed to a single one.

if randKeySeen {
_, isSeen := senderNonce.nonceSeen[ticket.SenderNonce]
if isSeen {
return errors.Errorf("invalid ticket senderNonce: already seen sender=%v nonce=%v", ticket.Sender.Hex(), ticket.SenderNonce)
}
} else {
r.senderNonces[randStr] = &struct {
nonceSeen map[uint32]byte
expirationBlock *big.Int
}{make(map[uint32]byte), ticket.ParamsExpirationBlock}
}

r.senderNonces[randStr] = &struct {
nonce uint32
expirationBlock *big.Int
}{ticket.SenderNonce, ticket.ParamsExpirationBlock}

// check nonce map size
if len(r.senderNonces[randStr].nonceSeen) > maxSenderNonces-1 {
cyberj0g marked this conversation as resolved.
Show resolved Hide resolved
return errors.Errorf("invalid ticket senderNonce: too many values sender=%v nonce=%v", ticket.Sender.Hex(), ticket.SenderNonce)
}
// add new nonce
r.senderNonces[randStr].nonceSeen[ticket.SenderNonce] = 1
return nil
}

Expand Down
45 changes: 31 additions & 14 deletions pm/recipient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ func TestReceiveTicket_ValidNonWinningTicket(t *testing.T) {
recipientRand := genRecipientRand(sender, secret, params)
senderNonce := r.(*recipient).senderNonces[recipientRand.String()]

if senderNonce.nonce != newSenderNonce {
t.Errorf("expected senderNonce to be %d, got %d", newSenderNonce, senderNonce.nonce)
if _, ok := senderNonce.nonceSeen[newSenderNonce]; !ok {
t.Errorf("expected senderNonce to exist: %d", newSenderNonce)
}
}

Expand Down Expand Up @@ -257,8 +257,8 @@ func TestReceiveTicket_ValidWinningTicket(t *testing.T) {
recipientRand := genRecipientRand(sender, secret, params)
senderNonce := r.(*recipient).senderNonces[recipientRand.String()]

if senderNonce.nonce != newSenderNonce {
t.Errorf("expected senderNonce to be %d, got %d", newSenderNonce, senderNonce.nonce)
if _, ok := senderNonce.nonceSeen[newSenderNonce]; !ok {
t.Errorf("expected senderNonce to exist: %d", newSenderNonce)
}
}

Expand Down Expand Up @@ -308,6 +308,7 @@ func TestReceiveTicket_InvalidSenderNonce(t *testing.T) {
}

func TestReceiveTicket_ValidNonWinningTicket_Concurrent(t *testing.T) {
assert := assert.New(t)
sender, b, v, gm, sm, tm, cfg, sig := newRecipientFixtureOrFatal(t)
r := newRecipientOrFatal(t, RandAddress(), b, v, gm, sm, tm, cfg)
params, err := r.TicketParams(sender, big.NewRat(1, 1))
Expand All @@ -330,11 +331,25 @@ func TestReceiveTicket_ValidNonWinningTicket_Concurrent(t *testing.T) {
}
}(uint32(i))
}

wg.Wait()
assert.Zero(errCount)
}

if errCount == 0 {
t.Error("expected more than zero senderNonce errors for concurrent ticket receipt")
func TestReceiveTicket_NonceMapFill(t *testing.T) {
assert := assert.New(t)
sender, b, v, gm, sm, tm, cfg, sig := newRecipientFixtureOrFatal(t)
r := newRecipientOrFatal(t, RandAddress(), b, v, gm, sm, tm, cfg)
params, err := r.TicketParams(sender, big.NewRat(1, 1))
require.Nil(t, err)
// fill nonce map to capacity
for i := 0; i < maxSenderNonces+1; i++ {
ticket := newTicket(sender, params, uint32(i))
_, _, err := r.ReceiveTicket(ticket, sig, params.Seed)
if i < maxSenderNonces {
assert.NoError(err)
} else {
assert.Error(err)
}
}
}

Expand Down Expand Up @@ -656,7 +671,7 @@ func TestSenderNoncesCleanupLoop(t *testing.T) {
tm: tm,
quit: make(chan struct{}),
senderNonces: make(map[string]*struct {
nonce uint32
nonceSeen map[uint32]byte
expirationBlock *big.Int
}),
}
Expand All @@ -666,17 +681,19 @@ func TestSenderNoncesCleanupLoop(t *testing.T) {
rand1 := "charizard"
rand2 := "raichu"
r.senderNonces[rand0] = &struct {
nonce uint32
nonceSeen map[uint32]byte
expirationBlock *big.Int
}{1, big.NewInt(3)}
}{map[uint32]byte{1: 1}, big.NewInt(3)}

r.senderNonces[rand1] = &struct {
nonce uint32
nonceSeen map[uint32]byte
expirationBlock *big.Int
}{1, big.NewInt(2)}
}{map[uint32]byte{1: 1}, big.NewInt(2)}

r.senderNonces[rand2] = &struct {
nonce uint32
nonceSeen map[uint32]byte
expirationBlock *big.Int
}{1, big.NewInt(1)}
}{map[uint32]byte{1: 1}, big.NewInt(1)}

go r.senderNoncesCleanupLoop()
time.Sleep(20 * time.Millisecond)
Expand Down
16 changes: 16 additions & 0 deletions test/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,22 @@ func pushSegmentsBroadcaster(t *testing.T, b *livepeer, numSegs int) {
}
}

func pushSegmentsParallelBroadcaster(t *testing.T, b *livepeer, numSegs int) {
assert := assert.New(t)

var wg sync.WaitGroup
mid := common.RandName()
for i := 0; i < numSegs; i++ {
wg.Add(1)
go func(mid string, seqNo int) {
assert.Nil(pushSegmentBroadcaster(b, mid, i))
wg.Done()
}(mid, i)
}

wg.Wait()
}

func pushSegmentBroadcaster(b *livepeer, manifestID string, seqNo int) error {
data, err := ioutil.ReadFile("test.flv")
rdr := bytes.NewReader(data)
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/http_push_broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ func TestHTTPPushBroadcaster(t *testing.T) {

// Sequential requests
pushSegmentsBroadcaster(t, b, 3)

// Parallel requests
pushSegmentsParallelBroadcaster(t, b, 5)
}