Implement awaitData tailable cursors (#3900)
noisersup authored and sachinpuranik committed Jan 2, 2024
1 parent 93c8870 commit f8b1f40
Showing 6 changed files with 480 additions and 17 deletions.
301 changes: 293 additions & 8 deletions integration/cursors/awaitdata_test.go
@@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
@@ -26,19 +27,18 @@ import (
"github.com/FerretDB/FerretDB/integration"
"github.com/FerretDB/FerretDB/integration/setup"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/util/testutil"
)

-func TestCursorsTailableAwaitData(tt *testing.T) {
-tt.Parallel()
-
-t := setup.FailsForFerretDB(tt, "https://github.com/FerretDB/FerretDB/issues/2283")
+func TestCursorsTailableAwaitDataGetMoreMaxTimeMS(t *testing.T) {
+t.Parallel()

s := setup.SetupWithOpts(t, nil)

db, ctx := s.Collection.Database(), s.Ctx

-opts := options.CreateCollection().SetCapped(true).SetSizeInBytes(10000000000)
+opts := options.CreateCollection().SetCapped(true).SetSizeInBytes(10000)
err := db.CreateCollection(s.Ctx, testutil.CollectionName(t), opts)
require.NoError(t, err)

@@ -88,6 +88,72 @@ func TestCursorsTailableAwaitData(tt *testing.T) {
require.Equal(t, 1, nextBatch.Len())
}

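// TestCursorsTailableAwaitDataNonFullBatch checks that an awaitData getMore returns the
// documents that are currently available instead of blocking until the batch is full.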
func TestCursorsTailableAwaitDataNonFullBatch(t *testing.T) {
t.Parallel()

s := setup.SetupWithOpts(t, nil)

db, ctx := s.Collection.Database(), s.Ctx

opts := options.CreateCollection().SetCapped(true).SetSizeInBytes(1000)
err := db.CreateCollection(s.Ctx, testutil.CollectionName(t), opts)
require.NoError(t, err)

collection := db.Collection(testutil.CollectionName(t))

bsonArr, _ := integration.GenerateDocuments(0, 2)

_, err = collection.InsertMany(ctx, bsonArr)
require.NoError(t, err)

cmd := bson.D{
{"find", collection.Name()},
{"batchSize", 1},
{"tailable", true},
{"awaitData", true},
}

var res bson.D
err = collection.Database().RunCommand(ctx, cmd).Decode(&res)
require.NoError(t, err)

var firstBatch *types.Array
firstBatch, cursorID := getFirstBatch(t, res)

require.Equal(t, 1, firstBatch.Len())

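// Only one document remains: the getMore below asks for a batch of 2 with a 30-second
// maxTimeMS and should return that single document without waiting to fill the batch.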
getMoreCmd := bson.D{
{"getMore", cursorID},
{"collection", collection.Name()},
{"batchSize", 2},
{"maxTimeMS", (30 * time.Second).Milliseconds()},
}

err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res)
require.NoError(t, err)

nextBatch, nextID := getNextBatch(t, res)
require.Equal(t, cursorID, nextID)
require.Equal(t, 1, nextBatch.Len())

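// Insert a document from a separate goroutine after a short delay; the awaitData getMore
// below should return it once it arrives, without waiting for the full maxTimeMS.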
insertChan := make(chan error)

go func() {
time.Sleep(1 * time.Second)
_, insertErr := collection.InsertOne(ctx, bson.D{{"v", "bar"}})
insertChan <- insertErr
}()

err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res)
require.NoError(t, err)

require.NoError(t, <-insertChan)

nextBatch, nextID = getNextBatch(t, res)
require.Equal(t, cursorID, nextID)
require.Equal(t, 1, nextBatch.Len())
}

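// TestCursorsAwaitDataErrors checks that invalid awaitData requests, such as awaitData
// without tailable, are rejected with a command error.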
func TestCursorsAwaitDataErrors(t *testing.T) {
t.Parallel()

@@ -104,9 +170,7 @@ func TestCursorsAwaitDataErrors(t *testing.T) {
_, err = collection.InsertOne(ctx, bson.D{{"v", "foo"}})
require.NoError(t, err)

t.Run("NonTailable", func(tt *testing.T) {
t := setup.FailsForFerretDB(tt, "https://github.com/FerretDB/FerretDB/issues/2283")

t.Run("NonTailable", func(t *testing.T) {
err = collection.Database().RunCommand(ctx, bson.D{
{"find", collection.Name()},
{"batchSize", 1},
@@ -121,3 +185,224 @@ func TestCursorsAwaitDataErrors(t *testing.T) {
integration.AssertEqualCommandError(t, expectedErr, err)
})
}

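// TestCursorsTailableAwaitData iterates a tailable awaitData cursor over a capped
// collection: the first batch, subsequent getMore batches, an empty getMore once the
// initial documents are exhausted, and a getMore that picks up a newly inserted document.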
func TestCursorsTailableAwaitData(t *testing.T) {
t.Parallel()

s := setup.SetupWithOpts(t, nil)

db, ctx := s.Collection.Database(), s.Ctx

opts := options.CreateCollection().SetCapped(true).SetSizeInBytes(10000)
err := db.CreateCollection(s.Ctx, testutil.CollectionName(t), opts)
require.NoError(t, err)

collection := db.Collection(t.Name())

bsonArr, arr := integration.GenerateDocuments(0, 3)

_, err = collection.InsertMany(ctx, bsonArr)
require.NoError(t, err)

var cursorID any

t.Run("FirstBatch", func(t *testing.T) {
cmd := bson.D{
{"find", collection.Name()},
{"batchSize", 1},
{"tailable", true},
{"awaitData", true},
}

var res bson.D
err = collection.Database().RunCommand(ctx, cmd).Decode(&res)
require.NoError(t, err)

var firstBatch *types.Array
firstBatch, cursorID = getFirstBatch(t, res)

expectedFirstBatch := integration.ConvertDocuments(t, arr[:1])
require.Equal(t, len(expectedFirstBatch), firstBatch.Len())
require.Equal(t, expectedFirstBatch[0], must.NotFail(firstBatch.Get(0)))
})

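// Fetch the remaining two documents one batch at a time; the cursor ID must stay the
// same across getMore calls.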
t.Run("GetMore", func(t *testing.T) {
getMoreCmd := bson.D{
{"getMore", cursorID},
{"collection", collection.Name()},
{"batchSize", 1},
}

for i := 0; i < 2; i++ {
var res bson.D
err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res)
require.NoError(t, err)

nextBatch, nextID := getNextBatch(t, res)
expectedNextBatch := integration.ConvertDocuments(t, arr[i+1:i+2])

assert.Equal(t, cursorID, nextID)

require.Equal(t, len(expectedNextBatch), nextBatch.Len())
require.Equal(t, expectedNextBatch[0], must.NotFail(nextBatch.Get(0)))
}
})

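// With no new documents available, the getMore returns an empty batch and keeps the
// same cursor ID.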
t.Run("GetMoreEmpty", func(t *testing.T) {
getMoreCmd := bson.D{
{"getMore", cursorID},
{"collection", collection.Name()},
{"batchSize", 1},
}

var res bson.D
err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res)
require.NoError(t, err)

nextBatch, nextID := getNextBatch(t, res)
require.Equal(t, 0, nextBatch.Len())
assert.Equal(t, cursorID, nextID)
})

t.Run("GetMoreNewDoc", func(t *testing.T) {
getMoreCmd := bson.D{
{"getMore", cursorID},
{"collection", collection.Name()},
{"batchSize", 1},
{"maxTimeMS", 2000},
}

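// Insert a new document; the awaitData getMore should return it within maxTimeMS.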
newDoc := bson.D{{"_id", "new"}}
_, err = collection.InsertOne(ctx, newDoc)
require.NoError(t, err)

var res bson.D
err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res)
require.NoError(t, err)

nextBatch, nextID := getNextBatch(t, res)

assert.Equal(t, cursorID, nextID)

require.Equal(t, 1, nextBatch.Len())
require.Equal(t, integration.ConvertDocument(t, newDoc), must.NotFail(nextBatch.Get(0)))
})

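// After the new document has been consumed, the next getMore returns an empty batch again.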
t.Run("GetMoreEmptyAfterInsertion", func(t *testing.T) {
getMoreCmd := bson.D{
{"getMore", cursorID},
{"collection", collection.Name()},
{"batchSize", 1},
}

var res bson.D
err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res)
require.NoError(t, err)

nextBatch, nextID := getNextBatch(t, res)
require.Equal(t, 0, nextBatch.Len())
assert.Equal(t, cursorID, nextID)
})
}

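// TestCursorsTailableAwaitDataTwoCursorsSameCollection checks that two tailable awaitData
// cursors opened on the same capped collection iterate independently and both see the
// same documents in the same order.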
func TestCursorsTailableAwaitDataTwoCursorsSameCollection(t *testing.T) {
t.Parallel()

s := setup.SetupWithOpts(t, nil)

db, ctx := s.Collection.Database(), s.Ctx

opts := options.CreateCollection().SetCapped(true).SetSizeInBytes(10000)
err := db.CreateCollection(s.Ctx, t.Name(), opts)
require.NoError(t, err)

collection := db.Collection(t.Name())

bsonArr, arr := integration.GenerateDocuments(0, 50)

_, err = collection.InsertMany(ctx, bsonArr)
require.NoError(t, err)

var cursorID1, cursorID2 any

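// Open two cursors with the same find command.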
cmd := bson.D{
{"find", collection.Name()},
{"batchSize", 1},
{"tailable", true},
{"awaitData", true},
}

var res bson.D

err = collection.Database().RunCommand(ctx, cmd).Decode(&res)
require.NoError(t, err)

var firstBatch1 *types.Array
firstBatch1, cursorID1 = getFirstBatch(t, res)

err = collection.Database().RunCommand(ctx, cmd).Decode(&res)
require.NoError(t, err)

var firstBatch2 *types.Array
firstBatch2, cursorID2 = getFirstBatch(t, res)

expectedFirstBatch := integration.ConvertDocuments(t, arr[:1])

require.Equal(t, len(expectedFirstBatch), firstBatch1.Len())
require.Equal(t, expectedFirstBatch[0], must.NotFail(firstBatch1.Get(0)))

require.Equal(t, len(expectedFirstBatch), firstBatch2.Len())
require.Equal(t, expectedFirstBatch[0], must.NotFail(firstBatch2.Get(0)))

getMoreCmd1 := bson.D{
{"getMore", cursorID1},
{"collection", collection.Name()},
{"batchSize", 1},
{"maxTimeMS", (2 * time.Second).Milliseconds()},
}

getMoreCmd2 := bson.D{
{"getMore", cursorID2},
{"collection", collection.Name()},
{"batchSize", 1},
{"maxTimeMS", (2 * time.Second).Milliseconds()},
}

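// Advance both cursors through the remaining 49 documents; each getMore on each cursor
// should return the same next document and keep its cursor ID.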
for i := 0; i < 49; i++ {
err = collection.Database().RunCommand(ctx, getMoreCmd1).Decode(&res)
require.NoError(t, err)

nextBatch1, nextID1 := getNextBatch(t, res)

err = collection.Database().RunCommand(ctx, getMoreCmd2).Decode(&res)
require.NoError(t, err)

nextBatch2, nextID2 := getNextBatch(t, res)

expectedNextBatch := integration.ConvertDocuments(t, arr[i+1:i+2])

assert.Equal(t, cursorID1, nextID1)
require.Equal(t, len(expectedNextBatch), nextBatch1.Len())
require.Equal(t, expectedNextBatch[0], must.NotFail(nextBatch1.Get(0)))

assert.Equal(t, cursorID2, nextID2)
require.Equal(t, len(expectedNextBatch), nextBatch2.Len())
require.Equal(t, expectedNextBatch[0], must.NotFail(nextBatch2.Get(0)))
}

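// Both cursors are now exhausted: further getMore calls return empty batches while the
// cursor IDs remain unchanged.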
err = collection.Database().RunCommand(ctx, getMoreCmd1).Decode(&res)
require.NoError(t, err)

nextBatch1, nextID1 := getNextBatch(t, res)

err = collection.Database().RunCommand(ctx, getMoreCmd2).Decode(&res)
require.NoError(t, err)

nextBatch2, nextID2 := getNextBatch(t, res)

require.Equal(t, 0, nextBatch1.Len())
assert.Equal(t, cursorID1, nextID1)

require.Equal(t, 0, nextBatch2.Len())
assert.Equal(t, cursorID2, nextID2)
}