Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1898 slashing violations consumer alsp integration test #4549

Merged
merged 11 commits into from
Jul 19, 2023
3 changes: 1 addition & 2 deletions network/alsp/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (m *MisbehaviorReportManager) HandleMisbehaviorReport(channel channels.Chan
Hex("misbehaving_id", logging.ID(report.OriginId())).
Str("reason", report.Reason().String()).
Float64("penalty", report.Penalty()).Logger()
lg.Trace().Msg("received misbehavior report")
m.metrics.OnMisbehaviorReported(channel.String(), report.Reason().String())

nonce := [internal.NonceSize]byte{}
Expand Down Expand Up @@ -333,7 +334,6 @@ func (m *MisbehaviorReportManager) onHeartbeat() error {
Cause: network.DisallowListedCauseAlsp, // sets the ALSP disallow listing cause on node
})
}

// each time we decay the penalty by the decay speed, the penalty is a negative number, and the decay speed
// is a positive number. So the penalty is getting closer to zero.
// We use math.Min() to make sure the penalty is never positive.
Expand Down Expand Up @@ -425,7 +425,6 @@ func (m *MisbehaviorReportManager) processMisbehaviorReport(report internal.Repo
// we should crash the node in this case to prevent further misbehavior reports from being lost and fix the bug.
return fmt.Errorf("failed to apply penalty to the spam record: %w", err)
}

lg.Debug().Float64("updated_penalty", updatedPenalty).Msg("misbehavior report handled")
return nil
}
120 changes: 116 additions & 4 deletions network/alsp/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/onflow/flow-go/network/slashing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -183,7 +185,7 @@ func TestHandleReportedMisbehavior_Cache_Integration(t *testing.T) {
// TestHandleReportedMisbehavior_And_DisallowListing_Integration implements an end-to-end integration test for the
// handling of reported misbehavior and disallow listing.
//
// The test sets up 3 nodes, one victim, one honest, and one (alledged) spammer.
// The test sets up 3 nodes, one victim, one honest, and one (alleged) spammer.
// Initially, the test ensures that all nodes are connected to each other.
// Then, test imitates that victim node reports the spammer node for spamming.
// The test generates enough spam reports to trigger the disallow-listing of the spammer node by the victim node.
Expand All @@ -196,11 +198,11 @@ func TestHandleReportedMisbehavior_And_DisallowListing_Integration(t *testing.T)
// this test is assessing the integration of the ALSP manager with the network. As the ALSP manager is an attribute
// of the network, we need to configure the ALSP manager via the network configuration, and let the network create
// the ALSP manager.
var victimSpamRecordCacheCache alsp.SpamRecordCache
var victimSpamRecordCache alsp.SpamRecordCache
cfg.Opts = []alspmgr.MisbehaviorReportManagerOption{
alspmgr.WithSpamRecordsCacheFactory(func(logger zerolog.Logger, size uint32, metrics module.HeroCacheMetrics) alsp.SpamRecordCache {
victimSpamRecordCacheCache = internal.NewSpamRecordCache(size, logger, metrics, model.SpamRecordFactory())
return victimSpamRecordCacheCache
victimSpamRecordCache = internal.NewSpamRecordCache(size, logger, metrics, model.SpamRecordFactory())
return victimSpamRecordCache
}),
}

Expand Down Expand Up @@ -266,6 +268,116 @@ func TestHandleReportedMisbehavior_And_DisallowListing_Integration(t *testing.T)
p2ptest.EnsureNotConnectedBetweenGroups(t, ctx, []p2p.LibP2PNode{nodes[victimIndex]}, []p2p.LibP2PNode{nodes[spammerIndex]})
}

