Skip to content

Commit

Permalink
test(storage): add test for reproducing #14229
Browse files Browse the repository at this point in the history
  • Loading branch information
e-dard committed Jul 5, 2019
1 parent af6cd96 commit a61e88a
Showing 1 changed file with 106 additions and 0 deletions.
106 changes: 106 additions & 0 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,112 @@ func TestEngine_DeleteWALLoadMetadata(t *testing.T) {
}
}

// See https://github.com/influxdata/influxdb/issues/14229
func TestEngine_DeleteSeriesAfterCacheSnapshot(t *testing.T) {
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=B value=1.2 2000000000`,
); err != nil {
t.Fatalf("failed to write points: %s", err.Error())
}

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}))
e.CreateSeriesIfNotExists([]byte("cpu,host=B"), []byte("cpu"), models.NewTags(map[string]string{"host": "B"}))

// Verify series exist.
n, err := seriesExist(e, "cpu", []string{"host"})
if err != nil {
t.Fatal(err)
} else if got, exp := n, 2; got != exp {
t.Fatalf("got %d points, expected %d", got, exp)
}

// Simulate restart of server
if err := e.Reopen(); err != nil {
t.Fatal(err)
}

// Snapshot the cache
if err := e.WriteSnapshot(); err != nil {
t.Fatalf("failed to snapshot: %s", err.Error())
}

// Verify series exist.
n, err = seriesExist(e, "cpu", []string{"host"})
if err != nil {
t.Fatal(err)
} else if got, exp := n, 2; got != exp {
t.Fatalf("got %d points, expected %d", got, exp)
}

// Delete the series
itr := &seriesIterator{keys: [][]byte{
[]byte("cpu,host=A"),
[]byte("cpu,host=B"),
},
}
if err := e.DeleteSeriesRange(itr, math.MinInt64, math.MaxInt64); err != nil {
t.Fatalf("failed to delete series: %s", err.Error())
}

// Verify the series are no longer present.
n, err = seriesExist(e, "cpu", []string{"host"})
if err != nil {
t.Fatal(err)
} else if got, exp := n, 0; got != exp {
t.Fatalf("got %d points, expected %d", got, exp)
}

// Simulate restart of server
if err := e.Reopen(); err != nil {
t.Fatal(err)
}

// Verify the series are no longer present.
n, err = seriesExist(e, "cpu", []string{"host"})
if err != nil {
t.Fatal(err)
} else if got, exp := n, 0; got != exp {
t.Fatalf("got %d points, expected %d", got, exp)
}
})
}
}

func seriesExist(e *Engine, m string, dims []string) (int, error) {
itr, err := e.CreateIterator(context.Background(), "cpu", query.IteratorOptions{
Expr: influxql.MustParseExpr(`value`),
Dimensions: []string{"host"},
StartTime: influxql.MinTime,
EndTime: influxql.MaxTime,
Ascending: false,
})
if err != nil {
return 0, err
} else if itr == nil {
return 0, nil
}
defer itr.Close()
fitr := itr.(query.FloatIterator)

var n int
for {
p, err := fitr.Next()
if err != nil {
return 0, err
} else if p == nil {
return n, nil
}
n++
}
}

// Ensure that the engine can write & read shard digest files.
func TestEngine_Digest(t *testing.T) {
e := MustOpenEngine(inmem.IndexName)
Expand Down

0 comments on commit a61e88a

Please sign in to comment.