diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index cae9ce5769a..51116b0aeb5 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -66,6 +66,112 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) { } } +// See https://github.com/influxdata/influxdb/issues/14229 +func TestEngine_DeleteSeriesAfterCacheSnapshot(t *testing.T) { + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { + e := MustOpenEngine(index) + defer e.Close() + + if err := e.WritePointsString( + `cpu,host=A value=1.1 1000000000`, + `cpu,host=B value=1.2 2000000000`, + ); err != nil { + t.Fatalf("failed to write points: %s", err.Error()) + } + + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"})) + e.CreateSeriesIfNotExists([]byte("cpu,host=B"), []byte("cpu"), models.NewTags(map[string]string{"host": "B"})) + + // Verify series exist. + n, err := seriesExist(e, "cpu", []string{"host"}) + if err != nil { + t.Fatal(err) + } else if got, exp := n, 2; got != exp { + t.Fatalf("got %d points, expected %d", got, exp) + } + + // Simulate restart of server + if err := e.Reopen(); err != nil { + t.Fatal(err) + } + + // Snapshot the cache + if err := e.WriteSnapshot(); err != nil { + t.Fatalf("failed to snapshot: %s", err.Error()) + } + + // Verify series exist. + n, err = seriesExist(e, "cpu", []string{"host"}) + if err != nil { + t.Fatal(err) + } else if got, exp := n, 2; got != exp { + t.Fatalf("got %d points, expected %d", got, exp) + } + + // Delete the series + itr := &seriesIterator{keys: [][]byte{ + []byte("cpu,host=A"), + []byte("cpu,host=B"), + }, + } + if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil { + t.Fatalf("failed to delete series: %s", err.Error()) + } + + // Verify the series are no longer present. + n, err = seriesExist(e, "cpu", []string{"host"}) + if err != nil { + t.Fatal(err) + } else if got, exp := n, 0; got != exp { + t.Fatalf("got %d points, expected %d", got, exp) + } + + // Simulate restart of server + if err := e.Reopen(); err != nil { + t.Fatal(err) + } + + // Verify the series are no longer present. + n, err = seriesExist(e, "cpu", []string{"host"}) + if err != nil { + t.Fatal(err) + } else if got, exp := n, 0; got != exp { + t.Fatalf("got %d points, expected %d", got, exp) + } + }) + } +} + +func seriesExist(e *Engine, m string, dims []string) (int, error) { + itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{ + Expr: influxql.MustParseExpr(`value`), + Dimensions: []string{"host"}, + StartTime: influxql.MinTime, + EndTime: influxql.MaxTime, + Ascending: false, + }) + if err != nil { + return 0, err + } else if itr == nil { + return 0, nil + } + defer itr.Close() + fitr := itr.(query.FloatIterator) + + var n int + for { + p, err := fitr.Next() + if err != nil { + return 0, err + } else if p == nil { + return n, nil + } + n++ + } +} + // Ensure that the engine can write & read shard digest files. func TestEngine_Digest(t *testing.T) { e := MustOpenEngine(inmem.IndexName)