Skip to content

Commit

Permalink
Refactored Lightclient prefix system (erigontech#6092)
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 authored and michaelneuder committed Nov 20, 2022
1 parent d861b1b commit ebce8ab
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 608 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,109 +23,8 @@ import (
ssz "github.com/ferranbt/fastssz"
"github.com/golang/snappy"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/communication"
"github.com/libp2p/go-libp2p-core/network"
)

type StreamCodec struct {
s network.Stream
sr *snappy.Reader
}

func NewStreamCodec(
s network.Stream,
) communication.StreamCodec {
return &StreamCodec{
s: s,
sr: snappy.NewReader(s),
}
}

func (d *StreamCodec) Close() error {
if err := d.s.Close(); err != nil {
return err
}
return nil
}

func (d *StreamCodec) CloseWriter() error {
if err := d.s.CloseWrite(); err != nil {
return err
}
return nil
}

func (d *StreamCodec) CloseReader() error {
if err := d.s.CloseRead(); err != nil {
return err
}
return nil
}

// write packet to stream. will add correct header + compression
// will error if packet does not implement ssz.Marshaler interface
func (d *StreamCodec) WritePacket(pkt communication.Packet, prefix ...byte) (err error) {
val, ok := pkt.(ssz.Marshaler)
if !ok {
return nil
}
if len(prefix) > 0 {
return EncodeAndWrite(d.s, val, prefix[0])
}
return EncodeAndWrite(d.s, val)
}

// write raw bytes to stream
func (d *StreamCodec) Write(payload []byte) (n int, err error) {
return d.s.Write(payload)
}

// read raw bytes to stream
func (d *StreamCodec) Read(b []byte) (n int, err error) {
return d.s.Read(b)
}

// read raw bytes to stream
func (d *StreamCodec) ReadByte() (b byte, err error) {
o := [1]byte{}
_, err = io.ReadFull(d.s, o[:])
if err != nil {
return
}
return o[0], nil
}

// decode into packet p, then return the packet context
func (d *StreamCodec) Decode(p communication.Packet) (ctx *communication.StreamContext, err error) {
ctx, err = d.readPacket(p)
return
}

func (d *StreamCodec) readPacket(p communication.Packet) (ctx *communication.StreamContext, err error) {
c := &communication.StreamContext{
Packet: p,
Stream: d.s,
Codec: d,
Protocol: d.s.Protocol(),
}
if val, ok := p.(ssz.Unmarshaler); ok {
ln, _, err := readUvarint(d.s)
if err != nil {
return c, err
}
c.Raw = make([]byte, ln)
_, err = io.ReadFull(d.sr, c.Raw)
if err != nil {
return c, fmt.Errorf("readPacket: %w", err)
}
err = val.UnmarshalSSZ(c.Raw)
if err != nil {
return c, fmt.Errorf("readPacket: %w", err)
}
}
return c, nil
}

func EncodeAndWrite(w io.Writer, val ssz.Marshaler, prefix ...byte) error {
// create prefix for length of packet
lengthBuf := make([]byte, 10)
Expand Down Expand Up @@ -225,7 +124,7 @@ func DecodeListSSZBeaconBlock(data []byte, count uint64, list []cltypes.ObjectSS
for bytesRead < int(encodedLn) {
n, err := sr.Read(raw[bytesRead:])
if err != nil {
return fmt.Errorf("Read error: %w", err)
return fmt.Errorf("read error: %w", err)
}
bytesRead += n
}
Expand Down
311 changes: 0 additions & 311 deletions cmd/sentinel/sentinel/communication/ssz_snappy/stream_test.go

This file was deleted.

47 changes: 0 additions & 47 deletions cmd/sentinel/sentinel/handlers/curry.go

This file was deleted.

11 changes: 5 additions & 6 deletions cmd/sentinel/sentinel/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/communication/ssz_snappy"
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/peers"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -61,11 +60,11 @@ func NewConsensusHandlers(ctx context.Context, db kv.RoDB, host host.Host,
ctx: ctx,
}
c.handlers = map[protocol.ID]network.StreamHandler{
protocol.ID(PingProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, c.pingHandler),
protocol.ID(GoodbyeProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, c.goodbyeHandler),
protocol.ID(StatusProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, c.statusHandler),
protocol.ID(MetadataProtocolV1): curryStreamHandler(ssz_snappy.NewStreamCodec, c.metadataV1Handler),
protocol.ID(MetadataProtocolV2): curryStreamHandler(ssz_snappy.NewStreamCodec, c.metadataV2Handler),
protocol.ID(PingProtocolV1): c.pingHandler,
protocol.ID(GoodbyeProtocolV1): c.goodbyeHandler,
protocol.ID(StatusProtocolV1): c.statusHandler,
protocol.ID(MetadataProtocolV1): c.metadataV1Handler,
protocol.ID(MetadataProtocolV2): c.metadataV2Handler,
protocol.ID(BeaconBlocksByRangeProtocolV1): c.blocksByRangeHandler,
protocol.ID(BeaconBlocksByRootProtocolV1): c.beaconBlocksByRootHandler,
protocol.ID(LightClientFinalityUpdateV1): c.lightClientFinalityUpdateHandler,
Expand Down
33 changes: 17 additions & 16 deletions cmd/sentinel/sentinel/handlers/heartbeats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,41 +15,42 @@ package handlers

import (
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/communication"
"github.com/ledgerwatch/erigon/cmd/sentinel/sentinel/communication/ssz_snappy"
"github.com/libp2p/go-libp2p/core/network"
)

// Type safe handlers which all have access to the original stream & decompressed data.
// Since packets are just structs, they can be resent with no issue

func (c *ConsensusHandlers) pingHandler(ctx *communication.StreamContext, _ *communication.EmptyPacket) error {
return ctx.Codec.WritePacket(&cltypes.Ping{
func (c *ConsensusHandlers) pingHandler(s network.Stream) {
ssz_snappy.EncodeAndWrite(s, &cltypes.Ping{
Id: c.metadata.SeqNumber,
}, SuccessfulResponsePrefix)
}

func (c *ConsensusHandlers) goodbyeHandler(ctx *communication.StreamContext, _ *communication.EmptyPacket) error {
// From the spec, these are the valid goodbye numbers. Start with just
// sending 1, but we should think about when the others need to be sent.
// 1: Client shut down.
// 2: Irrelevant network.
// 3: Fault/error.
return ctx.Codec.WritePacket(&cltypes.Ping{
func (c *ConsensusHandlers) goodbyeHandler(s network.Stream) {
ssz_snappy.EncodeAndWrite(s, &cltypes.Ping{
Id: 1,
}, SuccessfulResponsePrefix)
}

func (c *ConsensusHandlers) metadataV1Handler(ctx *communication.StreamContext, _ *communication.EmptyPacket) error {
return ctx.Codec.WritePacket(&cltypes.MetadataV1{
func (c *ConsensusHandlers) metadataV1Handler(s network.Stream) {
ssz_snappy.EncodeAndWrite(s, &cltypes.MetadataV1{
SeqNumber: c.metadata.SeqNumber,
Attnets: c.metadata.Attnets,
}, SuccessfulResponsePrefix)
}

func (c *ConsensusHandlers) metadataV2Handler(ctx *communication.StreamContext, _ *communication.EmptyPacket) error {
return ctx.Codec.WritePacket(c.metadata, SuccessfulResponsePrefix)
func (c *ConsensusHandlers) metadataV2Handler(s network.Stream) {
ssz_snappy.EncodeAndWrite(s, c.metadata, SuccessfulResponsePrefix)
}

// TODO: Actually respond with proper status
func (c *ConsensusHandlers) statusHandler(ctx *communication.StreamContext, dat *cltypes.Status) error {
return ctx.Codec.WritePacket(dat, SuccessfulResponsePrefix)
func (c *ConsensusHandlers) statusHandler(s network.Stream) {
status := &cltypes.Status{}
if err := ssz_snappy.DecodeAndReadNoForkDigest(s, status); err != nil {
s.Close()
return
}
ssz_snappy.EncodeAndWrite(s, status, SuccessfulResponsePrefix)
}
124 changes: 0 additions & 124 deletions cmd/sentinel/sentinel/handlers/heartbeats_test.go

This file was deleted.

4 changes: 2 additions & 2 deletions cmd/sentinel/sentinel/handlers/lightclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (c *ConsensusHandlers) lightClientFinalityUpdateHandler(stream network.Stre
return
}

ssz_snappy.EncodeAndWrite(stream, update)
ssz_snappy.EncodeAndWrite(stream, update, SuccessfulResponsePrefix)
}

func (c *ConsensusHandlers) lightClientOptimisticUpdateHandler(stream network.Stream) {
Expand Down Expand Up @@ -79,7 +79,7 @@ func (c *ConsensusHandlers) lightClientOptimisticUpdateHandler(stream network.St
return
}

ssz_snappy.EncodeAndWrite(stream, update)
ssz_snappy.EncodeAndWrite(stream, update, SuccessfulResponsePrefix)
}

func (c *ConsensusHandlers) lightClientUpdatesByRange(stream network.Stream) {
Expand Down

0 comments on commit ebce8ab

Please sign in to comment.