diff --git a/disperser/dataapi/docs/v2/V2_docs.go b/disperser/dataapi/docs/v2/V2_docs.go index e5b5dee45a..cd0ddc4343 100644 --- a/disperser/dataapi/docs/v2/V2_docs.go +++ b/disperser/dataapi/docs/v2/V2_docs.go @@ -483,6 +483,58 @@ const docTemplateV2 = `{ "big.Int": { "type": "object" }, + "core.BlobHeader": { + "type": "object", + "properties": { + "accountID": { + "description": "AccountID is the account that is paying for the blob to be stored", + "type": "string" + }, + "commitment": { + "$ref": "#/definitions/encoding.G1Commitment" + }, + "length": { + "type": "integer" + }, + "length_commitment": { + "$ref": "#/definitions/encoding.G2Commitment" + }, + "length_proof": { + "$ref": "#/definitions/encoding.LengthProof" + }, + "quorumInfos": { + "description": "QuorumInfos contains the quorum specific parameters for the blob", + "type": "array", + "items": { + "$ref": "#/definitions/core.BlobQuorumInfo" + } + } + } + }, + "core.BlobQuorumInfo": { + "type": "object", + "properties": { + "adversaryThreshold": { + "description": "AdversaryThreshold is the maximum amount of stake that can be controlled by an adversary in the quorum as a percentage of the total stake in the quorum", + "type": "integer" + }, + "chunkLength": { + "description": "ChunkLength is the number of symbols in a chunk", + "type": "integer" + }, + "confirmationThreshold": { + "description": "ConfirmationThreshold is the amount of stake that must sign a message for it to be considered valid as a percentage of the total stake in the quorum", + "type": "integer" + }, + "quorumID": { + "type": "integer" + }, + "quorumRate": { + "description": "Rate Limit. This is a temporary measure until the node can derive rates on its own using rollup authentication. This is used\nfor restricting the rate at which retrievers are able to download data from the DA node to a multiple of the rate at which the\ndata was posted to the DA node.", + "type": "integer" + } + } + }, "core.G1Point": { "type": "object", "properties": { @@ -741,6 +793,23 @@ const docTemplateV2 = `{ } } }, + "github_com_Layr-Labs_eigenda_disperser_common_v2.BlobStatus": { + "type": "integer", + "enum": [ + 0, + 1, + 2, + 3, + 4 + ], + "x-enum-varnames": [ + "Queued", + "Encoded", + "Certified", + "Failed", + "InsufficientSignatures" + ] + }, "github_com_Layr-Labs_eigenda_disperser_dataapi_v2.SignedBatch": { "type": "object", "properties": { @@ -811,6 +880,64 @@ const docTemplateV2 = `{ } } }, + "v2.BlobFeedResponse": { + "type": "object", + "properties": { + "blobs": { + "type": "array", + "items": { + "$ref": "#/definitions/v2.BlobMetadata" + } + }, + "pagination_token": { + "type": "string" + } + } + }, + "v2.BlobMetadata": { + "type": "object", + "properties": { + "blobHeader": { + "$ref": "#/definitions/core.BlobHeader" + }, + "blobSize": { + "description": "BlobSize is the size of the blob in bytes", + "type": "integer" + }, + "blobStatus": { + "description": "BlobStatus indicates the current status of the blob", + "allOf": [ + { + "$ref": "#/definitions/github_com_Layr-Labs_eigenda_disperser_common_v2.BlobStatus" + } + ] + }, + "expiry": { + "description": "Expiry is Unix timestamp of the blob expiry in seconds from epoch", + "type": "integer" + }, + "fragmentSizeBytes": { + "description": "FragmentSizeBytes is the maximum fragment size used to store the chunk coefficients.", + "type": "integer" + }, + "numRetries": { + "description": "NumRetries is the number of times the blob has been retried", + "type": "integer" + }, + "requestedAt": { + "description": 
"RequestedAt is the Unix timestamp of when the blob was requested in seconds", + "type": "integer" + }, + "totalChunkSizeBytes": { + "description": "TotalChunkSizeBytes is the total size of the file containing all chunk coefficients for the blob.", + "type": "integer" + }, + "updatedAt": { + "description": "UpdatedAt is the Unix timestamp of when the blob was last updated in _nanoseconds_", + "type": "integer" + } + } + }, "v2.BlobResponse": { "type": "object", "properties": { diff --git a/disperser/dataapi/docs/v2/V2_swagger.json b/disperser/dataapi/docs/v2/V2_swagger.json index af154b8114..9078308d16 100644 --- a/disperser/dataapi/docs/v2/V2_swagger.json +++ b/disperser/dataapi/docs/v2/V2_swagger.json @@ -480,6 +480,58 @@ "big.Int": { "type": "object" }, + "core.BlobHeader": { + "type": "object", + "properties": { + "accountID": { + "description": "AccountID is the account that is paying for the blob to be stored", + "type": "string" + }, + "commitment": { + "$ref": "#/definitions/encoding.G1Commitment" + }, + "length": { + "type": "integer" + }, + "length_commitment": { + "$ref": "#/definitions/encoding.G2Commitment" + }, + "length_proof": { + "$ref": "#/definitions/encoding.LengthProof" + }, + "quorumInfos": { + "description": "QuorumInfos contains the quorum specific parameters for the blob", + "type": "array", + "items": { + "$ref": "#/definitions/core.BlobQuorumInfo" + } + } + } + }, + "core.BlobQuorumInfo": { + "type": "object", + "properties": { + "adversaryThreshold": { + "description": "AdversaryThreshold is the maximum amount of stake that can be controlled by an adversary in the quorum as a percentage of the total stake in the quorum", + "type": "integer" + }, + "chunkLength": { + "description": "ChunkLength is the number of symbols in a chunk", + "type": "integer" + }, + "confirmationThreshold": { + "description": "ConfirmationThreshold is the amount of stake that must sign a message for it to be considered valid as a percentage of the total stake in the quorum", + "type": "integer" + }, + "quorumID": { + "type": "integer" + }, + "quorumRate": { + "description": "Rate Limit. This is a temporary measure until the node can derive rates on its own using rollup authentication. 
This is used\nfor restricting the rate at which retrievers are able to download data from the DA node to a multiple of the rate at which the\ndata was posted to the DA node.", + "type": "integer" + } + } + }, "core.G1Point": { "type": "object", "properties": { @@ -738,6 +790,23 @@ } } }, + "github_com_Layr-Labs_eigenda_disperser_common_v2.BlobStatus": { + "type": "integer", + "enum": [ + 0, + 1, + 2, + 3, + 4 + ], + "x-enum-varnames": [ + "Queued", + "Encoded", + "Certified", + "Failed", + "InsufficientSignatures" + ] + }, "github_com_Layr-Labs_eigenda_disperser_dataapi_v2.SignedBatch": { "type": "object", "properties": { @@ -808,6 +877,64 @@ } } }, + "v2.BlobFeedResponse": { + "type": "object", + "properties": { + "blobs": { + "type": "array", + "items": { + "$ref": "#/definitions/v2.BlobMetadata" + } + }, + "pagination_token": { + "type": "string" + } + } + }, + "v2.BlobMetadata": { + "type": "object", + "properties": { + "blobHeader": { + "$ref": "#/definitions/core.BlobHeader" + }, + "blobSize": { + "description": "BlobSize is the size of the blob in bytes", + "type": "integer" + }, + "blobStatus": { + "description": "BlobStatus indicates the current status of the blob", + "allOf": [ + { + "$ref": "#/definitions/github_com_Layr-Labs_eigenda_disperser_common_v2.BlobStatus" + } + ] + }, + "expiry": { + "description": "Expiry is Unix timestamp of the blob expiry in seconds from epoch", + "type": "integer" + }, + "fragmentSizeBytes": { + "description": "FragmentSizeBytes is the maximum fragment size used to store the chunk coefficients.", + "type": "integer" + }, + "numRetries": { + "description": "NumRetries is the number of times the blob has been retried", + "type": "integer" + }, + "requestedAt": { + "description": "RequestedAt is the Unix timestamp of when the blob was requested in seconds", + "type": "integer" + }, + "totalChunkSizeBytes": { + "description": "TotalChunkSizeBytes is the total size of the file containing all chunk coefficients for the blob.", + "type": "integer" + }, + "updatedAt": { + "description": "UpdatedAt is the Unix timestamp of when the blob was last updated in _nanoseconds_", + "type": "integer" + } + } + }, "v2.BlobResponse": { "type": "object", "properties": { diff --git a/disperser/dataapi/docs/v2/V2_swagger.yaml b/disperser/dataapi/docs/v2/V2_swagger.yaml index 6a2b7c1985..fd7bfc9963 100644 --- a/disperser/dataapi/docs/v2/V2_swagger.yaml +++ b/disperser/dataapi/docs/v2/V2_swagger.yaml @@ -2,6 +2,49 @@ basePath: /api/v2 definitions: big.Int: type: object + core.BlobHeader: + properties: + accountID: + description: AccountID is the account that is paying for the blob to be stored + type: string + commitment: + $ref: '#/definitions/encoding.G1Commitment' + length: + type: integer + length_commitment: + $ref: '#/definitions/encoding.G2Commitment' + length_proof: + $ref: '#/definitions/encoding.LengthProof' + quorumInfos: + description: QuorumInfos contains the quorum specific parameters for the blob + items: + $ref: '#/definitions/core.BlobQuorumInfo' + type: array + type: object + core.BlobQuorumInfo: + properties: + adversaryThreshold: + description: AdversaryThreshold is the maximum amount of stake that can be + controlled by an adversary in the quorum as a percentage of the total stake + in the quorum + type: integer + chunkLength: + description: ChunkLength is the number of symbols in a chunk + type: integer + confirmationThreshold: + description: ConfirmationThreshold is the amount of stake that must sign a + message for it to be considered valid as a 
percentage of the total stake + in the quorum + type: integer + quorumID: + type: integer + quorumRate: + description: |- + Rate Limit. This is a temporary measure until the node can derive rates on its own using rollup authentication. This is used + for restricting the rate at which retrievers are able to download data from the DA node to a multiple of the rate at which the + data was posted to the DA node. + type: integer + type: object core.G1Point: properties: x: @@ -183,6 +226,20 @@ definitions: information (stakes, indexes, etc.) is taken from type: integer type: object + github_com_Layr-Labs_eigenda_disperser_common_v2.BlobStatus: + enum: + - 0 + - 1 + - 2 + - 3 + - 4 + type: integer + x-enum-varnames: + - Queued + - Encoded + - Certified + - Failed + - InsufficientSignatures github_com_Layr-Labs_eigenda_disperser_dataapi_v2.SignedBatch: properties: attestation: @@ -228,6 +285,49 @@ definitions: blob_certificate: $ref: '#/definitions/github_com_Layr-Labs_eigenda_core_v2.BlobCertificate' type: object + v2.BlobFeedResponse: + properties: + blobs: + items: + $ref: '#/definitions/v2.BlobMetadata' + type: array + pagination_token: + type: string + type: object + v2.BlobMetadata: + properties: + blobHeader: + $ref: '#/definitions/core.BlobHeader' + blobSize: + description: BlobSize is the size of the blob in bytes + type: integer + blobStatus: + allOf: + - $ref: '#/definitions/github_com_Layr-Labs_eigenda_disperser_common_v2.BlobStatus' + description: BlobStatus indicates the current status of the blob + expiry: + description: Expiry is Unix timestamp of the blob expiry in seconds from epoch + type: integer + fragmentSizeBytes: + description: FragmentSizeBytes is the maximum fragment size used to store + the chunk coefficients. + type: integer + numRetries: + description: NumRetries is the number of times the blob has been retried + type: integer + requestedAt: + description: RequestedAt is the Unix timestamp of when the blob was requested + in seconds + type: integer + totalChunkSizeBytes: + description: TotalChunkSizeBytes is the total size of the file containing + all chunk coefficients for the blob. + type: integer + updatedAt: + description: UpdatedAt is the Unix timestamp of when the blob was last updated + in _nanoseconds_ + type: integer + type: object v2.BlobResponse: properties: blob_header: diff --git a/disperser/dataapi/v2/server_v2.go b/disperser/dataapi/v2/server_v2.go index 44008b7a61..62700f9899 100644 --- a/disperser/dataapi/v2/server_v2.go +++ b/disperser/dataapi/v2/server_v2.go @@ -16,6 +16,7 @@ import ( "github.com/Layr-Labs/eigenda/core" corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser/common/semver" + disperserv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/disperser/dataapi" docsv2 "github.com/Layr-Labs/eigenda/disperser/dataapi/docs/v2" @@ -31,6 +32,10 @@ import ( var errNotFound = errors.New("not found") const ( + // The max number of blobs to return from blob Feed API, regardless of the time + // range or "limit" param. 
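+ // Requests asking for more blobs (or passing limit <= 0) are clamped to this value and can page through the remainder via pagination_token.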
+ maxBlobFeedNumBlobs = 1000 + cacheControlParam = "Cache-Control" maxFeedBlobAge = 300 // this is completely static maxOperatorsStakeAge = 300 // not expect the stake change to happen frequently @@ -66,6 +71,11 @@ type ( VerificationInfo *corev2.BlobVerificationInfo `json:"blob_verification_info"` } + BlobFeedResponse struct { + Blobs []*disperserv2.BlobMetadata `json:"blobs"` + PaginationToken string `json:"pagination_token"` + } + BatchResponse struct { BatchHeaderHash string `json:"batch_header_hash"` SignedBatch *SignedBatch `json:"signed_batch"` @@ -255,6 +265,13 @@ func errorResponse(c *gin.Context, err error) { }) } +func invalidParamsErrorResponse(c *gin.Context, err error) { + _ = c.Error(err) + c.JSON(http.StatusBadRequest, ErrorResponse{ + Error: err.Error(), + }) +} + func run(logger logging.Logger, httpServer *http.Server) <-chan error { errChan := make(chan error, 1) ctx, stop := signal.NotifyContext( @@ -294,8 +311,98 @@ func (s *ServerV2) Shutdown() error { return nil } +// FetchBlobFeedHandler godoc +// +// @Summary Fetch blob feed +// @Tags Blob +// @Produce json +// @Param end query string false "Fetch blobs up to the end time (ISO 8601 format: 2006-01-02T15:04:05Z) [default: now]" +// @Param interval query int false "Fetch blobs starting from an interval (in seconds) before the end time [default: 3600]" +// @Param pagination_token query string false "Fetch blobs starting after the pagination token (exclusive); the fetch begins at the later of the token and the interval start [default: empty]" +// @Param limit query int false "The maximum number of blobs to fetch. System max (1000) if limit <= 0 [default: 20; max: 1000]" +// @Success 200 {object} BlobFeedResponse +// @Failure 400 {object} ErrorResponse "error: Bad request" +// @Failure 404 {object} ErrorResponse "error: Not found" +// @Failure 500 {object} ErrorResponse "error: Server error" func (s *ServerV2) FetchBlobFeedHandler(c *gin.Context) { - errorResponse(c, errors.New("FetchBlobFeedHandler unimplemented")) + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("FetchBlobFeedHandler", f*1000) // convert to milliseconds + })) + defer timer.ObserveDuration() + + var err error + + endTime := time.Now() + if c.Query("end") != "" { + endTime, err = time.Parse("2006-01-02T15:04:05Z", c.Query("end")) + if err != nil { + s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedHandler") + invalidParamsErrorResponse(c, fmt.Errorf("failed to parse end param: %w", err)) + return + } + } + + interval := 3600 + if c.Query("interval") != "" { + interval, err = strconv.Atoi(c.Query("interval")) + if err != nil { + s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedHandler") + invalidParamsErrorResponse(c, fmt.Errorf("failed to parse interval param: %w", err)) + return + } + } + + limit, err := strconv.Atoi(c.DefaultQuery("limit", "20")) + if err != nil { + s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedHandler") + invalidParamsErrorResponse(c, fmt.Errorf("failed to parse limit param: %w", err)) + return + } + if limit <= 0 || limit > maxBlobFeedNumBlobs { + limit = maxBlobFeedNumBlobs + } + + paginationCursor := blobstore.BlobFeedCursor{ + RequestedAt: 0, + } + if c.Query("pagination_token") != "" { + cursor, err := paginationCursor.FromCursorKey(c.Query("pagination_token")) + if err != nil { + s.metrics.IncrementInvalidArgRequestNum("FetchBlobFeedHandler") + invalidParamsErrorResponse(c, fmt.Errorf("failed to parse the pagination token: %w", err)) + return + } + paginationCursor = *cursor + } + + 
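// Resolve the fetch window: start at the later of (end - interval) and the pagination cursor, so a paginated request resumes exactly where the previous page left off. + 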
startTime := endTime.Add(-time.Duration(interval) * time.Second) + startCursor := blobstore.BlobFeedCursor{ + RequestedAt: uint64(startTime.UnixNano()), + } + if startCursor.LessThan(&paginationCursor) { + startCursor = paginationCursor + } + endCursor := blobstore.BlobFeedCursor{ + RequestedAt: uint64(endTime.UnixNano()), + } + + blobs, paginationToken, err := s.blobMetadataStore.GetBlobMetadataByRequestedAt(c.Request.Context(), startCursor, endCursor, limit) + if err != nil { + s.metrics.IncrementFailedRequestNum("FetchBlobFeedHandler") + errorResponse(c, fmt.Errorf("failed to fetch feed from blob metadata store: %w", err)) + return + } + + token := "" + if paginationToken != nil { + token = paginationToken.ToCursorKey() + } + response := &BlobFeedResponse{ + Blobs: blobs, + PaginationToken: token, + } + c.Writer.Header().Set(cacheControlParam, fmt.Sprintf("max-age=%d", maxFeedBlobAge)) + s.metrics.IncrementSuccessfulRequestNum("FetchBlobFeedHandler") + c.JSON(http.StatusOK, response) } // FetchBlobHandler godoc diff --git a/disperser/dataapi/v2/server_v2_test.go b/disperser/dataapi/v2/server_v2_test.go index 27f9c563bf..afa3c40dd7 100644 --- a/disperser/dataapi/v2/server_v2_test.go +++ b/disperser/dataapi/v2/server_v2_test.go @@ -1,6 +1,7 @@ package v2_test import ( + "bytes" "context" "crypto/rand" _ "embed" @@ -12,6 +13,7 @@ import ( "net/http" "net/http/httptest" "os" + "sort" "testing" "time" @@ -24,6 +26,7 @@ import ( corev2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser/common/inmem" commonv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" + v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" blobstorev2 "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" "github.com/Layr-Labs/eigenda/disperser/dataapi" prommock "github.com/Layr-Labs/eigenda/disperser/dataapi/prometheus/mock" @@ -279,6 +282,18 @@ func decodeResponseBody[T any](t *testing.T, w *httptest.ResponseRecorder) T { return response } +func checkBlobKeyEqual(t *testing.T, blobKey corev2.BlobKey, blobHeader *corev2.BlobHeader) { + bk, err := blobHeader.BlobKey() + assert.Nil(t, err) + assert.Equal(t, blobKey, bk) +} + +func checkPaginationToken(t *testing.T, token string, requestedAt uint64, blobKey corev2.BlobKey) { + cursor, err := new(blobstorev2.BlobFeedCursor).FromCursorKey(token) + require.NoError(t, err) + assert.True(t, cursor.Equal(requestedAt, &blobKey)) +} + func TestFetchBlobHandlerV2(t *testing.T) { r := setUpRouter() @@ -339,6 +354,203 @@ func TestFetchBlobCertificateHandler(t *testing.T) { assert.Equal(t, blobHeader.Signature, response.Certificate.BlobHeader.Signature) } +func TestFetchBlobFeedHandler(t *testing.T) { + r := setUpRouter() + ctx := context.Background() + + // Create a timeline of test blobs: + // - Total of 103 blobs + // - First 3 blobs share the same timestamp (firstBlobTime) + // - The last blob has timestamp "now" + // - Remaining blobs are spaced 1 minute apart + // - Timeline spans roughly 100 minutes into the past from now + numBlobs := 103 + now := uint64(time.Now().UnixNano()) + nanoSecsPerBlob := uint64(60 * 1e9) // 1 blob per minute + firstBlobTime := now - uint64(numBlobs-3)*nanoSecsPerBlob + keys := make([]corev2.BlobKey, numBlobs) + requestedAt := make([]uint64, numBlobs) + + // Create the blobs and store their metadata + firstBlobKeys := make([][32]byte, 3) + for i := 0; i < numBlobs; i++ { + blobHeader := makeBlobHeaderV2(t) + blobKey, err := blobHeader.BlobKey() + require.NoError(t, err) + keys[i] = blobKey + if i < 3 { + requestedAt[i] = firstBlobTime + 
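// Track the three same-timestamp keys separately; the feed orders same-timestamp blobs by blob key, so tests sort these keys and assert that order. + 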
firstBlobKeys[i] = keys[i] + } else { + requestedAt[i] = firstBlobTime + nanoSecsPerBlob*uint64(i-2) + } + + now := time.Now() + metadata := &v2.BlobMetadata{ + BlobHeader: blobHeader, + BlobStatus: v2.Encoded, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + UpdatedAt: uint64(now.UnixNano()), + RequestedAt: requestedAt[i], + } + err = blobMetadataStore.PutBlobMetadata(ctx, metadata) + require.NoError(t, err) + } + sort.Slice(firstBlobKeys, func(i, j int) bool { + return bytes.Compare(firstBlobKeys[i][:], firstBlobKeys[j][:]) < 0 + }) + + r.GET("/v2/blobs/feed", testDataApiServerV2.FetchBlobFeedHandler) + + t.Run("invalid params", func(t *testing.T) { + reqUrls := []string{ + "/v2/blobs/feed?pagination_token=abc", + "/v2/blobs/feed?limit=abc", + "/v2/blobs/feed?interval=abc", + "/v2/blobs/feed?end=2006-01-02T15:04:05", + } + for _, url := range reqUrls { + w := httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, url, nil) + r.ServeHTTP(w, req) + require.Equal(t, http.StatusBadRequest, w.Result().StatusCode) + } + }) + + t.Run("default params", func(t *testing.T) { + // Default query returns: + // - Most recent 1 hour of blobs (60 blobs total available, keys[43], ..., keys[102]) + // - Limited to 20 results (the default "limit") + // - Starting from blob[43] through blob[62] + w := executeRequest(t, r, http.MethodGet, "/v2/blobs/feed") + response := decodeResponseBody[serverv2.BlobFeedResponse](t, w) + require.Equal(t, 20, len(response.Blobs)) + for i := 0; i < 20; i++ { + checkBlobKeyEqual(t, keys[43+i], response.Blobs[i].BlobHeader) + assert.Equal(t, requestedAt[43+i], response.Blobs[i].RequestedAt) + } + assert.True(t, len(response.PaginationToken) > 0) + checkPaginationToken(t, response.PaginationToken, requestedAt[62], keys[62]) + }) + + t.Run("various query ranges and limits", func(t *testing.T) { + // Test 1: Unlimited results in 1-hour window + // Returns keys[43] through keys[102] (60 blobs) + w := executeRequest(t, r, http.MethodGet, "/v2/blobs/feed?limit=0") + response := decodeResponseBody[serverv2.BlobFeedResponse](t, w) + require.Equal(t, 60, len(response.Blobs)) + for i := 0; i < 60; i++ { + checkBlobKeyEqual(t, keys[43+i], response.Blobs[i].BlobHeader) + assert.Equal(t, requestedAt[43+i], response.Blobs[i].RequestedAt) + } + assert.True(t, len(response.PaginationToken) > 0) + checkPaginationToken(t, response.PaginationToken, requestedAt[102], keys[102]) + + // Test 2: 2-hour window captures all test blobs + // Verifies correct ordering of timestamp-colliding blobs + w = executeRequest(t, r, http.MethodGet, "/v2/blobs/feed?interval=7200&limit=-1") + response = decodeResponseBody[serverv2.BlobFeedResponse](t, w) + require.Equal(t, numBlobs, len(response.Blobs)) + // First 3 blobs ordered by key due to same timestamp + checkBlobKeyEqual(t, firstBlobKeys[0], response.Blobs[0].BlobHeader) + checkBlobKeyEqual(t, firstBlobKeys[1], response.Blobs[1].BlobHeader) + checkBlobKeyEqual(t, firstBlobKeys[2], response.Blobs[2].BlobHeader) + for i := 3; i < numBlobs; i++ { + checkBlobKeyEqual(t, keys[i], response.Blobs[i].BlobHeader) + assert.Equal(t, requestedAt[i], response.Blobs[i].RequestedAt) + } + assert.True(t, len(response.PaginationToken) > 0) + checkPaginationToken(t, response.PaginationToken, requestedAt[102], keys[102]) + + // Test 3: Custom end time with 1-hour window + // Retrieves keys[41] through keys[100] + tm := time.Unix(0, int64(requestedAt[100])+1).UTC() + endTime := tm.Format("2006-01-02T15:04:05.999999999Z") + reqUrl := 
fmt.Sprintf("/v2/blobs/feed?end=%s&limit=-1", endTime) + w = executeRequest(t, r, http.MethodGet, reqUrl) + response = decodeResponseBody[serverv2.BlobFeedResponse](t, w) + require.Equal(t, 60, len(response.Blobs)) + for i := 0; i < 60; i++ { + checkBlobKeyEqual(t, keys[41+i], response.Blobs[i].BlobHeader) + assert.Equal(t, requestedAt[41+i], response.Blobs[i].RequestedAt) + } + assert.True(t, len(response.PaginationToken) > 0) + checkPaginationToken(t, response.PaginationToken, requestedAt[100], keys[100]) + }) + + t.Run("pagination", func(t *testing.T) { + // Test pagination behavior: + // 1. First page: blobs in past 1h limited to 20, returns keys[43] through keys[62] + // 2. Second page: the next 20 blobs, returns keys[63] through keys[82] + // Verifies: + // - Correct sequencing across pages + // - Proper token handling + tm := time.Unix(0, time.Now().UnixNano()).UTC() + endTime := tm.Format("2006-01-02T15:04:05.999999999Z") // nano precision format + reqUrl := fmt.Sprintf("/v2/blobs/feed?end=%s&limit=20", endTime) + w := executeRequest(t, r, http.MethodGet, reqUrl) + response := decodeResponseBody[serverv2.BlobFeedResponse](t, w) + require.Equal(t, 20, len(response.Blobs)) + for i := 0; i < 20; i++ { + checkBlobKeyEqual(t, keys[43+i], response.Blobs[i].BlobHeader) + assert.Equal(t, requestedAt[43+i], response.Blobs[i].RequestedAt) + } + assert.True(t, len(response.PaginationToken) > 0) + checkPaginationToken(t, response.PaginationToken, requestedAt[62], keys[62]) + + // Request next page using pagination token + reqUrl = fmt.Sprintf("/v2/blobs/feed?end=%s&limit=20&pagination_token=%s", endTime, response.PaginationToken) + w = executeRequest(t, r, http.MethodGet, reqUrl) + response = decodeResponseBody[serverv2.BlobFeedResponse](t, w) + require.Equal(t, 20, len(response.Blobs)) + for i := 0; i < 20; i++ { + checkBlobKeyEqual(t, keys[63+i], response.Blobs[i].BlobHeader) + assert.Equal(t, requestedAt[63+i], response.Blobs[i].RequestedAt) + } + assert.True(t, len(response.PaginationToken) > 0) + checkPaginationToken(t, response.PaginationToken, requestedAt[82], keys[82]) + }) + + t.Run("pagination over same-timestamp blobs", func(t *testing.T) { + // Test pagination behavior in case of same-timestamp blobs + // - We have 3 blobs with identical timestamp (firstBlobTime): firstBlobKeys[0,1,2] + // - These are followed by sequential blobs: keys[3,4] with different timestamps + // - End time is set to requestedAt[5] + tm := time.Unix(0, int64(requestedAt[5])).UTC() + endTime := tm.Format("2006-01-02T15:04:05.999999999Z") // nano precision format + + // First page: fetch 2 blobs, which have same requestedAt timestamp + reqUrl := fmt.Sprintf("/v2/blobs/feed?end=%s&limit=2", endTime) + w := executeRequest(t, r, http.MethodGet, reqUrl) + response := decodeResponseBody[serverv2.BlobFeedResponse](t, w) + require.Equal(t, 2, len(response.Blobs)) + checkBlobKeyEqual(t, firstBlobKeys[0], response.Blobs[0].BlobHeader) + checkBlobKeyEqual(t, firstBlobKeys[1], response.Blobs[1].BlobHeader) + assert.Equal(t, firstBlobTime, response.Blobs[0].RequestedAt) + assert.Equal(t, firstBlobTime, response.Blobs[1].RequestedAt) + assert.True(t, len(response.PaginationToken) > 0) + checkPaginationToken(t, response.PaginationToken, requestedAt[1], firstBlobKeys[1]) + + // Second page: fetch remaining blobs (limit=0 means no limit, hence reach the last blob) + reqUrl = fmt.Sprintf("/v2/blobs/feed?end=%s&limit=0&pagination_token=%s", endTime, response.PaginationToken) + w = executeRequest(t, r, http.MethodGet, reqUrl) + 
response = decodeResponseBody[serverv2.BlobFeedResponse](t, w) + // Verify second page contains: + // 1. Last same-timestamp blob + // 2. Following blobs with sequential timestamps + require.Equal(t, 3, len(response.Blobs)) + checkBlobKeyEqual(t, firstBlobKeys[2], response.Blobs[0].BlobHeader) + checkBlobKeyEqual(t, keys[3], response.Blobs[1].BlobHeader) + checkBlobKeyEqual(t, keys[4], response.Blobs[2].BlobHeader) + assert.Equal(t, firstBlobTime, response.Blobs[0].RequestedAt) + assert.Equal(t, requestedAt[3], response.Blobs[1].RequestedAt) + assert.Equal(t, requestedAt[4], response.Blobs[2].RequestedAt) + assert.True(t, len(response.PaginationToken) > 0) + checkPaginationToken(t, response.PaginationToken, requestedAt[4], keys[4]) + }) +} + func TestFetchBlobVerificationInfoHandler(t *testing.T) { r := setUpRouter()