diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9b33a17d72b..528ac8d3007 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -103,6 +103,7 @@ querier:
 * [BUGFIX] Initialize histogram buckets to 0 to avoid downsampling. [#4366](https://github.com/grafana/tempo/pull/4366) (@javiermolinar)
 * [BUGFIX] Utilize S3Pass and S3User parameters in tempo-cli options, which were previously unused in the code. [#4259](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
 * [BUGFIX] Fixed an issue in the generator where the first batch was counted 2x against a traces size. [#4365](https://github.com/grafana/tempo/pull/4365) (@joe-elliott)
+* [BUGFIX] Unstable compactors can occasionally duplicate data. Check for job ownership during compaction and cancel the job if ownership changes. [#4420](https://github.com/grafana/tempo/pull/4420) (@joe-elliott)
 
 # v2.6.1
 
diff --git a/tempodb/compactor.go b/tempodb/compactor.go
index 517844cf126..0d0aa926efb 100644
--- a/tempodb/compactor.go
+++ b/tempodb/compactor.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"sort"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/go-kit/log/level"
@@ -70,6 +71,8 @@ var (
 		Name:      "compaction_spans_combined_total",
 		Help:      "Number of spans that are deduped per replication factor.",
 	}, []string{"replication_factor"})
+
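+	// errCompactionJobNoLongerOwned is used as the cancellation cause when a compactor loses ownership of a job mid-compaction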
+	errCompactionJobNoLongerOwned = fmt.Errorf("compaction job no longer owned")
 )
 
 func (rw *readerWriter) compactionLoop(ctx context.Context) {
@@ -81,23 +84,14 @@ func (rw *readerWriter) compactionLoop(ctx context.Context) {
 	ticker := time.NewTicker(compactionCycle)
 	defer ticker.Stop()
 	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-		}
-
-		select {
-		case <-ticker.C:
-			rw.doCompaction(ctx)
-		case <-ctx.Done():
-			return
-		}
+		doForAtLeast(ctx, compactionCycle, func() {
+			rw.compactOneTenant(ctx)
+		})
 	}
 }
 
-// doCompaction runs a compaction cycle every 30s
-func (rw *readerWriter) doCompaction(ctx context.Context) {
+// compactOneTenant compacts blocks for a single tenant. It is called once per compaction cycle by compactionLoop and bails out after MaxTimePerTenant.
+func (rw *readerWriter) compactOneTenant(ctx context.Context) {
 	// List of all tenants in the block list
 	// The block list is updated by constant polling the storage for tenant indexes and/or tenant blocks (and building the index)
 	tenants := rw.blocklist.Tenants()
@@ -142,51 +136,102 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
 
 	start := time.Now()
 
-	level.Debug(rw.logger).Log("msg", "starting compaction cycle", "tenantID", tenantID, "offset", rw.compactorTenantOffset)
+	level.Info(rw.logger).Log("msg", "starting compaction cycle", "tenantID", tenantID, "offset", rw.compactorTenantOffset)
 	for {
-		select {
-		case <-ctx.Done():
+		// this context is controlled by the service manager. it being cancelled means that the process is shutting down
+		if ctx.Err() != nil {
+			level.Info(rw.logger).Log("msg", "caught context cancelled at the top of the compaction loop. bailing.", "err", ctx.Err(), "cause", context.Cause(ctx))
+			return
+		}
+
+		// Pick up to defaultMaxInputBlocks (4) blocks to compact into a single one
+		toBeCompacted, hashString := blockSelector.BlocksToCompact()
+		if len(toBeCompacted) == 0 {
+			measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
+
+			level.Info(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID)
+			return
+		}
+
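+		// capture the ownership check as a closure so it can be re-evaluated while the job runs and after it completes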
+		owns := func() bool {
+			return rw.compactorSharder.Owns(hashString)
+		}
+		if !owns() {
+			// continue on this tenant until we find something we own
+			continue
+		}
+
+		level.Info(rw.logger).Log("msg", "Compacting hash", "hashString", hashString)
+		err := rw.compactWhileOwns(ctx, toBeCompacted, tenantID, owns)
+
+		if errors.Is(err, backend.ErrDoesNotExist) {
+			level.Warn(rw.logger).Log("msg", "unable to find meta during compaction. trying again on this block list", "err", err)
+		} else if err != nil {
+			level.Error(rw.logger).Log("msg", "error during compaction cycle", "err", err)
+			metricCompactionErrors.Inc()
+		}
+
+		// after a maintenance cycle bail out
+		if start.Add(rw.compactorCfg.MaxTimePerTenant).Before(time.Now()) {
+			measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
+
+			level.Info(rw.logger).Log("msg", "compacted blocks for a maintenance cycle, bailing out", "tenantID", tenantID)
 			return
-		default:
-			// Pick up to defaultMaxInputBlocks (4) blocks to compact into a single one
-			toBeCompacted, hashString := blockSelector.BlocksToCompact()
-			if len(toBeCompacted) == 0 {
-				measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
+		}
+	}
+}
+
+func (rw *readerWriter) compactWhileOwns(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string, owns func() bool) error {
+	ownsCtx, cancel := context.WithCancelCause(ctx)
+
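+	// done lets the ownership-watching goroutine exit as soon as compaction returns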
+	done := make(chan struct{})
+	defer close(done)
+
+	// every second test if we still own the job. if we don't then cancel the context with a cause
+	// that we can then test for
+	go func() {
+		ticker := time.NewTicker(1 * time.Second)
+		defer ticker.Stop()
 
-				level.Debug(rw.logger).Log("msg", "compaction cycle complete. No more blocks to compact", "tenantID", tenantID)
+		for {
+			if !owns() {
+				cancel(errCompactionJobNoLongerOwned)
 				return
 			}
-			if !rw.compactorSharder.Owns(hashString) {
-				// continue on this tenant until we find something we own
-				continue
-			}
-			level.Info(rw.logger).Log("msg", "Compacting hash", "hashString", hashString)
-			// Compact selected blocks into a larger one
-			err := rw.compact(ctx, toBeCompacted, tenantID)
-
-			if errors.Is(err, backend.ErrDoesNotExist) {
-				level.Warn(rw.logger).Log("msg", "unable to find meta during compaction.  trying again on this block list", "err", err)
-			} else if err != nil {
-				level.Error(rw.logger).Log("msg", "error during compaction cycle", "err", err)
-				metricCompactionErrors.Inc()
-			}
 
-			if !rw.compactorSharder.Owns(hashString) {
-				level.Warn(rw.logger).Log("msg", "compaction complete but we no longer own the hash", "hashString", hashString)
+			select {
+			case <-ticker.C:
+			case <-done:
+				return
+			case <-ownsCtx.Done():
+				return
 			}
+		}
+	}()
 
-			// after a maintenance cycle bail out
-			if start.Add(rw.compactorCfg.MaxTimePerTenant).Before(time.Now()) {
-				measureOutstandingBlocks(tenantID, blockSelector, rw.compactorSharder.Owns)
+	err := rw.compactOneJob(ownsCtx, blockMetas, tenantID)
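+	// a cancellation caused by errCompactionJobNoLongerOwned means we lost ownership mid-job; treat it as a clean abandon rather than an error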
+	if errors.Is(err, context.Canceled) && errors.Is(context.Cause(ownsCtx), errCompactionJobNoLongerOwned) {
+		level.Warn(rw.logger).Log("msg", "lost ownership of this job. abandoning job and trying again on this block list", "err", err)
+		return nil
+	}
 
-				level.Info(rw.logger).Log("msg", "compacted blocks for a maintenance cycle, bailing out", "tenantID", tenantID)
-				return
-			}
+	// test whether we still own this job now that compaction is complete. it would be exceptional to log this message, but it would be nice to know. the
+	// more likely bad case is that job ownership changed but that change has not yet propagated to this compactor, so it duplicated data without realizing it.
+	if !owns() {
+		// format a string with all input metas
+		sb := &strings.Builder{}
+		for _, meta := range blockMetas {
+			sb.WriteString(meta.BlockID.String())
+			sb.WriteString(", ")
 		}
+
+		level.Error(rw.logger).Log("msg", "lost ownership of this job after compaction. possible data duplication", "tenant", tenantID, "input_blocks", sb.String())
 	}
+
+	return err
 }
 
-func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string) error {
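+// compactOneJob compacts the passed set of blocks for the given tenant into new block(s)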
+func (rw *readerWriter) compactOneJob(ctx context.Context, blockMetas []*backend.BlockMeta, tenantID string) error {
 	level.Debug(rw.logger).Log("msg", "beginning compaction", "num blocks compacting", len(blockMetas))
 
 	// todo - add timeout?
@@ -380,3 +425,22 @@ func (i instrumentedObjectCombiner) Combine(dataEncoding string, objs ...[]byte)
 	}
 	return b, wasCombined, err
 }
+
+// doForAtLeast executes the function f. It blocks for at least the passed duration but can go longer. If the context is cancelled after
+// the function is done we bail immediately; in the current use case this means the process is shutting down.
+// We don't force f() to cancel, we assume it also responds to the cancelled context.
+func doForAtLeast(ctx context.Context, dur time.Duration, f func()) {
+	startTime := time.Now()
+	f()
+	elapsed := time.Since(startTime)
+
+	if elapsed < dur {
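+		// wait out the remainder of the duration, but bail immediately if the context is cancelled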
+		ticker := time.NewTicker(dur - elapsed)
+		defer ticker.Stop()
+
+		select {
+		case <-ticker.C:
+		case <-ctx.Done():
+		}
+	}
+}
diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go
index 0cf76ee4230..13532c5f6f8 100644
--- a/tempodb/compactor_test.go
+++ b/tempodb/compactor_test.go
@@ -173,7 +173,7 @@ func testCompactionRoundtrip(t *testing.T, targetBlockVersion string) {
 		require.Len(t, blocks, inputBlocks)
 
 		compactions++
-		err := rw.compact(context.Background(), blocks, testTenantID)
+		err := rw.compactOneJob(context.Background(), blocks, testTenantID)
 		require.NoError(t, err)
 
 		expectedBlockCount -= blocksPerCompaction
@@ -336,7 +336,7 @@ func testSameIDCompaction(t *testing.T, targetBlockVersion string) {
 	combinedStart, err := test.GetCounterVecValue(metricCompactionObjectsCombined, "0")
 	require.NoError(t, err)
 
-	err = rw.compact(ctx, blocks, testTenantID)
+	err = rw.compactOneJob(ctx, blocks, testTenantID)
 	require.NoError(t, err)
 
 	checkBlocklists(t, uuid.Nil, 1, blockCount, rw)
@@ -420,7 +420,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
 	rw.pollBlocklist()
 
 	// compact everything
-	err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
+	err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
 	require.NoError(t, err)
 
 	// New blocklist contains 1 compacted block with everything
@@ -501,7 +501,7 @@ func TestCompactionMetrics(t *testing.T) {
 	assert.NoError(t, err)
 
 	// compact everything
-	err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
+	err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
 	assert.NoError(t, err)
 
 	// Check metric
@@ -570,12 +570,12 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {
 
 	// Verify that tenant 2 compacted, tenant 1 is not
 	// Compaction starts at index 1 for simplicity
-	rw.doCompaction(ctx)
+	rw.compactOneTenant(ctx)
 	assert.Equal(t, 2, len(rw.blocklist.Metas(testTenantID)))
 	assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID2)))
 
 	// Verify both tenants compacted after second run
-	rw.doCompaction(ctx)
+	rw.compactOneTenant(ctx)
 	assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID)))
 	assert.Equal(t, 1, len(rw.blocklist.Metas(testTenantID2)))
 }
@@ -643,7 +643,7 @@ func testCompactionHonorsBlockStartEndTimes(t *testing.T, targetBlockVersion str
 	rw.pollBlocklist()
 
 	// compact everything
-	err = rw.compact(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
+	err = rw.compactOneJob(ctx, rw.blocklist.Metas(testTenantID), testTenantID)
 	require.NoError(t, err)
 
 	time.Sleep(100 * time.Millisecond)
@@ -778,6 +778,29 @@ func testCompactionDropsTraces(t *testing.T, targetBlockVersion string) {
 	}
 }
 
+func TestDoForAtLeast(t *testing.T) {
+	// test that it runs for at least the duration
+	start := time.Now()
+	doForAtLeast(context.Background(), time.Second, func() { time.Sleep(time.Millisecond) })
+	require.WithinDuration(t, time.Now(), start.Add(time.Second), 100*time.Millisecond)
+
+	// test that it allows func to overrun
+	start = time.Now()
+	doForAtLeast(context.Background(), time.Second, func() { time.Sleep(2 * time.Second) })
+	require.WithinDuration(t, time.Now(), start.Add(2*time.Second), 100*time.Millisecond)
+
+	// make sure cancelling the context stops the wait once the function is complete and we're
+	// just waiting. it is presumed, but not enforced, that the function itself responds to a cancelled context
+	ctx, cancel := context.WithCancel(context.Background())
+	start = time.Now()
+	go func() {
+		time.Sleep(time.Second)
+		cancel()
+	}()
+	doForAtLeast(ctx, 2*time.Second, func() {})
+	require.WithinDuration(t, time.Now(), start.Add(time.Second), 100*time.Millisecond)
+}
+
 type testData struct {
 	id         common.ID
 	t          *tempopb.Trace
@@ -894,6 +917,6 @@ func benchmarkCompaction(b *testing.B, targetBlockVersion string) {
 
 	b.ResetTimer()
 
-	err = rw.compact(ctx, metas, testTenantID)
+	err = rw.compactOneJob(ctx, metas, testTenantID)
 	require.NoError(b, err)
 }
diff --git a/tempodb/retention.go b/tempodb/retention.go
index fd7fd667cfd..bffcafadefa 100644
--- a/tempodb/retention.go
+++ b/tempodb/retention.go
@@ -12,6 +12,8 @@ import (
 )
 
 // retentionLoop watches a timer to clean up blocks that are past retention.
+// todo: correctly pass context all the way to the backend so a cancelled context can stop the retention loop.
+// see implementation of compactionLoop()
 func (rw *readerWriter) retentionLoop(ctx context.Context) {
 	ticker := time.NewTicker(rw.cfg.BlocklistPoll)
 	for {
diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go
index d47374f9047..0d993a675aa 100644
--- a/tempodb/tempodb_test.go
+++ b/tempodb/tempodb_test.go
@@ -547,7 +547,7 @@ func TestSearchCompactedBlocks(t *testing.T) {
 	// compact
 	var blockMetas []*backend.BlockMeta
 	blockMetas = append(blockMetas, complete.BlockMeta())
-	require.NoError(t, rw.compact(ctx, blockMetas, testTenantID))
+	require.NoError(t, rw.compactOneJob(ctx, blockMetas, testTenantID))
 
 	// poll
 	rw.pollBlocklist()