Lazy aggregated batch verification #3212

Merged
merged 6 commits into from Dec 29, 2021
Changes from 4 commits
179 changes: 116 additions & 63 deletions beacon_chain/gossip_processing/batch_validation.nim
@@ -8,7 +8,8 @@
{.push raises: [Defect].}

import
std/sequtils,
std/[deques, sequtils],
metrics,
# Status
chronicles, chronos,
../spec/signatures_batch,
@@ -19,47 +20,61 @@ export signatures_batch, blockchain_dag
logScope:
topics = "gossip_checks"

declareCounter batch_verification_batches,
"Total number of batches processed"
declareCounter batch_verification_signatures,
"Total number of verified signatures before aggregation"
declareCounter batch_verification_aggregates,
"Total number of verified signatures after aggregation"

# Batched gossip validation
# ----------------------------------------------------------------

type
BatchResult* {.pure.} = enum
Invalid # Invalid by default
Valid
Invalid
Timeout

Eager = proc(): bool {.gcsafe, raises: [Defect].} ##\
## Callback that returns true if eager processing should be done to lower
## latency at the expense of spending more cycles validating things, creating
## a crude timesharing priority mechanism.

BatchItem* = object
sigset: SignatureSet
fut: Future[BatchResult]

Batch* = object
created: Moment
pendingBuffer: seq[SignatureSet]
resultsBuffer: seq[Future[BatchResult]]
sigsets: seq[SignatureSet]
items: seq[BatchItem]

BatchCrypto* = object
# Each batch is bounded by BatchedCryptoSize (16) which was chosen:
Contributor:

The comment needs to be updated to 72.

Member (author):

Yeah, I'm playing around with different values to strike a good balance. It turns out that in practice, on a really heavily loaded mainnet server, after applying the libp2p fix to validate messages concurrently, the average batch sizes are:

  • max = 72 gives 28-30 signatures per batch and 2.4-2.6 signatures per aggregate
  • max = 36 gives 17-18 signatures per batch and 1.7-1.9 signatures per aggregate

This suggests that some batches are large, but most are much smaller than the max - basically, when max is large, we occasionally use the given space, but not always. It's also likely that the additional space gives aggregation a better chance simply because we collect signatures from different subnets (and it is only per subnet that aggregation can happen).

An earlier version tried using a separate batcher per subnet explicitly, but that doesn't work very well at all - there's not enough traffic on a single subnet to consistently fill up the batches, so overall it lowers the efficiency of batching.

high 30ms delay.

The "maximum" delay when the batch is not full remains at 10ms, or whenever the async loop wakes up. I suspect increasing this timeout would improve batching and aggregation, but it would indeed hurt the case you're describing.

[Screenshot from 2021-12-28 10-55-48]
[Screenshot from 2021-12-28 10-55-37]

72 vs 36, mainnet server

Member (author):

It's important to note that "signatures per batch" != "verifications per batch": when we have 2.5 signatures per aggregate, we're performing 30/2.5 = 12 verifications per batch.
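The ratio above is simple arithmetic; a minimal check using the figures quoted in this thread (28-30 signatures per batch, 2.4-2.6 per aggregate, for max = 72):

```python
# Figures quoted above for max = 72 on a loaded mainnet server
signatures_per_batch = 30
signatures_per_aggregate = 2.5

# Each aggregate costs one pairing verification, so:
verifications_per_batch = signatures_per_batch / signatures_per_aggregate
print(verifications_per_batch)  # 12.0
```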

Member (author):

Moved and expanded the commentary - back to 72: it's ridiculously more efficient in terms of throughput and helps significantly with maintaining a healthy mesh on a slow node, giving it more time to deal with anomalies.


# - based on "nimble bench" in nim-blscurve
# so that low power devices like Raspberry Pi 4 can process
# that many batched verifications within 20ms
# that many batched verifications within 30ms
# - based on the accumulation rate of attestations and aggregates
# in large instances which were 12000 per slot (12s)
# hence 1 per ms (but the pattern is bursty around the 4s mark)
# The number of batches is bounded by time - batch validation is skipped if
# we can't process them in the time that one slot takes, and we return
# timeout instead which prevents the gossip layer from forwarding the
# batch.
batches: seq[ref Batch]
batches: Deque[ref Batch]
eager: Eager ##\
## Eager is used to enable eager processing of attestations when it's
## prudent to do so (instead of leaving the CPU for other, presumably more
## important work like block processing)
##
verifier: BatchVerifier

pruneTime: Moment ## :ast time we had to prune something
pruneTime: Moment ## last time we had to prune something

# `nim-metrics` library is a bit too slow to update on every batch, so
# we accumulate here instead
counts: tuple[signatures, batches, aggregates: int64]

const
# We cap waiting for an idle slot in case there's a lot of network traffic
@@ -74,7 +89,7 @@ const
# (RNG for blinding and Final Exponentiation)
# are amortized,
# but not too big as we need to redo checks one-by-one if one failed.
BatchedCryptoSize = 16
BatchedCryptoSize = 72

proc new*(
T: type BatchCrypto, rng: ref BrHmacDrbgContext,
@@ -85,20 +100,22 @@
pruneTime: Moment.now())

func len(batch: Batch): int =
doAssert batch.resultsBuffer.len() == batch.pendingBuffer.len()
batch.resultsBuffer.len()
batch.items.len()

func full(batch: Batch): bool =
batch.len() >= BatchedCryptoSize

proc clear(batch: var Batch) =
batch.pendingBuffer.setLen(0)
batch.resultsBuffer.setLen(0)
proc complete(batchItem: var BatchItem, v: BatchResult) =
batchItem.fut.complete(v)
batchItem.fut = nil

proc complete(batchItem: var BatchItem, ok: bool) =
batchItem.fut.complete(if ok: BatchResult.Valid else: BatchResult.Invalid)
batchItem.fut = nil

proc skip(batch: var Batch) =
for res in batch.resultsBuffer.mitems():
for res in batch.items.mitems():
res.complete(BatchResult.Timeout)
batch.clear() # release memory early

proc pruneBatchQueue(batchCrypto: ref BatchCrypto) =
let
@@ -112,8 +129,18 @@ proc pruneBatchQueue(batchCrypto: ref BatchCrypto) =
notice "Batch queue pruned, skipping attestation validation",
batches = batchCrypto.batches.len()
batchCrypto.pruneTime = Moment.now()
batchCrypto.batches[0][].skip()
batchCrypto.batches.delete(0)

batchCrypto.batches.popFirst()[].skip()

proc combine(a: var Signature, b: Signature) =
var tmp = AggregateSignature.init(CookedSig(a))
tmp.aggregate(b)
a = Signature(tmp.finish())

proc combine(a: var PublicKey, b: PublicKey) =
var tmp = AggregatePublicKey.init(CookedPubKey(a))
tmp.aggregate(b)
a = PublicKey(tmp.finish())

proc processBatch(batchCrypto: ref BatchCrypto) =
## Process one batch, if there is any
@@ -126,41 +153,52 @@
return

let
batch = batchCrypto[].batches[0]
batchSize = batch[].len()
batchCrypto[].batches.del(0)
batch = batchCrypto[].batches.popFirst()
batchSize = batch[].sigsets.len()

if batchSize == 0:
# Nothing to do in this batch, can happen when a batch is created without
# there being any signatures successfully added to it
return

trace "batch crypto - starting",
batchSize

let startTick = Moment.now()

let ok = batchCrypto.verifier.batchVerify(batch.pendingBuffer)

trace "batch crypto - finished",
batchSize,
cryptoVerified = ok,
batchDur = Moment.now() - startTick

if ok:
for res in batch.resultsBuffer.mitems():
res.complete(BatchResult.Valid)
discard
else:
# Batched verification failed meaning that some of the signature checks
# failed, but we don't know which ones - check each signature separately
# instead
debug "batch crypto - failure, falling back",
trace "batch crypto - starting",
batchSize
for i, res in batch.resultsBuffer.mpairs():
let ok = blsVerify batch[].pendingBuffer[i]
res.complete(if ok: BatchResult.Valid else: BatchResult.Invalid)

batch[].clear() # release memory early
let
startTick = Moment.now()
ok =
if batchSize == 1: blsVerify(batch[].sigsets[0])
else: batchCrypto.verifier.batchVerify(batch[].sigsets)
Contributor:

It might be better to move this into nim-blscurve, thoughts?

Member (author):

Sure, that makes a lot of sense - this PR or a separate one?

Member (author):

Ok, that was easier said than done: nim-blscurve doesn't know about the RNG, so it can't avoid generating randomness in the case of a single signature. Leaving this for a potential future refactoring.
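The fast path discussed here can be sketched outside of Nim. Below is a hedged Python illustration (the names `verify_one` and `batch_verify` are placeholders, not the nim-blscurve API) of why a one-element batch skips batched verification: the random blinding and final exponentiation carry a fixed cost that only amortizes across several signature sets.

```python
def verify_sets(sigsets, verify_one, batch_verify):
    """Verify a list of signature sets.

    A single set is checked directly: batched verification pays a fixed
    overhead (RNG for blinding, final exponentiation) that only pays off
    when spread across multiple sets, as the thread above notes.
    """
    if not sigsets:
        return True  # nothing to do; an empty batch is trivially valid
    if len(sigsets) == 1:
        return verify_one(sigsets[0])
    return batch_verify(sigsets)

# Toy stand-ins: a "set" is valid when it is a positive number
is_valid = lambda s: s > 0
all_valid = lambda ss: all(s > 0 for s in ss)

print(verify_sets([5], is_valid, all_valid))         # True
print(verify_sets([5, -1, 3], is_valid, all_valid))  # False
```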


trace "batch crypto - finished",
batchSize,
cryptoVerified = ok,
batchDur = Moment.now() - startTick

if ok:
for res in batch.items.mitems():
res.complete(BatchResult.Valid)
else:
# Batched verification failed meaning that some of the signature checks
# failed, but we don't know which ones - check each signature separately
# instead
debug "batch crypto - failure, falling back",
items = batch[].items.len()

for item in batch[].items.mitems():
item.complete(blsVerify item.sigset)

batchCrypto[].counts.batches += 1
batchCrypto[].counts.signatures += batch[].items.len()
batchCrypto[].counts.aggregates += batch[].sigsets.len()

if batchCrypto[].counts.batches >= 256:
# Not too often, so as not to overwhelm our metrics
batch_verification_batches.inc(batchCrypto[].counts.batches)
batch_verification_signatures.inc(batchCrypto[].counts.signatures)
batch_verification_aggregates.inc(batchCrypto[].counts.aggregates)

reset(batchCrypto[].counts)

proc deferCryptoProcessing(batchCrypto: ref BatchCrypto) {.async.} =
## Process pending crypto check after some time has passed - the time is
@@ -178,14 +216,14 @@ proc getBatch(batchCrypto: ref BatchCrypto): (ref Batch, bool) =
batchCrypto.pruneBatchQueue()

if batchCrypto.batches.len() == 0 or
batchCrypto.batches[^1][].full():
batchCrypto.batches.peekLast[].full():
# There are no batches in progress - start a new batch and schedule a
# deferred task to eventually handle it
let batch = (ref Batch)(created: Moment.now())
batchCrypto[].batches.add(batch)
batchCrypto[].batches.addLast(batch)
(batch, true)
else:
let batch = batchCrypto[].batches[^1]
let batch = batchCrypto[].batches.peekLast()
# len will be 0 when the batch was created but nothing added to it
# because of early failures
(batch, batch[].len() == 0)
@@ -198,7 +236,7 @@ proc scheduleBatch(batchCrypto: ref BatchCrypto, fresh: bool) =
asyncSpawn batchCrypto.deferCryptoProcessing()

if batchCrypto.batches.len() > 0 and
batchCrypto.batches[0][].full() and
batchCrypto.batches.peekFirst()[].full() and
batchCrypto.eager():
# If there's a full batch, process it eagerly assuming the callback allows
batchCrypto.processBatch()
@@ -215,13 +253,28 @@ template withBatch(
body: untyped): Future[BatchResult] =
block:
let
(batch {.inject.}, fresh) = batchCrypto.getBatch()

body
(batch, fresh) = batchCrypto.getBatch()

let fut = newFuture[BatchResult](name)

batch[].resultsBuffer.add(fut)
let
fut = newFuture[BatchResult](name)
sigset = body

var found = false
# Find existing signature sets with the same message - if we can verify an
# aggregate instead of several signatures, that is _much_ faster
for item in batch[].sigsets.mitems():
if item.message == sigset.message:
item.signature.combine(sigset.signature)
item.pubkey.combine(sigset.pubkey)
found = true
break

if not found:
batch[].sigsets.add sigset

# We need to keep the "original" sigset to allow verifying each signature
# one by one in the case the combined operation fails
batch[].items.add(BatchItem(sigset: sigset, fut: fut))

batchCrypto.scheduleBatch(fresh)
fut
@@ -245,7 +298,7 @@ proc scheduleAttestationCheck*(
let
sig = signature.load().orReturnErr("attestation: cannot load signature")
fut = batchCrypto.withBatch("batch_validation.scheduleAttestationCheck"):
batch.pendingBuffer.add_attestation_signature(
attestation_signature_set(
fork, genesis_validators_root, attestationData, pubkey, sig)

ok((fut, sig))
@@ -291,15 +344,15 @@ proc scheduleAggregateChecks*(

let
aggregatorFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregator"):
batch.pendingBuffer.add_aggregate_and_proof_signature(
aggregate_and_proof_signature_set(
fork, genesis_validators_root, aggregate_and_proof, aggregatorKey,
aggregatorSig)
slotFut = batchCrypto.withBatch("scheduleAggregateChecks.selection_proof"):
batch.pendingBuffer.add_slot_signature(
slot_signature_set(
fork, genesis_validators_root, aggregate.data.slot, aggregatorKey,
slotSig)
aggregateFut = batchCrypto.withBatch("scheduleAggregateChecks.aggregate"):
batch.pendingBuffer.add_attestation_signature(
attestation_signature_set(
fork, genesis_validators_root, aggregate.data, aggregateKey,
aggregateSig)

@@ -324,7 +377,7 @@ proc scheduleSyncCommitteeMessageCheck*(
sig = signature.load().orReturnErr(
"SyncCommitteMessage: cannot load signature")
fut = batchCrypto.withBatch("scheduleSyncCommitteeMessageCheck"):
batch.pendingBuffer.add_sync_committee_message_signature(
sync_committee_message_signature_set(
fork, genesis_validators_root, slot, beacon_block_root, pubkey, sig)

ok((fut, sig))
Expand Down Expand Up @@ -367,15 +420,15 @@ proc scheduleContributionChecks*(
contribution.aggregation_bits)
let
aggregatorFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.aggregator"):
batch.pendingBuffer.add_contribution_and_proof_signature(
contribution_and_proof_signature_set(
fork, genesis_validators_root, contribution_and_proof, aggregatorKey,
aggregatorSig)
proofFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.selection_proof"):
batch.pendingBuffer.add_sync_committee_selection_proof(
sync_committee_selection_proof_set(
fork, genesis_validators_root, contribution.slot,
contribution.subcommittee_index, aggregatorKey, proofSig)
contributionFut = batchCrypto.withBatch("scheduleContributionAndProofChecks.contribution"):
batch.pendingBuffer.add_sync_committee_message_signature(
sync_committee_message_signature_set(
fork, genesis_validators_root, contribution.slot,
contribution.beacon_block_root, contributionKey, contributionSig)

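Putting the pieces of the diff together, the lazy aggregation scheme can be illustrated with a toy Python model. This is a sketch only: integer addition stands in for BLS aggregation, "signature equals pubkey" stands in for a valid signature, and the class and method names are illustrative, not the nimbus-eth2 API. Sets sharing a message are merged on insertion, so the happy path verifies one aggregate per distinct message, while the original items are kept for the per-item fallback when the combined check fails.

```python
class ToyBatch:
    """Toy model of lazy aggregated batch verification.

    Integer addition models BLS aggregation; a set is "valid" when its
    signature equals its (aggregated) public key, a property addition
    preserves across merges.
    """

    def __init__(self):
        self.items = []    # original sets, kept for the fallback path
        self.sigsets = {}  # message -> [aggregated pubkey, aggregated sig]

    def add(self, message, pubkey, signature):
        # Merge into an existing set with the same message - verifying one
        # aggregate is much cheaper than verifying each signature alone.
        self.items.append((message, pubkey, signature))
        if message in self.sigsets:
            agg = self.sigsets[message]
            agg[0] += pubkey     # combine pubkeys
            agg[1] += signature  # combine signatures
        else:
            self.sigsets[message] = [pubkey, signature]

    def verify(self):
        valid = lambda pk, sig: pk == sig  # toy verification predicate
        if all(valid(pk, sig) for pk, sig in self.sigsets.values()):
            return [True] * len(self.items)
        # The aggregated check failed: we don't know which set was bad, so
        # fall back to checking every original item one by one.
        return [valid(pk, sig) for _, pk, sig in self.items]

batch = ToyBatch()
batch.add("msgA", 1, 1)
batch.add("msgA", 2, 2)   # merged with the previous msgA set
batch.add("msgB", 3, 3)
print(len(batch.sigsets))  # 2 aggregates cover 3 signatures
print(batch.verify())      # [True, True, True]

batch.add("msgA", 2, 5)    # an invalid signature poisons the msgA aggregate
print(batch.verify())      # [True, True, True, False]
```

The design choice mirrored here is the one the diff makes explicit: `items` preserves every original `BatchItem` so each caller's future can still be completed individually after a failed aggregate check.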