Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester: Validate completed blocks #4256

Merged
merged 5 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add test and fix replay
Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott committed Nov 5, 2024
commit 57f2ce318366e4ccfe8d05e375715382b3db3dd7
92 changes: 92 additions & 0 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/google/uuid"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
"github.com/grafana/tempo/tempodb/wal"
)

Expand Down Expand Up @@ -259,6 +261,96 @@ func TestSearchWAL(t *testing.T) {
require.Equal(t, uint32(1), results.Metrics.InspectedTraces)
}

func TestRediscoverLocalBlocks(t *testing.T) {
tmpDir := t.TempDir()

ctx := user.InjectOrgID(context.Background(), "test")
ingester, traces, traceIDs := defaultIngester(t, tmpDir)

// force cut all traces
for _, instance := range ingester.instances {
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err, "unexpected error cutting traces")
}

// force complete all blocks
for _, instance := range ingester.instances {
blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
require.NoError(t, err)
}

// create new ingester. this should rediscover local blocks
ingester, _, _ = defaultIngester(t, tmpDir)

// should be able to find old traces that were replayed
for i, traceID := range traceIDs {
foundTrace, err := ingester.FindTraceByID(ctx, &tempopb.TraceByIDRequest{
TraceID: traceID,
})
require.NoError(t, err, "unexpected error querying")
require.NotNil(t, foundTrace.Trace)
trace.SortTrace(foundTrace.Trace)
equal := proto.Equal(traces[i], foundTrace.Trace)
require.True(t, equal)
}
}

func TestRediscoverDropsInvalidBlocks(t *testing.T) {
tmpDir := t.TempDir()

ctx := user.InjectOrgID(context.Background(), "test")
ingester, _, _ := defaultIngester(t, tmpDir)

// force cut all traces
for _, instance := range ingester.instances {
err := instance.CutCompleteTraces(0, true)
require.NoError(t, err, "unexpected error cutting traces")
}

// force complete all blocks
for _, instance := range ingester.instances {
blockID, err := instance.CutBlockIfReady(0, 0, true)
require.NoError(t, err)

err = instance.CompleteBlock(context.Background(), blockID)
require.NoError(t, err)

err = instance.ClearCompletingBlock(blockID)
require.NoError(t, err)
}

// create new ingester. this should rediscover local blocks. there should be 1 block
ingester, _, _ = defaultIngester(t, tmpDir)

instance, ok := ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 1)

// now mangle a complete block
instance, ok = ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 1)

// this cheats by reaching into the internals of the block and overwriting the parquet file directly. if this test starts failing
// it could be b/c the block internals changed and this no longer breaks a block
block := instance.completeBlocks[0]
err := block.writer.Write(ctx, vparquet4.DataFileName, uuid.UUID(block.BlockMeta().BlockID), "test", []byte("mangled"), nil)
require.NoError(t, err)

// create new ingester. this should rediscover local blocks. there should be 0 blocks
ingester, _, _ = defaultIngester(t, tmpDir)

instance, ok = ingester.instances["test"]
require.True(t, ok)
require.Len(t, instance.completeBlocks, 0)
}

// TODO - This test is flaky and commented out until it's fixed
// TestWalReplayDeletesLocalBlocks simulates the condition where an ingester restarts after a wal is completed
// to the local disk, but before the wal is deleted. On startup both blocks exist, and the ingester now errs
Expand Down
5 changes: 4 additions & 1 deletion modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,12 +630,15 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*LocalBlock, er

// validate the block before adding it to the list. if we drop a block here and its not in the wal this is data loss, but there is no way to recover. this is likely due to disk
// level corruption
if err := b.Validate(ctx); err != nil {
err = b.Validate(ctx)
if err != nil && !errors.Is(err, common.ErrUnsupported) {
level.Error(log.Logger).Log("msg", "local block failed validation, dropping", "tenantID", i.instanceID, "block", id.String(), "error", err)
err = i.local.ClearBlock(id, i.instanceID)
if err != nil {
return nil, fmt.Errorf("deleting invalid local block tenant %v block %v: %w", i.instanceID, id.String(), err)
}

continue
}

ib := NewLocalBlock(ctx, b, i.local)
Expand Down
Loading