// TestHandleReportedMisbehavior_And_SlashingViolationsConsumer_Integration implements an end-to-end integration test for the
// handling of reported misbehavior from the slashing.ViolationsConsumer.
//
// The test sets up one victim, one honest, and one (alleged) spammer for each of the current slashing violations.
// Initially, the test ensures that all nodes are connected to each other.
// Then, the test imitates the slashing violations consumer on the victim node reporting misbehaviors for each slashing violation.
// The test generates enough slashing violations to trigger the connection to each of the spamming nodes to be eventually pruned.
// The test ensures that the victim node is disconnected from all spammer nodes.
// The test ensures that despite connection attempts, no inbound or outbound connections between the victim and
// the pruned spammer nodes are established.
func TestHandleReportedMisbehavior_And_SlashingViolationsConsumer_Integration(t *testing.T) {
cfg := managerCfgFixture(t)
kc1116 marked this conversation as resolved.
Show resolved Hide resolved

// this test is assessing the integration of the ALSP manager with the network. As the ALSP manager is an attribute
// of the network, we need to configure the ALSP manager via the network configuration, and let the network create
// the ALSP manager.
var victimSpamRecordCache alsp.SpamRecordCache
cfg.Opts = []alspmgr.MisbehaviorReportManagerOption{
alspmgr.WithSpamRecordsCacheFactory(func(logger zerolog.Logger, size uint32, metrics module.HeroCacheMetrics) alsp.SpamRecordCache {
victimSpamRecordCache = internal.NewSpamRecordCache(size, logger, metrics, model.SpamRecordFactory())
return victimSpamRecordCache
}),
}
kc1116 marked this conversation as resolved.
Show resolved Hide resolved

slashingMisbehaviors := []network.Misbehavior{
alsp.InvalidMessage, alsp.SenderEjected, alsp.UnauthorizedUnicastOnChannel,
alsp.UnauthorizedPublishOnChannel, alsp.UnknownMsgType,
}

// create 1 victim node, 1 honest node and a node for each slashing violation
ids, nodes, _ := testutils.LibP2PNodeForMiddlewareFixture(t, len(slashingMisbehaviors)+2,
p2ptest.WithPeerManagerEnabled(p2ptest.PeerManagerConfigFixture(), nil))
kc1116 marked this conversation as resolved.
Show resolved Hide resolved
mws, _ := testutils.MiddlewareFixtures(t, ids, nodes, testutils.MiddlewareConfigFixture(t), mocknetwork.NewViolationsConsumer(t))
networkCfg := testutils.NetworkConfigFixture(t, *ids[0], ids, mws[0], p2p.WithAlspConfig(cfg))
kc1116 marked this conversation as resolved.
Show resolved Hide resolved
victimNetwork, err := p2p.NewNetwork(networkCfg)
require.NoError(t, err)

// create slashing violations consumer with victim node network providing the network.MisbehaviorReportConsumer interface
violationsConsumer := slashing.NewSlashingViolationsConsumer(unittest.Logger(), metrics.NewNoopCollector(), victimNetwork)
mws[0].SetSlashingViolationsConsumer(violationsConsumer)

ctx, cancel := context.WithCancel(context.Background())
signalerCtx := irrecoverable.NewMockSignalerContext(t, ctx)
testutils.StartNodesAndNetworks(signalerCtx, t, nodes, []network.Network{victimNetwork}, 100*time.Millisecond)
defer testutils.StopComponents[p2p.LibP2PNode](t, nodes, 100*time.Millisecond)
defer cancel()

p2ptest.LetNodesDiscoverEachOther(t, ctx, nodes, ids)
// initially victim and misbehaving nodes should be able to connect to each other.
p2ptest.TryConnectionAndEnsureConnected(t, ctx, nodes)

// each slashing violation func is mapped to a violation with the identity of one of the misbehaving nodes
// indices of the victim, honest, and misbehaving nodes in the nodes slice.
victimIndex := 0
honestNodeIndex := 1
invalidMessageIndex := 2
senderEjectedIndex := 3
unauthorizedUnicastOnChannelIndex := 4
unauthorizedPublishOnChannelIndex := 5
unknownMsgTypeIndex := 6
slashingViolationTestCases := []struct {
violationsConsumerFunc func(violation *network.Violation)
violation *network.Violation
}{
{violationsConsumer.OnUnAuthorizedSenderError, &network.Violation{Identity: ids[invalidMessageIndex]}},
{violationsConsumer.OnSenderEjectedError, &network.Violation{Identity: ids[senderEjectedIndex]}},
{violationsConsumer.OnUnauthorizedUnicastOnChannel, &network.Violation{Identity: ids[unauthorizedUnicastOnChannelIndex]}},
{violationsConsumer.OnUnauthorizedPublishOnChannel, &network.Violation{Identity: ids[unauthorizedPublishOnChannelIndex]}},
{violationsConsumer.OnUnknownMsgTypeError, &network.Violation{Identity: ids[unknownMsgTypeIndex]}},
}

violationsWg := sync.WaitGroup{}
violationCount := 120
for _, testCase := range slashingViolationTestCases {
for i := 0; i < violationCount; i++ {
testCase := testCase
violationsWg.Add(1)
go func() {
defer violationsWg.Done()
testCase.violationsConsumerFunc(testCase.violation)
}()
}
}
unittest.RequireReturnsBefore(t, violationsWg.Wait, 100*time.Millisecond, "slashing violations not reported in time")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do the calls to report each violation need to be done async? This could introduce flakiness due to timeouts, while reporting the callbacks serially most likely wouldn't.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done async so that we can ensure all async components under the hood are working as expected (queue, locks, workers etc). This will not cause flakiness as there are no network operations happening, it's just building up a queue of violations that will eventually be processed.


forEachMisbehavingNode := func(f func(i int)) {
for misbehavingNodeIndex := 2; misbehavingNodeIndex <= len(nodes)-1; misbehavingNodeIndex++ {
f(misbehavingNodeIndex)
}
}

// ensures all misbehaving nodes are disconnected from the victim node
forEachMisbehavingNode(func(misbehavingNodeIndex int) {
p2ptest.RequireEventuallyNotConnected(t, []p2p.LibP2PNode{nodes[victimIndex]}, []p2p.LibP2PNode{nodes[misbehavingNodeIndex]}, 100*time.Millisecond, 2*time.Second)
})

// despite being disconnected from the victim node, misbehaving nodes and the honest node are still connected.
forEachMisbehavingNode(func(misbehavingNodeIndex int) {
p2ptest.RequireConnectedEventually(t, []p2p.LibP2PNode{nodes[honestNodeIndex], nodes[misbehavingNodeIndex]}, 1*time.Millisecond, 100*time.Millisecond)
})

// despite disconnecting misbehaving nodes, ensure that (victim and honest) are still connected.
p2ptest.RequireConnectedEventually(t, []p2p.LibP2PNode{nodes[honestNodeIndex], nodes[victimIndex]}, 1*time.Millisecond, 100*time.Millisecond)

// while misbehaving nodes are disconnected, they cannot connect to the victim node. Also, the victim node cannot directly dial and connect to the misbehaving nodes until each node's peer score decays.
forEachMisbehavingNode(func(misbehavingNodeIndex int) {
p2ptest.EnsureNotConnectedBetweenGroups(t, ctx, []p2p.LibP2PNode{nodes[victimIndex]}, []p2p.LibP2PNode{nodes[misbehavingNodeIndex]})
})
}

// TestMisbehaviorReportMetrics tests the recording of misbehavior report metrics.
// It checks that when a misbehavior report is received by the ALSP manager, the metrics are recorded.
// It fails the test if the metrics are not recorded or if they are recorded incorrectly.
Expand Down
1 change: 0 additions & 1 deletion network/slashing/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (c *Consumer) reportMisbehavior(misbehavior network.Misbehavior, violation
Err(err).
Str("peerID", violation.PeerID).
Msg("failed to create misbehavior report")

}
c.misbehaviorReportConsumer.ReportMisbehaviorOnChannel(violation.Channel, report)
}
Expand Down