diff --git a/api/grpc/relay/relay.pb.go b/api/grpc/relay/relay.pb.go index d01287a918..d35cf5ff88 100644 --- a/api/grpc/relay/relay.pb.go +++ b/api/grpc/relay/relay.pb.go @@ -126,12 +126,31 @@ type GetChunksRequest struct { // The chunk requests. Chunks are returned in the same order as they are requested. ChunkRequests []*ChunkRequest `protobuf:"bytes,1,rep,name=chunk_requests,json=chunkRequests,proto3" json:"chunk_requests,omitempty"` - // If this is an authenticated request, this should hold the ID of the requester. If this - // is an unauthenticated request, this field should be empty. - RequesterId uint64 `protobuf:"varint,2,opt,name=requester_id,json=requesterId,proto3" json:"requester_id,omitempty"` - // If this is an authenticated request, this field will hold a signature by the requester - // on the chunks being requested. - RequesterSignature []byte `protobuf:"bytes,3,opt,name=requester_signature,json=requesterSignature,proto3" json:"requester_signature,omitempty"` + // If this is an authenticated request, this should hold the ID of the operator. If this + // is an unauthenticated request, this field should be empty. Relays may choose to reject + // unauthenticated requests. + OperatorId []byte `protobuf:"bytes,2,opt,name=operator_id,json=operatorId,proto3" json:"operator_id,omitempty"` + // If this is an authenticated request, this field will hold a BLS signature by the requester + // on the hash of this request. Relays may choose to reject unauthenticated requests. + // + // The following describes the schema for computing the hash of this request + // This algorithm is implemented in golang using relay.auth.HashGetChunksRequest(). + // + // All integers are encoded as unsigned 4 byte big endian values. + // + // Perform a keccak256 hash on the following data in the following order: + // 1. the operator id + // 2. for each chunk request: + // a. if the chunk request is a request by index: + // i. a one byte ASCII representation of the character "i" (aka Ox69) + // ii. the blob key + // iii. the start index + // iv. the end index + // b. if the chunk request is a request by range: + // i. a one byte ASCII representation of the character "r" (aka Ox72) + // ii. the blob key + // iii. each requested chunk index, in order + OperatorSignature []byte `protobuf:"bytes,3,opt,name=operator_signature,json=operatorSignature,proto3" json:"operator_signature,omitempty"` } func (x *GetChunksRequest) Reset() { @@ -173,16 +192,16 @@ func (x *GetChunksRequest) GetChunkRequests() []*ChunkRequest { return nil } -func (x *GetChunksRequest) GetRequesterId() uint64 { +func (x *GetChunksRequest) GetOperatorId() []byte { if x != nil { - return x.RequesterId + return x.OperatorId } - return 0 + return nil } -func (x *GetChunksRequest) GetRequesterSignature() []byte { +func (x *GetChunksRequest) GetOperatorSignature() []byte { if x != nil { - return x.RequesterSignature + return x.OperatorSignature } return nil } @@ -456,52 +475,52 @@ var file_relay_relay_proto_rawDesc = []byte{ 0x6c, 0x6f, 0x62, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x62, 0x4b, 0x65, 0x79, 0x22, 0x22, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6c, 0x6f, 0x62, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6c, 0x6f, 0x62, 0x22, 0xa1, 0x01, 0x0a, 0x10, 0x47, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6c, 0x6f, 0x62, 0x22, 0x9d, 0x01, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x39, 0x0a, 0x0e, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x0d, 0x63, 0x68, 0x75, - 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x72, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, - 0x52, 0x0b, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x2f, 0x0a, - 0x13, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x73, 0x69, 0x67, 0x6e, 0x61, - 0x74, 0x75, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x12, 0x72, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x65, 0x72, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x22, 0x55, - 0x0a, 0x13, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x42, 0x79, - 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x6c, 0x6f, 0x62, 0x5f, 0x6b, 0x65, - 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x62, 0x4b, 0x65, 0x79, - 0x12, 0x23, 0x0a, 0x0d, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x5f, 0x69, 0x6e, 0x64, 0x69, 0x63, 0x65, - 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x0c, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x49, 0x6e, - 0x64, 0x69, 0x63, 0x65, 0x73, 0x22, 0x6e, 0x0a, 0x13, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x42, 0x79, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x19, 0x0a, 0x08, - 0x62, 0x6c, 0x6f, 0x62, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, - 0x62, 0x6c, 0x6f, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x72, 0x74, - 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x74, - 0x61, 0x72, 0x74, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x6e, 0x64, 0x5f, - 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x65, 0x6e, 0x64, - 0x49, 0x6e, 0x64, 0x65, 0x78, 0x22, 0x89, 0x01, 0x0a, 0x0c, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x08, 0x62, 0x79, 0x5f, 0x69, 0x6e, 0x64, - 0x65, 0x78, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, - 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x42, 0x79, 0x49, 0x6e, - 0x64, 0x65, 0x78, 0x48, 0x00, 0x52, 0x07, 0x62, 0x79, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x36, - 0x0a, 0x08, 0x62, 0x79, 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x19, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x42, 0x79, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x07, 0x62, - 0x79, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x22, 0x24, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x52, 0x65, - 0x70, 0x6c, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x03, 0x28, - 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x7b, 0x0a, 0x05, 0x52, 0x65, 0x6c, 0x61, 0x79, - 0x12, 0x35, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x14, 0x2e, 0x6e, 0x6f, - 0x64, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x12, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, - 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x43, 0x68, - 0x75, 0x6e, 0x6b, 0x73, 0x12, 0x16, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x43, - 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x6e, - 0x6f, 0x64, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x52, 0x65, 0x70, - 0x6c, 0x79, 0x22, 0x00, 0x42, 0x2d, 0x5a, 0x2b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x4c, 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, - 0x65, 0x6e, 0x64, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x65, - 0x6c, 0x61, 0x79, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x6f, 0x70, + 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x0a, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x49, 0x64, 0x12, 0x2d, 0x0a, 0x12, 0x6f, + 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x5f, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x11, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, + 0x72, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x22, 0x55, 0x0a, 0x13, 0x43, 0x68, + 0x75, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x42, 0x79, 0x49, 0x6e, 0x64, 0x65, + 0x78, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x6c, 0x6f, 0x62, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x23, 0x0a, 0x0d, + 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x5f, 0x69, 0x6e, 0x64, 0x69, 0x63, 0x65, 0x73, 0x18, 0x02, 0x20, + 0x03, 0x28, 0x0d, 0x52, 0x0c, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x49, 0x6e, 0x64, 0x69, 0x63, 0x65, + 0x73, 0x22, 0x6e, 0x0a, 0x13, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x42, 0x79, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x62, 0x6c, 0x6f, 0x62, + 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x62, 0x6c, 0x6f, 0x62, + 0x4b, 0x65, 0x79, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x69, 0x6e, 0x64, + 0x65, 0x78, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x49, + 0x6e, 0x64, 0x65, 0x78, 0x12, 0x1b, 0x0a, 0x09, 0x65, 0x6e, 0x64, 0x5f, 0x69, 0x6e, 0x64, 0x65, + 0x78, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x49, 0x6e, 0x64, 0x65, + 0x78, 0x22, 0x89, 0x01, 0x0a, 0x0c, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x36, 0x0a, 0x08, 0x62, 0x79, 0x5f, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x43, 0x68, 0x75, 0x6e, + 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x42, 0x79, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x48, + 0x00, 0x52, 0x07, 0x62, 0x79, 0x49, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x36, 0x0a, 0x08, 0x62, 0x79, + 0x5f, 0x72, 0x61, 0x6e, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6e, + 0x6f, 0x64, 0x65, 0x2e, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x42, 0x79, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x48, 0x00, 0x52, 0x07, 0x62, 0x79, 0x52, 0x61, 0x6e, + 0x67, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x24, 0x0a, + 0x0e, 0x47, 0x65, 0x74, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, + 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, 0x52, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x32, 0x7b, 0x0a, 0x05, 0x52, 0x65, 0x6c, 0x61, 0x79, 0x12, 0x35, 0x0a, 0x07, + 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x14, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x47, + 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, + 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x22, 0x00, 0x12, 0x3b, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, + 0x12, 0x16, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, 0x47, 0x65, 0x74, 0x43, 0x68, 0x75, 0x6e, 0x6b, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x6e, 0x6f, 0x64, 0x65, 0x2e, + 0x47, 0x65, 0x74, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, + 0x42, 0x2d, 0x5a, 0x2b, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4c, + 0x61, 0x79, 0x72, 0x2d, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x65, 0x69, 0x67, 0x65, 0x6e, 0x64, 0x61, + 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/api/proto/relay/relay.proto b/api/proto/relay/relay.proto index ed52c0a0a5..4ec157616c 100644 --- a/api/proto/relay/relay.proto +++ b/api/proto/relay/relay.proto @@ -32,13 +32,32 @@ message GetChunksRequest { // The chunk requests. Chunks are returned in the same order as they are requested. repeated ChunkRequest chunk_requests = 1; - // If this is an authenticated request, this should hold the ID of the requester. If this - // is an unauthenticated request, this field should be empty. - uint64 requester_id = 2; + // If this is an authenticated request, this should hold the ID of the operator. If this + // is an unauthenticated request, this field should be empty. Relays may choose to reject + // unauthenticated requests. + bytes operator_id = 2; - // If this is an authenticated request, this field will hold a signature by the requester - // on the chunks being requested. - bytes requester_signature = 3; + // If this is an authenticated request, this field will hold a BLS signature by the requester + // on the hash of this request. Relays may choose to reject unauthenticated requests. + // + // The following describes the schema for computing the hash of this request + // This algorithm is implemented in golang using relay.auth.HashGetChunksRequest(). + // + // All integers are encoded as unsigned 4 byte big endian values. + // + // Perform a keccak256 hash on the following data in the following order: + // 1. the operator id + // 2. for each chunk request: + // a. if the chunk request is a request by index: + // i. a one byte ASCII representation of the character "i" (aka Ox69) + // ii. the blob key + // iii. the start index + // iv. the end index + // b. if the chunk request is a request by range: + // i. a one byte ASCII representation of the character "r" (aka Ox72) + // ii. the blob key + // iii. each requested chunk index, in order + bytes operator_signature = 3; } // A request for chunks within a specific blob. Each chunk is requested individually by its index. diff --git a/relay/auth/authenticator.go b/relay/auth/authenticator.go new file mode 100644 index 0000000000..a85c1f4865 --- /dev/null +++ b/relay/auth/authenticator.go @@ -0,0 +1,205 @@ +package auth + +import ( + "context" + "errors" + "fmt" + pb "github.com/Layr-Labs/eigenda/api/grpc/relay" + "github.com/Layr-Labs/eigenda/core" + lru "github.com/hashicorp/golang-lru/v2" + "sync" + "time" +) + +// RequestAuthenticator authenticates requests to the relay service. This object is thread safe. +type RequestAuthenticator interface { + // AuthenticateGetChunksRequest authenticates a GetChunksRequest, returning an error if the request is invalid. + // The origin is the address of the peer that sent the request. This may be used to cache auth results + // in order to save server resources. + AuthenticateGetChunksRequest( + origin string, + request *pb.GetChunksRequest, + now time.Time) error +} + +// authenticationTimeout is used to track the expiration of an auth. +type authenticationTimeout struct { + origin string + expiration time.Time +} + +var _ RequestAuthenticator = &requestAuthenticator{} + +type requestAuthenticator struct { + ics core.IndexedChainState + + // authenticatedClients is a set of client IDs that have been recently authenticated. + authenticatedClients map[string]struct{} + + // authenticationTimeouts is a list of authentications that have been performed, along with their expiration times. + authenticationTimeouts []*authenticationTimeout + + // authenticationTimeoutDuration is the duration for which an auth is valid. + // If this is zero, then auth saving is disabled, and each request will be authenticated independently. + authenticationTimeoutDuration time.Duration + + // savedAuthLock is used for thread safe atomic modification of the authenticatedClients map and the + // authenticationTimeouts queue. + savedAuthLock sync.Mutex + + // keyCache is used to cache the public keys of operators. Operator keys are assumed to never change. + keyCache *lru.Cache[core.OperatorID, *core.G2Point] +} + +// NewRequestAuthenticator creates a new RequestAuthenticator. +func NewRequestAuthenticator( + ics core.IndexedChainState, + keyCacheSize int, + authenticationTimeoutDuration time.Duration) (RequestAuthenticator, error) { + + keyCache, err := lru.New[core.OperatorID, *core.G2Point](keyCacheSize) + if err != nil { + return nil, fmt.Errorf("failed to create key cache: %w", err) + } + + authenticator := &requestAuthenticator{ + ics: ics, + authenticatedClients: make(map[string]struct{}), + authenticationTimeouts: make([]*authenticationTimeout, 0), + authenticationTimeoutDuration: authenticationTimeoutDuration, + keyCache: keyCache, + } + + err = authenticator.preloadCache() + if err != nil { + return nil, fmt.Errorf("failed to preload cache: %w", err) + } + + return authenticator, nil +} + +func (a *requestAuthenticator) preloadCache() error { + blockNumber, err := a.ics.GetCurrentBlockNumber() + if err != nil { + return fmt.Errorf("failed to get current block number: %w", err) + } + operators, err := a.ics.GetIndexedOperators(context.Background(), blockNumber) + if err != nil { + return fmt.Errorf("failed to get operators: %w", err) + } + + for operatorID, operator := range operators { + a.keyCache.Add(operatorID, operator.PubkeyG2) + } + + return nil +} + +func (a *requestAuthenticator) AuthenticateGetChunksRequest( + origin string, + request *pb.GetChunksRequest, + now time.Time) error { + + if a.isAuthenticationStillValid(now, origin) { + // We've recently authenticated this client. Do not authenticate again for a while. + return nil + } + + key, err := a.getOperatorKey(core.OperatorID(request.OperatorId)) + if err != nil { + return fmt.Errorf("failed to get operator key: %w", err) + } + + g1Point, err := (&core.G1Point{}).Deserialize(request.OperatorSignature) + if err != nil { + return fmt.Errorf("failed to deserialize signature: %w", err) + } + + signature := core.Signature{ + G1Point: g1Point, + } + + hash := HashGetChunksRequest(request) + isValid := signature.Verify(key, ([32]byte)(hash)) + + if !isValid { + return errors.New("signature verification failed") + } + + a.saveAuthenticationResult(now, origin) + return nil +} + +// getOperatorKey returns the public key of the operator with the given ID, caching the result. +func (a *requestAuthenticator) getOperatorKey(operatorID core.OperatorID) (*core.G2Point, error) { + key, ok := a.keyCache.Get(operatorID) + if ok { + return key, nil + } + + blockNumber, err := a.ics.GetCurrentBlockNumber() + if err != nil { + return nil, fmt.Errorf("failed to get current block number: %w", err) + } + operators, err := a.ics.GetIndexedOperators(context.Background(), blockNumber) + if err != nil { + return nil, fmt.Errorf("failed to get operators: %w", err) + } + + operator, ok := operators[operatorID] + if !ok { + return nil, errors.New("operator not found") + } + key = operator.PubkeyG2 + + a.keyCache.Add(operatorID, key) + return key, nil +} + +// saveAuthenticationResult saves the result of an auth. +func (a *requestAuthenticator) saveAuthenticationResult(now time.Time, origin string) { + if a.authenticationTimeoutDuration == 0 { + // Authentication saving is disabled. + return + } + + a.savedAuthLock.Lock() + defer a.savedAuthLock.Unlock() + + a.authenticatedClients[origin] = struct{}{} + a.authenticationTimeouts = append(a.authenticationTimeouts, + &authenticationTimeout{ + origin: origin, + expiration: now.Add(a.authenticationTimeoutDuration), + }) +} + +// isAuthenticationStillValid returns true if the client at the given address has been authenticated recently. +func (a *requestAuthenticator) isAuthenticationStillValid(now time.Time, address string) bool { + if a.authenticationTimeoutDuration == 0 { + // Authentication saving is disabled. + return false + } + + a.savedAuthLock.Lock() + defer a.savedAuthLock.Unlock() + + a.removeOldAuthentications(now) + _, ok := a.authenticatedClients[address] + return ok +} + +// removeOldAuthentications removes any authentications that have expired. +// This method is not thread safe and should be called with the savedAuthLock held. +func (a *requestAuthenticator) removeOldAuthentications(now time.Time) { + index := 0 + for ; index < len(a.authenticationTimeouts); index++ { + if a.authenticationTimeouts[index].expiration.After(now) { + break + } + delete(a.authenticatedClients, a.authenticationTimeouts[index].origin) + } + if index > 0 { + a.authenticationTimeouts = a.authenticationTimeouts[index:] + } +} diff --git a/relay/auth/authenticator_test.go b/relay/auth/authenticator_test.go new file mode 100644 index 0000000000..debcbccc61 --- /dev/null +++ b/relay/auth/authenticator_test.go @@ -0,0 +1,224 @@ +package auth + +import ( + "context" + tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/mock" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +// TestMockSigning is a meta-test to verify that +// the test framework's BLS keys are functioning correctly. +func TestMockSigning(t *testing.T) { + tu.InitializeRandom() + + operatorID := mock.MakeOperatorId(0) + stakes := map[core.QuorumID]map[core.OperatorID]int{ + core.QuorumID(0): { + operatorID: 1, + }, + } + ics, err := mock.NewChainDataMock(stakes) + require.NoError(t, err) + + operators, err := ics.GetIndexedOperators(context.Background(), 0) + require.NoError(t, err) + + operator, ok := operators[operatorID] + require.True(t, ok) + + bytesToSign := tu.RandomBytes(32) + signature := ics.KeyPairs[operatorID].SignMessage([32]byte(bytesToSign)) + + isValid := signature.Verify(operator.PubkeyG2, [32]byte(bytesToSign)) + require.True(t, isValid) + + // Changing a byte in the message should invalidate the signature + bytesToSign[0] = bytesToSign[0] ^ 1 + + isValid = signature.Verify(operator.PubkeyG2, [32]byte(bytesToSign)) + require.False(t, isValid) +} + +func TestValidRequest(t *testing.T) { + tu.InitializeRandom() + + operatorID := mock.MakeOperatorId(0) + stakes := map[core.QuorumID]map[core.OperatorID]int{ + core.QuorumID(0): { + operatorID: 1, + }, + } + ics, err := mock.NewChainDataMock(stakes) + require.NoError(t, err) + ics.Mock.On("GetCurrentBlockNumber").Return(uint(0), nil) + + timeout := 10 * time.Second + + authenticator, err := NewRequestAuthenticator(ics, 1024, timeout) + require.NoError(t, err) + + request := randomGetChunksRequest() + request.OperatorId = operatorID[:] + signature := SignGetChunksRequest(ics.KeyPairs[operatorID], request) + request.OperatorSignature = signature + + now := time.Now() + + err = authenticator.AuthenticateGetChunksRequest( + "foobar", + request, + now) + require.NoError(t, err) + + // Making additional requests before timeout elapses should not trigger authentication for the address "foobar". + // To probe at this, intentionally make a request that would be considered invalid if it were authenticated. + invalidRequest := randomGetChunksRequest() + invalidRequest.OperatorId = operatorID[:] + invalidRequest.OperatorSignature = signature // the previous signature is invalid here + + start := now + for now.Before(start.Add(timeout)) { + err = authenticator.AuthenticateGetChunksRequest( + "foobar", + invalidRequest, + now) + require.NoError(t, err) + + err = authenticator.AuthenticateGetChunksRequest( + "baz", + invalidRequest, + now) + require.Error(t, err) + + now = now.Add(time.Second) + } + + // After the timeout elapses, new requests should trigger authentication. + err = authenticator.AuthenticateGetChunksRequest( + "foobar", + invalidRequest, + now) + require.Error(t, err) +} + +func TestAuthenticationSavingDisabled(t *testing.T) { + tu.InitializeRandom() + + operatorID := mock.MakeOperatorId(0) + stakes := map[core.QuorumID]map[core.OperatorID]int{ + core.QuorumID(0): { + operatorID: 1, + }, + } + ics, err := mock.NewChainDataMock(stakes) + require.NoError(t, err) + ics.Mock.On("GetCurrentBlockNumber").Return(uint(0), nil) + + // This disables saving of authentication results. + timeout := time.Duration(0) + + authenticator, err := NewRequestAuthenticator(ics, 1024, timeout) + require.NoError(t, err) + + request := randomGetChunksRequest() + request.OperatorId = operatorID[:] + signature := SignGetChunksRequest(ics.KeyPairs[operatorID], request) + request.OperatorSignature = signature + + now := time.Now() + + err = authenticator.AuthenticateGetChunksRequest( + "foobar", + request, + now) + require.NoError(t, err) + + // There is no authentication timeout, so a new request should trigger authentication. + // To probe at this, intentionally make a request that would be considered invalid if it were authenticated. + invalidRequest := randomGetChunksRequest() + invalidRequest.OperatorId = operatorID[:] + invalidRequest.OperatorSignature = signature // the previous signature is invalid here + + err = authenticator.AuthenticateGetChunksRequest( + "foobar", + invalidRequest, + now) + require.Error(t, err) +} + +func TestNonExistingClient(t *testing.T) { + tu.InitializeRandom() + + operatorID := mock.MakeOperatorId(0) + stakes := map[core.QuorumID]map[core.OperatorID]int{ + core.QuorumID(0): { + operatorID: 1, + }, + } + ics, err := mock.NewChainDataMock(stakes) + require.NoError(t, err) + ics.Mock.On("GetCurrentBlockNumber").Return(uint(0), nil) + + timeout := 10 * time.Second + + authenticator, err := NewRequestAuthenticator(ics, 1024, timeout) + require.NoError(t, err) + + invalidOperatorID := tu.RandomBytes(32) + + request := randomGetChunksRequest() + request.OperatorId = invalidOperatorID + + err = authenticator.AuthenticateGetChunksRequest( + "foobar", + request, + time.Now()) + require.Error(t, err) +} + +func TestBadSignature(t *testing.T) { + tu.InitializeRandom() + + operatorID := mock.MakeOperatorId(0) + stakes := map[core.QuorumID]map[core.OperatorID]int{ + core.QuorumID(0): { + operatorID: 1, + }, + } + ics, err := mock.NewChainDataMock(stakes) + require.NoError(t, err) + ics.Mock.On("GetCurrentBlockNumber").Return(uint(0), nil) + + timeout := 10 * time.Second + + authenticator, err := NewRequestAuthenticator(ics, 1024, timeout) + require.NoError(t, err) + + request := randomGetChunksRequest() + request.OperatorId = operatorID[:] + request.OperatorSignature = SignGetChunksRequest(ics.KeyPairs[operatorID], request) + + now := time.Now() + + err = authenticator.AuthenticateGetChunksRequest( + "foobar", + request, + now) + require.NoError(t, err) + + // move time forward to wipe out previous authentication + now = now.Add(timeout) + + // Change a byte in the signature to make it invalid + request.OperatorSignature[0] = request.OperatorSignature[0] ^ 1 + + err = authenticator.AuthenticateGetChunksRequest( + "foobar", + request, + now) + require.Error(t, err) +} diff --git a/relay/auth/request_signing.go b/relay/auth/request_signing.go new file mode 100644 index 0000000000..149992bce2 --- /dev/null +++ b/relay/auth/request_signing.go @@ -0,0 +1,58 @@ +package auth + +import ( + "encoding/binary" + pb "github.com/Layr-Labs/eigenda/api/grpc/relay" + "github.com/Layr-Labs/eigenda/core" + "golang.org/x/crypto/sha3" +) + +var ( + iByte = []byte{0x69} + rByte = []byte{0x72} +) + +// HashGetChunksRequest hashes the given GetChunksRequest. +func HashGetChunksRequest(request *pb.GetChunksRequest) []byte { + + // Protobuf serialization is non-deterministic, so we can't just hash the + // serialized bytes. Instead, we have to define our own hashing function. + + hasher := sha3.NewLegacyKeccak256() + + hasher.Write(request.GetOperatorId()) + for _, chunkRequest := range request.GetChunkRequests() { + if chunkRequest.GetByIndex() != nil { + getByIndex := chunkRequest.GetByIndex() + hasher.Write(iByte) + hasher.Write(getByIndex.BlobKey) + for _, index := range getByIndex.ChunkIndices { + indexBytes := make([]byte, 4) + binary.BigEndian.PutUint32(indexBytes, index) + hasher.Write(indexBytes) + } + } else { + getByRange := chunkRequest.GetByRange() + hasher.Write(rByte) + hasher.Write(getByRange.BlobKey) + + startBytes := make([]byte, 4) + binary.BigEndian.PutUint32(startBytes, getByRange.StartIndex) + hasher.Write(startBytes) + + endBytes := make([]byte, 4) + binary.BigEndian.PutUint32(endBytes, getByRange.EndIndex) + hasher.Write(endBytes) + } + } + + return hasher.Sum(nil) +} + +// SignGetChunksRequest signs the given GetChunksRequest with the given private key. Does not +// write the signature into the request. +func SignGetChunksRequest(keys *core.KeyPair, request *pb.GetChunksRequest) []byte { + hash := HashGetChunksRequest(request) + signature := keys.SignMessage(([32]byte)(hash)) + return signature.G1Point.Serialize() +} diff --git a/relay/auth/request_signing_test.go b/relay/auth/request_signing_test.go new file mode 100644 index 0000000000..3c05188514 --- /dev/null +++ b/relay/auth/request_signing_test.go @@ -0,0 +1,71 @@ +package auth + +import ( + pb "github.com/Layr-Labs/eigenda/api/grpc/relay" + tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/stretchr/testify/require" + "golang.org/x/exp/rand" + "testing" +) + +func randomGetChunksRequest() *pb.GetChunksRequest { + requestedChunks := make([]*pb.ChunkRequest, 0) + requestCount := rand.Intn(10) + 1 + for i := 0; i < requestCount; i++ { + + if rand.Intn(2) == 0 { + indices := make([]uint32, rand.Intn(10)+1) + for j := 0; j < len(indices); j++ { + indices[j] = rand.Uint32() + } + requestedChunks = append(requestedChunks, &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByIndex{ + ByIndex: &pb.ChunkRequestByIndex{ + BlobKey: tu.RandomBytes(32), + ChunkIndices: indices, + }, + }, + }) + } else { + requestedChunks = append(requestedChunks, &pb.ChunkRequest{ + Request: &pb.ChunkRequest_ByRange{ + ByRange: &pb.ChunkRequestByRange{ + BlobKey: tu.RandomBytes(32), + StartIndex: rand.Uint32(), + EndIndex: rand.Uint32(), + }, + }, + }) + } + } + return &pb.GetChunksRequest{ + OperatorId: tu.RandomBytes(32), + ChunkRequests: requestedChunks, + } +} + +func TestHashGetChunksRequest(t *testing.T) { + tu.InitializeRandom() + + requestA := randomGetChunksRequest() + requestB := randomGetChunksRequest() + + // Hashing the same request twice should yield the same hash + hashA := HashGetChunksRequest(requestA) + hashAA := HashGetChunksRequest(requestA) + require.Equal(t, hashA, hashAA) + + // Hashing different requests should yield different hashes + hashB := HashGetChunksRequest(requestB) + require.NotEqual(t, hashA, hashB) + + // Adding a signature should not affect the hash + requestA.OperatorSignature = tu.RandomBytes(32) + hashAA = HashGetChunksRequest(requestA) + require.Equal(t, hashA, hashAA) + + // Changing the requester ID should change the hash + requestA.OperatorId = tu.RandomBytes(32) + hashAA = HashGetChunksRequest(requestA) + require.NotEqual(t, hashA, hashAA) +} diff --git a/relay/cmd/config.go b/relay/cmd/config.go index bb7566f5a1..42078db96a 100644 --- a/relay/cmd/config.go +++ b/relay/cmd/config.go @@ -2,13 +2,14 @@ package main import ( "fmt" - "github.com/Layr-Labs/eigenda/relay/limiter" - "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/geth" + "github.com/Layr-Labs/eigenda/core/thegraph" core "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/relay" "github.com/Layr-Labs/eigenda/relay/cmd/flags" + "github.com/Layr-Labs/eigenda/relay/limiter" "github.com/urfave/cli" ) @@ -29,6 +30,12 @@ type Config struct { // RelayConfig is the configuration for the relay. RelayConfig relay.Config + + // Configuration for the graph indexer. + EthClientConfig geth.EthClientConfig + BLSOperatorStateRetrieverAddr string + EigenDAServiceManagerAddr string + ChainStateConfig thegraph.Config } func NewConfig(ctx *cli.Context) (Config, error) { @@ -73,7 +80,14 @@ func NewConfig(ctx *cli.Context) (Config, error) { GetChunkBytesBurstinessClient: ctx.Int(flags.GetChunkBytesBurstinessClientFlag.Name), MaxConcurrentGetChunkOpsClient: ctx.Int(flags.MaxConcurrentGetChunkOpsClientFlag.Name), }, + AuthenticationKeyCacheSize: ctx.Int(flags.AuthenticationKeyCacheSizeFlag.Name), + AuthenticationTimeout: ctx.Duration(flags.AuthenticationTimeoutFlag.Name), + AuthenticationDisabled: ctx.Bool(flags.AuthenticationDisabledFlag.Name), }, + EthClientConfig: geth.ReadEthClientConfig(ctx), + BLSOperatorStateRetrieverAddr: ctx.String(flags.BlsOperatorStateRetrieverAddrFlag.Name), + EigenDAServiceManagerAddr: ctx.String(flags.EigenDAServiceManagerAddrFlag.Name), + ChainStateConfig: thegraph.ReadCLIConfig(ctx), } for i, id := range relayIDs { config.RelayConfig.RelayIDs[i] = core.RelayKey(id) diff --git a/relay/cmd/flags/flags.go b/relay/cmd/flags/flags.go index 9abd673566..57fcc6cbd0 100644 --- a/relay/cmd/flags/flags.go +++ b/relay/cmd/flags/flags.go @@ -3,7 +3,9 @@ package flags import ( "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/aws" + "github.com/Layr-Labs/eigenda/common/geth" "github.com/urfave/cli" + "time" ) const ( @@ -189,6 +191,45 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_CONCURRENT_GET_CHUNK_OPS_CLIENT"), Value: 1, } + BlsOperatorStateRetrieverAddrFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "bls-operator-state-retriever-addr"), + Usage: "Address of the BLS operator state retriever", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "BLS_OPERATOR_STATE_RETRIEVER_ADDR"), + } + EigenDAServiceManagerAddrFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "eigen-da-service-manager-addr"), + Usage: "Address of the Eigen DA service manager", + Required: true, + EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGEN_DA_SERVICE_MANAGER_ADDR"), + } + IndexerPullIntervalFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "indexer-pull-interval"), + Usage: "Interval to pull from the indexer", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "INDEXER_PULL_INTERVAL"), + Value: 5 * time.Minute, + } + AuthenticationKeyCacheSizeFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "authentication-key-cache-size"), + Usage: "Max number of items in the authentication key cache", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "AUTHENTICATION_KEY_CACHE_SIZE"), + Value: 1024 * 1024, + } + AuthenticationTimeoutFlag = cli.DurationFlag{ + Name: common.PrefixFlag(FlagPrefix, "authentication-timeout"), + Usage: "Duration to keep authentication results", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "AUTHENTICATION_TIMEOUT"), + Value: 5 * time.Minute, + } + AuthenticationDisabledFlag = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "authentication-disabled"), + Usage: "Disable GetChunks() authentication", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "AUTHENTICATION_DISABLED"), + } ) var requiredFlags = []cli.Flag{ @@ -196,6 +237,8 @@ var requiredFlags = []cli.Flag{ BucketNameFlag, MetadataTableNameFlag, RelayIDsFlag, + BlsOperatorStateRetrieverAddrFlag, + EigenDAServiceManagerAddrFlag, } var optionalFlags = []cli.Flag{ @@ -221,6 +264,10 @@ var optionalFlags = []cli.Flag{ MaxGetChunkBytesPerSecondClientFlag, GetChunkBytesBurstinessClientFlag, MaxConcurrentGetChunkOpsClientFlag, + IndexerPullIntervalFlag, + AuthenticationKeyCacheSizeFlag, + AuthenticationTimeoutFlag, + AuthenticationDisabledFlag, } var Flags []cli.Flag @@ -229,4 +276,5 @@ func init() { Flags = append(requiredFlags, optionalFlags...) Flags = append(Flags, common.LoggerCLIFlags(envVarPrefix, FlagPrefix)...) Flags = append(Flags, aws.ClientFlags(envVarPrefix, FlagPrefix)...) + Flags = append(Flags, geth.EthClientFlags(envVarPrefix)...) } diff --git a/relay/cmd/main.go b/relay/cmd/main.go index 2730ccef21..eb39cdb03a 100644 --- a/relay/cmd/main.go +++ b/relay/cmd/main.go @@ -3,6 +3,12 @@ package main import ( "context" "fmt" + "github.com/Layr-Labs/eigenda/common/geth" + "github.com/Layr-Labs/eigenda/core" + coreeth "github.com/Layr-Labs/eigenda/core/eth" + "github.com/Layr-Labs/eigenda/core/thegraph" + "github.com/Layr-Labs/eigensdk-go/logging" + gethcommon "github.com/ethereum/go-ethereum/common" "log" "os" @@ -64,6 +70,10 @@ func RunRelay(ctx *cli.Context) error { metadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.MetadataTableName) blobStore := blobstore.NewBlobStore(config.BucketName, s3Client, logger) chunkReader := chunkstore.NewChunkReader(logger, s3Client, config.BucketName) + ics, err := buildICS(logger, &config) + if err != nil { + return fmt.Errorf("failed to build ics: %w", err) + } server, err := relay.NewServer( context.Background(), @@ -71,7 +81,8 @@ func RunRelay(ctx *cli.Context) error { &config.RelayConfig, metadataStore, blobStore, - chunkReader) + chunkReader, + ics) if err != nil { return fmt.Errorf("failed to create relay server: %w", err) } @@ -83,3 +94,21 @@ func RunRelay(ctx *cli.Context) error { return nil } + +func buildICS(logger logging.Logger, config *Config) (core.IndexedChainState, error) { + client, err := geth.NewMultiHomingClient(config.EthClientConfig, gethcommon.Address{}, logger) + if err != nil { + logger.Error("Cannot create chain.Client", "err", err) + return nil, err + } + + tx, err := coreeth.NewWriter(logger, client, config.BLSOperatorStateRetrieverAddr, config.EigenDAServiceManagerAddr) + if err != nil { + return nil, fmt.Errorf("failed to create eth writer: %w", err) + } + + cs := coreeth.NewChainState(tx, client) + ics := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) + + return ics, nil +} diff --git a/relay/limiter/blob_rate_limiter.go b/relay/limiter/blob_rate_limiter.go index 0ac260cba8..1131af863b 100644 --- a/relay/limiter/blob_rate_limiter.go +++ b/relay/limiter/blob_rate_limiter.go @@ -57,10 +57,12 @@ func (l *BlobRateLimiter) BeginGetBlobOperation(now time.Time) error { defer l.lock.Unlock() if l.operationsInFlight >= l.config.MaxConcurrentGetBlobOps { - return fmt.Errorf("global concurrent request limit exceeded for getBlob operations, try again later") + return fmt.Errorf("global concurrent request limit %d exceeded for getBlob operations, try again later", + l.config.MaxConcurrentGetBlobOps) } if l.opLimiter.TokensAt(now) < 1 { - return fmt.Errorf("global rate limit exceeded for getBlob operations, try again later") + return fmt.Errorf("global rate limit %0.1fhz exceeded for getBlob operations, try again later", + l.config.MaxGetBlobOpsPerSecond) } l.operationsInFlight++ @@ -96,7 +98,8 @@ func (l *BlobRateLimiter) RequestGetBlobBandwidth(now time.Time, bytes uint32) e allowed := l.bandwidthLimiter.AllowN(now, int(bytes)) if !allowed { - return fmt.Errorf("global rate limit exceeded for getBlob bandwidth, try again later") + return fmt.Errorf("global rate limit %dMib/s exceeded for getBlob bandwidth, try again later", + int(l.config.MaxGetBlobBytesPerSecond/1024/1024)) } return nil } diff --git a/relay/limiter/chunk_rate_limiter.go b/relay/limiter/chunk_rate_limiter.go index fe899e5b17..af71b317b9 100644 --- a/relay/limiter/chunk_rate_limiter.go +++ b/relay/limiter/chunk_rate_limiter.go @@ -26,10 +26,6 @@ type ChunkRateLimiter struct { // per-client limiters - // Note: in its current form, these expose a DOS vector, since an attacker can create many clients IDs - // and force these maps to become arbitrarily large. This will be remedied when authentication - // is implemented, as only authentication will happen prior to rate limiting. - // perClientOpLimiter enforces per-client rate limits on the maximum rate of GetChunk operations perClientOpLimiter map[string]*rate.Limiter @@ -94,16 +90,21 @@ func (l *ChunkRateLimiter) BeginGetChunkOperation( } if l.globalOperationsInFlight >= l.config.MaxConcurrentGetChunkOps { - return fmt.Errorf("global concurrent request limit exceeded for GetChunks operations, try again later") + return fmt.Errorf( + "global concurrent request limit %d exceeded for GetChunks operations, try again later", + l.config.MaxConcurrentGetChunkOps) } if l.globalOpLimiter.TokensAt(now) < 1 { - return fmt.Errorf("global rate limit exceeded for GetChunks operations, try again later") + return fmt.Errorf("global rate limit %0.1fhz exceeded for GetChunks operations, try again later", + l.config.MaxGetChunkOpsPerSecond) } if l.perClientOperationsInFlight[requesterID] >= l.config.MaxConcurrentGetChunkOpsClient { - return fmt.Errorf("client concurrent request limit exceeded for GetChunks") + return fmt.Errorf("client concurrent request limit %d exceeded for GetChunks", + l.config.MaxConcurrentGetChunkOpsClient) } if l.perClientOpLimiter[requesterID].TokensAt(now) < 1 { - return fmt.Errorf("client rate limit exceeded for GetChunks, try again later") + return fmt.Errorf("client rate limit %0.1fhz exceeded for GetChunks, try again later", + l.config.MaxGetChunkOpsPerSecondClient) } l.globalOperationsInFlight++ @@ -138,13 +139,19 @@ func (l *ChunkRateLimiter) RequestGetChunkBandwidth(now time.Time, requesterID s allowed := l.globalBandwidthLimiter.AllowN(now, bytes) if !allowed { - return fmt.Errorf("global rate limit exceeded for GetChunk bandwidth, try again later") + return fmt.Errorf("global rate limit %dMiB exceeded for GetChunk bandwidth, try again later", + int(l.config.MaxGetChunkBytesPerSecond/1024/1024)) } - allowed = l.perClientBandwidthLimiter[requesterID].AllowN(now, bytes) + limiter, ok := l.perClientBandwidthLimiter[requesterID] + if !ok { + return fmt.Errorf("internal error, unable to find bandwidth limiter for client ID %s", requesterID) + } + allowed = limiter.AllowN(now, bytes) if !allowed { l.globalBandwidthLimiter.AllowN(now, -bytes) - return fmt.Errorf("client rate limit exceeded for GetChunk bandwidth, try again later") + return fmt.Errorf("client rate limit %dMiB exceeded for GetChunk bandwidth, try again later", + int(l.config.MaxGetChunkBytesPerSecondClient/1024/1024)) } return nil diff --git a/relay/metadata_provider.go b/relay/metadata_provider.go index 33407fa124..e70531702d 100644 --- a/relay/metadata_provider.go +++ b/relay/metadata_provider.go @@ -164,7 +164,7 @@ func (m *metadataProvider) fetchMetadata(key v2.BlobKey) (*blobMetadata, error) } // TODO(cody-littley): blob size is not correct https://github.com/Layr-Labs/eigenda/pull/906#discussion_r1847396530 - blobSize := uint32(cert.BlobHeader.BlobCommitments.Length) + blobSize := uint32(cert.BlobHeader.BlobCommitments.Length) * encoding.BYTES_PER_SYMBOL chunkSize, err := v2.GetChunkLength(cert.BlobHeader.BlobVersion, blobSize) chunkSize *= encoding.BYTES_PER_SYMBOL if err != nil { diff --git a/relay/server.go b/relay/server.go index 56bedea146..730a13f82d 100644 --- a/relay/server.go +++ b/relay/server.go @@ -13,10 +13,12 @@ import ( v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/relay/auth" "github.com/Layr-Labs/eigenda/relay/chunkstore" "github.com/Layr-Labs/eigenda/relay/limiter" "github.com/Layr-Labs/eigensdk-go/logging" "google.golang.org/grpc" + "google.golang.org/grpc/peer" "google.golang.org/grpc/reflection" ) @@ -49,6 +51,9 @@ type Server struct { // grpcServer is the gRPC server. grpcServer *grpc.Server + + // authenticator is used to authenticate requests to the relay service. + authenticator auth.RequestAuthenticator } type Config struct { @@ -88,6 +93,17 @@ type Config struct { // RateLimits contains configuration for rate limiting. RateLimits limiter.Config + + // AuthenticationKeyCacheSize is the maximum number of operator public keys that can be cached. + AuthenticationKeyCacheSize int + + // AuthenticationTimeout is the duration for which an authentication is "cached". A request from the same client + // within this duration will not trigger a new authentication in order to save resources. If zero, then each request + // will be authenticated independently, regardless of timing. + AuthenticationTimeout time.Duration + + // AuthenticationDisabled will disable authentication if set to true. + AuthenticationDisabled bool } // NewServer creates a new relay Server. @@ -97,7 +113,8 @@ func NewServer( config *Config, metadataStore *blobstore.BlobMetadataStore, blobStore *blobstore.BlobStore, - chunkReader chunkstore.ChunkReader) (*Server, error) { + chunkReader chunkstore.ChunkReader, + ics core.IndexedChainState) (*Server, error) { mp, err := newMetadataProvider( ctx, @@ -130,6 +147,17 @@ func NewServer( return nil, fmt.Errorf("error creating chunk provider: %w", err) } + var authenticator auth.RequestAuthenticator + if !config.AuthenticationDisabled { + authenticator, err = auth.NewRequestAuthenticator( + ics, + config.AuthenticationKeyCacheSize, + config.AuthenticationTimeout) + if err != nil { + return nil, fmt.Errorf("error creating authenticator: %w", err) + } + } + return &Server{ config: config, logger: logger, @@ -138,6 +166,7 @@ func NewServer( chunkProvider: cp, blobRateLimiter: limiter.NewBlobRateLimiter(&config.RateLimits), chunkRateLimiter: limiter.NewChunkRateLimiter(&config.RateLimits), + authenticator: authenticator, }, nil } @@ -190,7 +219,6 @@ func (s *Server) GetBlob(ctx context.Context, request *pb.GetBlobRequest) (*pb.G func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (*pb.GetChunksReply, error) { // TODO(cody-littley): - // - authentication // - timeouts if len(request.ChunkRequests) <= 0 { @@ -201,9 +229,22 @@ func (s *Server) GetChunks(ctx context.Context, request *pb.GetChunksRequest) (* "too many chunk requests provided, max is %d", s.config.MaxKeysPerGetChunksRequest) } - // Future work: client IDs will be fixed when authentication is implemented - clientID := fmt.Sprintf("%d", request.RequesterId) + if s.authenticator != nil { + client, ok := peer.FromContext(ctx) + if !ok { + return nil, errors.New("could not get peer information") + } + clientAddress := client.Addr.String() + + err := s.authenticator.AuthenticateGetChunksRequest(clientAddress, request, time.Now()) + if err != nil { + return nil, fmt.Errorf("auth failed: %w", err) + } + } + + clientID := string(request.OperatorId) err := s.chunkRateLimiter.BeginGetChunkOperation(time.Now(), clientID) + if err != nil { return nil, err } diff --git a/relay/server_test.go b/relay/server_test.go index 10a7ecca91..f67a8fb650 100644 --- a/relay/server_test.go +++ b/relay/server_test.go @@ -46,6 +46,7 @@ func defaultConfig() *Config { GetChunkBytesBurstinessClient: 2 * 1024 * 1024, MaxConcurrentGetChunkOpsClient: 1, }, + AuthenticationDisabled: true, } } @@ -102,6 +103,7 @@ func TestReadWriteBlobs(t *testing.T) { config, metadataStore, blobStore, + nil, /* not used in this test*/ nil /* not used in this test*/) require.NoError(t, err) @@ -179,7 +181,8 @@ func TestReadNonExistentBlob(t *testing.T) { config, metadataStore, blobStore, - nil /* not used in this test */) + nil, /* not used in this test */ + nil /* not used in this test*/) require.NoError(t, err) go func() { @@ -231,6 +234,7 @@ func TestReadWriteBlobsWithSharding(t *testing.T) { config, metadataStore, blobStore, + nil, /* not used in this test*/ nil /* not used in this test*/) require.NoError(t, err) @@ -348,7 +352,8 @@ func TestReadWriteChunks(t *testing.T) { config, metadataStore, nil, /* not used in this test*/ - chunkReader) + chunkReader, + nil /* not used in this test*/) require.NoError(t, err) go func() { @@ -543,7 +548,8 @@ func TestBatchedReadWriteChunks(t *testing.T) { config, metadataStore, nil, /* not used in this test */ - chunkReader) + chunkReader, + nil /* not used in this test*/) require.NoError(t, err) go func() { @@ -668,7 +674,8 @@ func TestReadWriteChunksWithSharding(t *testing.T) { config, metadataStore, nil, /* not used in this test*/ - chunkReader) + chunkReader, + nil /* not used in this test*/) require.NoError(t, err) go func() { @@ -942,7 +949,8 @@ func TestBatchedReadWriteChunksWithSharding(t *testing.T) { config, metadataStore, nil, /* not used in this test */ - chunkReader) + chunkReader, + nil /* not used in this test*/) require.NoError(t, err) go func() {