Skip to content

Commit

Permalink
fix(storage): Fix issue where fields re-appear
Browse files Browse the repository at this point in the history
Fixes #10052

This commit fixes an issue where field keys would reappear in results
when querying previously dropped measurements.

The issue manifests itself when duplicates of a new series are inserted
into the `inmem` index. In this case, a map that tracks the number of
series belonging to a measurement was incorrectly incremented once for
each duplication of the series. Then, when it came time to drop the
measurement, the index assumed there were several series belonging to
the measurement left in the index (because the counter was higher than
it should be). The result of that was that the `fields.idx` file (which
stores a mapping between measurements and field keys) was not truncated
and rebuilt. This left old field keys in that file, which were then
returned in subsequent queries over all field keys.
  • Loading branch information
e-dard committed Jul 5, 2019
1 parent 7ca4e64 commit 9bfd111
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 5 deletions.
66 changes: 62 additions & 4 deletions tests/server_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,65 @@ func TestServer_Insert_Delete_1515777603585914810(t *testing.T) {
}
}

// This test reproduces the issue identified in https://github.com/influxdata/influxdb/issues/10052
func TestServer_Insert_Delete_10052(t *testing.T) {
t.Parallel()

s := OpenDefaultServer(NewConfig())
defer s.Close()

mustWrite(s,
"ping,server=ping a=1,b=2,c=3,d=4,e=5 1",
"ping,server=ping a=1,b=2,c=3,d=4,e=5 2",
"ping,server=ping a=1,b=2,c=3,d=4,e=5 3",
"ping,server=ping a=1,b=2,c=3,d=4,e=5 4",
"ping,server=ping a=1,b=2,c=3,d=4,e=5 5",
"ping,server=ping a=1,b=2,c=3,d=4,e=5 6",
)

mustDropMeasurement(s, "ping")
gotSeries := mustGetSeries(s)
expectedSeries := []string(nil)
if !reflect.DeepEqual(gotSeries, expectedSeries) {
t.Fatalf("got series %v, expected %v", gotSeries, expectedSeries)
}

mustWrite(s, "ping v=1 1")
gotSeries = mustGetSeries(s)
expectedSeries = []string{"ping"}
if !reflect.DeepEqual(gotSeries, expectedSeries) {
t.Fatalf("got series %v, expected %v", gotSeries, expectedSeries)
}

gotSeries = mustGetFieldKeys(s)
expectedSeries = []string{"v"}
if !reflect.DeepEqual(gotSeries, expectedSeries) {
t.Fatalf("got series %v, expected %v", gotSeries, expectedSeries)
}
}

func mustGetSeries(s Server) []string {
// Compare series left in index.
result, err := s.QueryWithParams("SHOW SERIES", url.Values{"db": []string{"db0"}})
if err != nil {
panic(err)
}

gotSeries, err := seriesFromShowSeries(result)
gotSeries, err := valuesFromShowQuery(result)
if err != nil {
panic(err)
}
return gotSeries
}

func mustGetFieldKeys(s Server) []string {
// Compare series left in index.
result, err := s.QueryWithParams("SHOW FIELD KEYS", url.Values{"db": []string{"db0"}})
if err != nil {
panic(err)
}

gotSeries, err := valuesFromShowQuery(result)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -318,6 +369,13 @@ func mustDelete(s Server, name string, min, max int64) {
}
}

func mustDropMeasurement(s Server, name string) {
query := fmt.Sprintf("DROP MEASUREMENT %q", name)
if _, err := s.QueryWithParams(query, url.Values{"db": []string{db}}); err != nil {
panic(err)
}
}

// SeriesTracker is a lockable tracker of which shards should own which series.
type SeriesTracker struct {
sync.RWMutex
Expand Down Expand Up @@ -544,7 +602,7 @@ func (s *SeriesTracker) Verify() error {
}

// Get all series...
gotSeries, err := seriesFromShowSeries(res)
gotSeries, err := valuesFromShowQuery(res)
if err != nil {
return err
}
Expand All @@ -561,9 +619,9 @@ func (s *SeriesTracker) Verify() error {
return nil
}

// seriesFromShowSeries extracts a lexicographically sorted set of series keys
// valuesFromShowQuery extracts a lexicographically sorted set of series keys
// from a SHOW SERIES query.
func seriesFromShowSeries(result string) ([]string, error) {
func valuesFromShowQuery(result string) ([]string, error) {
// Get all series...
var results struct {
Results []struct {
Expand Down
5 changes: 4 additions & 1 deletion tsdb/index/inmem/inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,10 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, measu
}

for j, key := range keys {
if seriesList[j] != nil {
// Note, keys may contain duplicates (e.g., because of points for the same series
// in the same batch). If the duplicate series are new, the index must
// be rechecked on each iteration.
if seriesList[j] != nil || i.series[string(key)] != nil {
continue
}

Expand Down

0 comments on commit 9bfd111

Please sign in to comment.