Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generify SharedBlobCacheService #93164

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.action.StepListener;
import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.blobcache.common.CacheKey;
import org.elasticsearch.blobcache.common.SparseFileTracker;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
Expand All @@ -34,7 +33,6 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.monitor.fs.FsProbe;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -59,7 +57,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class SharedBlobCacheService implements Releasable {
public class SharedBlobCacheService<KeyType> implements Releasable {

private static final String SHARED_CACHE_SETTINGS_PREFIX = "xpack.searchable.snapshot.shared_cache.";

Expand Down Expand Up @@ -230,11 +228,11 @@ public void validate(ByteSizeValue value, Map<Setting<?>, Object> settings, bool

private static final Logger logger = LogManager.getLogger(SharedBlobCacheService.class);

private final ConcurrentHashMap<RegionKey, Entry<CacheFileRegion>> keyMapping;
private final ConcurrentHashMap<RegionKey<KeyType>, Entry<CacheFileRegion>> keyMapping;

private final LongSupplier currentTimeSupplier;

private final KeyedLock<CacheKey> keyedLock = new KeyedLock<>();
private final KeyedLock<KeyType> keyedLock = new KeyedLock<>();

private final SharedBytes sharedBytes;
private final long cacheSize;
Expand Down Expand Up @@ -370,10 +368,10 @@ private long getRegionSize(long fileLength, int region) {
return effectiveRegionSize;
}

public CacheFileRegion get(CacheKey cacheKey, long fileLength, int region) {
public CacheFileRegion get(KeyType cacheKey, long fileLength, int region) {
final long effectiveRegionSize = getRegionSize(fileLength, region);
try (Releasable ignore = keyedLock.acquire(cacheKey)) {
final RegionKey regionKey = new RegionKey(cacheKey, region);
final RegionKey<KeyType> regionKey = new RegionKey<>(cacheKey, region);
final long now = currentTimeSupplier.getAsLong();
final Entry<CacheFileRegion> entry = keyMapping.computeIfAbsent(
regionKey,
Expand Down Expand Up @@ -560,17 +558,11 @@ private void computeDecay() {
}
}

public void removeFromCache(CacheKey cacheKey) {
public void removeFromCache(KeyType cacheKey) {
forceEvict(cacheKey::equals);
}

public void markShardAsEvictedInCache(String snapshotUUID, String snapshotIndexName, ShardId shardId) {
forceEvict(
k -> shardId.equals(k.shardId()) && snapshotIndexName.equals(k.snapshotIndexName()) && snapshotUUID.equals(k.snapshotUUID())
);
}

private void forceEvict(Predicate<CacheKey> cacheKeyPredicate) {
public void forceEvict(Predicate<KeyType> cacheKeyPredicate) {
final List<Entry<CacheFileRegion>> matchingEntries = new ArrayList<>();
keyMapping.forEach((key, value) -> {
if (cacheKeyPredicate.test(key.file)) {
Expand Down Expand Up @@ -628,7 +620,7 @@ public String toString() {
}
}

private record RegionKey(CacheKey file, int region) {
private record RegionKey<KeyType> (KeyType file, int region) {
@Override
public String toString() {
return "Chunk{" + "file=" + file + ", region=" + region + '}';
Expand All @@ -649,11 +641,11 @@ static class Entry<T> {
}

class CacheFileRegion extends AbstractRefCounted {
final RegionKey regionKey;
final RegionKey<KeyType> regionKey;
final SparseFileTracker tracker;
volatile int sharedBytesPos = -1;

CacheFileRegion(RegionKey regionKey, long regionSize) {
CacheFileRegion(RegionKey<KeyType> regionKey, long regionSize) {
this.regionKey = regionKey;
assert regionSize > 0L;
tracker = new SparseFileTracker("file", regionSize);
Expand Down Expand Up @@ -817,10 +809,10 @@ protected void alreadyClosed() {

public class CacheFile {

private final CacheKey cacheKey;
private final KeyType cacheKey;
private final long length;

private CacheFile(CacheKey cacheKey, long length) {
private CacheFile(KeyType cacheKey, long length) {
this.cacheKey = cacheKey;
this.length = length;
}
Expand All @@ -829,7 +821,7 @@ public long getLength() {
return length;
}

public CacheKey getCacheKey() {
public KeyType getCacheKey() {
return cacheKey;
}

Expand Down Expand Up @@ -890,7 +882,7 @@ public String toString() {
}
}

public CacheFile getFrozenCacheFile(CacheKey cacheKey, long length) {
public CacheFile getFrozenCacheFile(KeyType cacheKey, long length) {
return new CacheFile(cacheKey, length);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,8 @@
import org.apache.lucene.tests.mockfile.FilterFileSystemProvider;
import org.apache.lucene.tests.mockfile.FilterPath;
import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.blobcache.common.CacheFile;
import org.elasticsearch.common.TriConsumer;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.PathUtilsForTesting;
import org.hamcrest.MatcherAssert;

import java.io.IOException;
import java.nio.channels.FileChannel;
Expand All @@ -25,51 +21,19 @@
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.spi.FileSystemProvider;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.synchronizedNavigableSet;
import static org.elasticsearch.test.ESTestCase.between;
import static org.elasticsearch.test.ESTestCase.randomLongBetween;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertTrue;

public final class BlobCacheTestUtils {
private BlobCacheTestUtils() {}

public static SortedSet<ByteRange> randomPopulateAndReads(final CacheFile cacheFile) {
return randomPopulateAndReads(cacheFile, (fileChannel, aLong, aLong2) -> {});
}

public static SortedSet<ByteRange> randomPopulateAndReads(CacheFile cacheFile, TriConsumer<FileChannel, Long, Long> consumer) {
final SortedSet<ByteRange> ranges = synchronizedNavigableSet(new TreeSet<>());
final List<Future<Integer>> futures = new ArrayList<>();
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue();
for (int i = 0; i < between(0, 10); i++) {
final long start = randomLongBetween(0L, Math.max(0L, cacheFile.getLength() - 1L));
final long end = randomLongBetween(Math.min(start + 1L, cacheFile.getLength()), cacheFile.getLength());
final ByteRange range = ByteRange.of(start, end);
futures.add(
cacheFile.populateAndRead(range, range, channel -> Math.toIntExact(end - start), (channel, from, to, progressUpdater) -> {
consumer.apply(channel, from, to);
ranges.add(ByteRange.of(from, to));
progressUpdater.accept(to);
}, deterministicTaskQueue.getThreadPool().generic())
);
}
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(futures.stream().allMatch(Future::isDone));
return mergeContiguousRanges(ranges);
}

public static long numberOfRanges(long fileSize, long rangeSize) {
return numberOfRanges(BlobCacheUtils.toIntBytes(fileSize), BlobCacheUtils.toIntBytes(rangeSize));
}
Expand Down Expand Up @@ -127,17 +91,6 @@ public static SortedSet<ByteRange> mergeContiguousRanges(final SortedSet<ByteRan
});
}

public static void assertCacheFileEquals(CacheFile expected, CacheFile actual) {
MatcherAssert.assertThat(actual.getLength(), equalTo(expected.getLength()));
MatcherAssert.assertThat(actual.getFile(), equalTo(expected.getFile()));
MatcherAssert.assertThat(actual.getCacheKey(), equalTo(expected.getCacheKey()));
MatcherAssert.assertThat(actual.getCompletedRanges(), equalTo(expected.getCompletedRanges()));
}

public static long sumOfCompletedRangesLengths(CacheFile cacheFile) {
return cacheFile.getCompletedRanges().stream().mapToLong(ByteRange::length).sum();
}

/**
* A {@link FileSystemProvider} that counts the number of times the method {@link FileChannel#force(boolean)} is executed on every
* files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
package org.elasticsearch.blobcache.shared;

import org.elasticsearch.blobcache.common.ByteRange;
import org.elasticsearch.blobcache.common.CacheKey;
import org.elasticsearch.blobcache.shared.SharedBlobCacheService.CacheFileRegion;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -23,7 +21,6 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -56,17 +53,17 @@ public void testBasicEviction() throws IOException {
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
SharedBlobCacheService cacheService = new SharedBlobCacheService(environment, settings, taskQueue.getThreadPool())
var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool())
) {
final CacheKey cacheKey = generateCacheKey();
final var cacheKey = generateCacheKey();
assertEquals(5, cacheService.freeRegionCount());
final CacheFileRegion region0 = cacheService.get(cacheKey, size(250), 0);
final var region0 = cacheService.get(cacheKey, size(250), 0);
assertEquals(size(100), region0.tracker.getLength());
assertEquals(4, cacheService.freeRegionCount());
final CacheFileRegion region1 = cacheService.get(cacheKey, size(250), 1);
final var region1 = cacheService.get(cacheKey, size(250), 1);
assertEquals(size(100), region1.tracker.getLength());
assertEquals(3, cacheService.freeRegionCount());
final CacheFileRegion region2 = cacheService.get(cacheKey, size(250), 2);
final var region2 = cacheService.get(cacheKey, size(250), 2);
assertEquals(size(50), region2.tracker.getLength());
assertEquals(2, cacheService.freeRegionCount());

Expand Down Expand Up @@ -101,21 +98,21 @@ public void testAutoEviction() throws IOException {
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
SharedBlobCacheService cacheService = new SharedBlobCacheService(environment, settings, taskQueue.getThreadPool())
var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool())
) {
final CacheKey cacheKey = generateCacheKey();
final var cacheKey = generateCacheKey();
assertEquals(2, cacheService.freeRegionCount());
final CacheFileRegion region0 = cacheService.get(cacheKey, size(250), 0);
final var region0 = cacheService.get(cacheKey, size(250), 0);
assertEquals(size(100), region0.tracker.getLength());
assertEquals(1, cacheService.freeRegionCount());
final CacheFileRegion region1 = cacheService.get(cacheKey, size(250), 1);
final var region1 = cacheService.get(cacheKey, size(250), 1);
assertEquals(size(100), region1.tracker.getLength());
assertEquals(0, cacheService.freeRegionCount());
assertFalse(region0.isEvicted());
assertFalse(region1.isEvicted());

// acquire region 2, which should evict region 0 (oldest)
final CacheFileRegion region2 = cacheService.get(cacheKey, size(250), 2);
final var region2 = cacheService.get(cacheKey, size(250), 2);
assertEquals(size(50), region2.tracker.getLength());
assertEquals(0, cacheService.freeRegionCount());
assertTrue(region0.isEvicted());
Expand All @@ -137,14 +134,14 @@ public void testForceEviction() throws IOException {
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
SharedBlobCacheService cacheService = new SharedBlobCacheService(environment, settings, taskQueue.getThreadPool())
var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool())
) {
final CacheKey cacheKey1 = generateCacheKey();
final CacheKey cacheKey2 = generateCacheKey();
final var cacheKey1 = generateCacheKey();
final var cacheKey2 = generateCacheKey();
assertEquals(5, cacheService.freeRegionCount());
final CacheFileRegion region0 = cacheService.get(cacheKey1, size(250), 0);
final var region0 = cacheService.get(cacheKey1, size(250), 0);
assertEquals(4, cacheService.freeRegionCount());
final CacheFileRegion region1 = cacheService.get(cacheKey2, size(250), 1);
final var region1 = cacheService.get(cacheKey2, size(250), 1);
assertEquals(3, cacheService.freeRegionCount());
assertFalse(region0.isEvicted());
assertFalse(region1.isEvicted());
Expand All @@ -165,14 +162,14 @@ public void testDecay() throws IOException {
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
SharedBlobCacheService cacheService = new SharedBlobCacheService(environment, settings, taskQueue.getThreadPool())
var cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool())
) {
final CacheKey cacheKey1 = generateCacheKey();
final CacheKey cacheKey2 = generateCacheKey();
final var cacheKey1 = generateCacheKey();
final var cacheKey2 = generateCacheKey();
assertEquals(5, cacheService.freeRegionCount());
final CacheFileRegion region0 = cacheService.get(cacheKey1, size(250), 0);
final var region0 = cacheService.get(cacheKey1, size(250), 0);
assertEquals(4, cacheService.freeRegionCount());
final CacheFileRegion region1 = cacheService.get(cacheKey2, size(250), 1);
final var region1 = cacheService.get(cacheKey2, size(250), 1);
assertEquals(3, cacheService.freeRegionCount());

assertEquals(0, cacheService.getFreq(region0));
Expand All @@ -181,7 +178,7 @@ public void testDecay() throws IOException {
taskQueue.advanceTime();
taskQueue.runAllRunnableTasks();

final CacheFileRegion region0Again = cacheService.get(cacheKey1, size(250), 0);
final var region0Again = cacheService.get(cacheKey1, size(250), 0);
assertSame(region0Again, region0);
assertEquals(1, cacheService.getFreq(region0));
assertEquals(0, cacheService.getFreq(region1));
Expand Down Expand Up @@ -342,13 +339,8 @@ public void testCalculateCacheSize() {
assertThat(SharedBlobCacheService.calculateCacheSize(settings, largeSize), equalTo(largeSize - ByteSizeValue.ofGb(100).getBytes()));
}

private static CacheKey generateCacheKey() {
return new CacheKey(
randomAlphaOfLength(10),
randomAlphaOfLength(10),
new ShardId(randomAlphaOfLength(10), randomAlphaOfLength(10), randomInt(10)),
randomAlphaOfLength(10)
);
private static Object generateCacheKey() {
return new Object();
}

public void testCacheSizeChanges() throws IOException {
Expand All @@ -362,7 +354,7 @@ public void testCacheSizeChanges() throws IOException {
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
SharedBlobCacheService cacheService = new SharedBlobCacheService(environment, settings, taskQueue.getThreadPool())
SharedBlobCacheService<?> cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool())
) {
assertEquals(val1.getBytes(), cacheService.getStats().size());
}
Expand All @@ -374,7 +366,7 @@ public void testCacheSizeChanges() throws IOException {
.build();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
SharedBlobCacheService cacheService = new SharedBlobCacheService(environment, settings, taskQueue.getThreadPool())
SharedBlobCacheService<?> cacheService = new SharedBlobCacheService<>(environment, settings, taskQueue.getThreadPool())
) {
assertEquals(val2.getBytes(), cacheService.getStats().size());
}
Expand Down
Loading