
Merge pull request #109 from upserve/madvise_jnr
Madvise jnr
dstuebe authored Dec 18, 2019
2 parents a5cbe7f + 98b53ec commit bdc272a
Showing 17 changed files with 273 additions and 65 deletions.
10 changes: 8 additions & 2 deletions build.gradle
@@ -29,9 +29,7 @@ if (new File('.git').exists() && (exec {
version "unspecified"
}


group 'com.upserve'

description = """Uppend: fast, append-only key-multivalue store"""

// TODO unused-dependency is broken - claims all dependencies are unused!
@@ -41,6 +39,7 @@ apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'jacoco'
apply plugin: 'com.bmuschko.nexus'
apply plugin: 'c'

jacoco {
toolVersion = "0.8.2" // Fixed to resolve issue with JDK 11 in Gradle 4.X.Y
@@ -59,6 +58,7 @@ dependencies {
compile 'info.picocli:picocli:4.0.1'
compile 'io.dropwizard.metrics:metrics-core:3.2.3'
compile 'it.unimi.dsi:fastutil:7.0.13'
compile 'com.github.jnr:jnr-ffi:2.1.1'
// compile 'me.lemire.integercompression:JavaFastPFOR:0.1.11'
compile 'org.slf4j:slf4j-api:1.7.22'

@@ -68,7 +68,9 @@ dependencies {
}

tasks.withType(JavaCompile) {

options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" << "-Werror"
//options.verbose = true
}

sourceSets {
@@ -89,6 +91,8 @@ processResources {
}
}


// TODO include the cross compiled nativeIO libs as a resource. Unpack and load from jar!
task fatJar(type: Jar) {
dependencies {
compile 'org.apache.logging.log4j:log4j-core:2.8'
@@ -103,6 +107,8 @@ task fatJar(type: Jar) {
}

tasks.withType(Test) {
maxHeapSize = "2048m"

// From https://stackoverflow.com/a/36130467/2136991
testLogging {
// set options for log level LIFECYCLE
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-all.zip
1 change: 0 additions & 1 deletion settings.gradle
@@ -1,2 +1 @@
rootProject.name = 'uppend'

29 changes: 26 additions & 3 deletions src/main/java/com/upserve/uppend/AppendOnlyStoreBuilder.java
@@ -1,18 +1,21 @@
package com.upserve.uppend;

import com.upserve.uppend.blobs.NativeIO;
import com.upserve.uppend.metrics.*;

public class AppendOnlyStoreBuilder extends FileStoreBuilder<AppendOnlyStoreBuilder> {
// Blocked Longs Config Options
public static final int DEFAULT_BLOBS_PER_BLOCK = 127;

private int blobsPerBlock = DEFAULT_BLOBS_PER_BLOCK;

// Blob Cache Options
public static final int DEFAULT_BLOB_PAGE_SIZE = 4 * 1024 * 1024;

public static final int DEFAULT_BLOB_PAGE_SIZE = NativeIO.pageSize * 1024;
private int blobPageSize = DEFAULT_BLOB_PAGE_SIZE;


public static final boolean DEFAULT_CACHE_BUFFERS = true; // Defaults to madvise normal LRU like page cache behavior
private boolean cacheBuffers = DEFAULT_CACHE_BUFFERS;

private BlobStoreMetrics.Adders blobStoreMetricsAdders = new BlobStoreMetrics.Adders();
private BlockedLongMetrics.Adders blockedLongMetricsAdders = new BlockedLongMetrics.Adders();

@@ -24,10 +27,23 @@ public AppendOnlyStoreBuilder withBlobsPerBlock(int blobsPerBlock) {

// Blob Options
public AppendOnlyStoreBuilder withBlobPageSize(int blobPageSize) {
if (blobPageSize % NativeIO.pageSize != 0) {
throw new IllegalArgumentException(
String.format(
"Illegal blobPageSize %d; Must be a multiple of the host system page size: %d",
blobPageSize, NativeIO.pageSize
)
);
}
this.blobPageSize = blobPageSize;
return this;
}

public AppendOnlyStoreBuilder withCacheBuffers(boolean cacheBuffers) {
this.cacheBuffers = cacheBuffers;
return this;
}

public AppendOnlyStore build() {
return build(false);
}
@@ -54,11 +70,18 @@ public int getBlobPageSize() {

public BlockedLongMetrics.Adders getBlockedLongMetricsAdders() { return blockedLongMetricsAdders; }

public boolean getCacheBuffers() {
return cacheBuffers;
}

@Override
public String toString() {
return "AppendOnlyStoreBuilder{" +
"blobsPerBlock=" + blobsPerBlock +
", blobPageSize=" + blobPageSize +
", cacheBuffers=" + cacheBuffers +
", blobStoreMetricsAdders=" + blobStoreMetricsAdders +
", blockedLongMetricsAdders=" + blockedLongMetricsAdders +
'}' + super.toString();
}
}
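The new page-size checks mean blob, lookup, and metadata pages must be sized as multiples of the host page size. A minimal configuration sketch, assuming an AppendOnlyStoreBuilder instance obtained from the library's usual entry point (not part of this diff); the concrete multiples are illustrative only:

import com.upserve.uppend.AppendOnlyStoreBuilder;
import com.upserve.uppend.blobs.NativeIO;

class BuilderPageSizeSketch {
    static AppendOnlyStoreBuilder configure(AppendOnlyStoreBuilder builder) {
        return builder
                .withBlobPageSize(NativeIO.pageSize * 1024)  // 4 MB on a 4 KB-page host
                .withLookupPageSize(NativeIO.pageSize * 64)  // matches the new default
                .withMetadataPageSize(NativeIO.pageSize)     // a single system page
                .withCacheBuffers(false);                    // non-default hint for large blob content; effect lives in VirtualPageFile (not shown here)
    }
}

Expressing sizes as multiples of NativeIO.pageSize keeps a configuration valid on hosts whose page size is not 4096.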
4 changes: 3 additions & 1 deletion src/main/java/com/upserve/uppend/AppendStorePartition.java
@@ -84,12 +84,14 @@ static AppendStorePartition openPartition(Path parentDir, String partition, bool
builder.getBlockedLongMetricsAdders()
);

// Allow control of caching buffers only for large blob content
VirtualPageFile blobs = new VirtualPageFile(
blobsFile(partitionDir),
builder.getLookupHashCount(),
builder.getBlobPageSize(),
builder.getTargetBufferSize(),
readOnly
readOnly,
builder.getCacheBuffers()
);
VirtualPageFile metadata = new VirtualPageFile(
metadataPath(partitionDir),
38 changes: 18 additions & 20 deletions src/main/java/com/upserve/uppend/BlockedLongs.java
@@ -1,6 +1,7 @@
package com.upserve.uppend;

import com.google.common.util.concurrent.Striped;
import com.upserve.uppend.blobs.NativeIO;
import com.upserve.uppend.metrics.*;
import org.slf4j.Logger;

@@ -24,7 +25,7 @@ public class BlockedLongs implements AutoCloseable, Flushable {
private static final int PAGE_SIZE = 128 * 1024 * 1024; // allocate 128 MB chunks
private static final int MAX_PAGES = 32 * 1024; // max 4 TB

static final int HEADER_BYTES = 128; // Currently 16 used for file size and append count
static final int HEADER_BYTES = NativeIO.pageSize; // Currently 16 used for file size and append count
private static final int posBufPosition = 0;
private static final int appendBufPosition = 8;

@@ -57,6 +58,15 @@ public class BlockedLongs implements AutoCloseable, Flushable {
throw new IllegalArgumentException("null file");
}

if (PAGE_SIZE % NativeIO.pageSize != 0) {
throw new IllegalArgumentException(
String.format(
"The BlockedLong PAGE SIZE %d is not a multiple of the system page size %d on this OS",
PAGE_SIZE, NativeIO.pageSize
)
);
}

this.file = file;
this.readOnly = readOnly;
this.blockedLongMetricsAdders = blockedLongMetricsAdders;
@@ -100,6 +110,7 @@ public class BlockedLongs implements AutoCloseable, Flushable {

try {
posBuf = blocks.map(readOnly ? FileChannel.MapMode.READ_ONLY : FileChannel.MapMode.READ_WRITE, posBufPosition, 8);
NativeIO.madvise(posBuf, NativeIO.Advice.WillNeed); // Will include the first few blocks
} catch (IOException e) {
throw new UncheckedIOException("Unable to map pos buffer at in " + file, e);
}
@@ -129,6 +140,7 @@ else if (pos < HEADER_BYTES) {

try {
appendCountBuf = blocks.map(readOnly ? FileChannel.MapMode.READ_ONLY : FileChannel.MapMode.READ_WRITE, appendBufPosition, 8);
NativeIO.madvise(appendCountBuf, NativeIO.Advice.WillNeed);
} catch (IOException e) {
throw new UncheckedIOException("Unable to map pos buffer at in " + file, e);
}
@@ -420,32 +432,17 @@ public void clear() {
public void close() throws IOException {
log.debug("closing {}", file);

if (readOnly) {
blocks.close();
return;
}
Arrays.fill(pages, null);

IntStream.range(0, LOCK_SIZE).forEach(index -> stripedLocks.getAt(index).lock());
try {
flush();
blocks.close();
} finally {
IntStream.range(0, LOCK_SIZE).forEach(index -> stripedLocks.getAt(index).unlock());
}
flush();
blocks.close();
}

@Override
public void flush() {
if (readOnly) return;
log.debug("flushing {}", file);
posBuf.force();
appendCountBuf.putLong(0, initialAppendCount + appendCounter.sum());
appendCountBuf.force();

Arrays.stream(pages)
.parallel()
.filter(Objects::nonNull)
.forEach(MappedByteBuffer::force);

log.debug("flushed {}", file);
}
@@ -487,7 +484,7 @@ private MappedByteBuffer page(long pos) {
private void preloadPage(int pageIndex) {
if (pageIndex < MAX_PAGES && pages[pageIndex] == null) {
// preload page
int prev = currentPage.getAndUpdate(current -> current < pageIndex ? pageIndex : current);
int prev = currentPage.getAndUpdate(current -> Math.max(pageIndex, current));
if (prev < pageIndex) {
ensurePage(pageIndex);
}
@@ -504,6 +501,7 @@ private MappedByteBuffer ensurePage(int pageIndex) {
try {
FileChannel.MapMode mapMode = readOnly ? FileChannel.MapMode.READ_ONLY : FileChannel.MapMode.READ_WRITE;
page = blocks.map(mapMode, pageStart, PAGE_SIZE);
// Could experiment with advise_random to reduce memory use or advise_willneed to hold more in page cache?
} catch (IOException e) {
throw new UncheckedIOException("unable to map page at page index " + pageIndex + " (" + pageStart + " + " + PAGE_SIZE + ") in " + file, e);
}
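The constructor changes above map the two 8-byte header buffers and immediately advise the kernel that they will be needed. A self-contained sketch of that map-then-advise pattern against a throwaway file; the temp-file name and the value written are illustrative only:

import com.upserve.uppend.blobs.NativeIO;

import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;

class MadviseHeaderSketch {
    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("blocked-longs-demo", ".blocks");
        try (FileChannel channel = FileChannel.open(path,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // Map one system page for the header, matching the new HEADER_BYTES
            MappedByteBuffer header = channel.map(FileChannel.MapMode.READ_WRITE, 0, NativeIO.pageSize);
            // Same hint the constructor applies to posBuf and appendCountBuf
            NativeIO.madvise(header, NativeIO.Advice.WillNeed);
            header.putLong(0, 42L); // e.g. a persisted position counter
        } finally {
            Files.deleteIfExists(path);
        }
    }
}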
21 changes: 19 additions & 2 deletions src/main/java/com/upserve/uppend/FileStoreBuilder.java
@@ -1,6 +1,7 @@
package com.upserve.uppend;

import com.codahale.metrics.MetricRegistry;
import com.upserve.uppend.blobs.NativeIO;
import com.upserve.uppend.metrics.*;

import java.nio.file.Path;
@@ -10,11 +11,11 @@ public class FileStoreBuilder<T extends FileStoreBuilder<T>> {
// Long lookup Cache Options
public static final int DEFAULT_PARTITION_COUNT = 0;
public static final int DEFAULT_LOOKUP_HASH_COUNT = 256;
public static final int DEFAULT_LOOKUP_PAGE_SIZE = 256 * 1024;
public static final int DEFAULT_LOOKUP_PAGE_SIZE = NativeIO.pageSize * 64;

public static final int TARGET_PRODUCTION_BUFFER_SIZE = Integer.MAX_VALUE;

public static final int DEFAULT_METADATA_PAGE_SIZE = 4096;
public static final int DEFAULT_METADATA_PAGE_SIZE = NativeIO.pageSize;
public static final int DEFAULT_METADATA_TTL = 0; // Off by default!

private String storeName = "";
@@ -56,12 +57,28 @@ public T withLongLookupHashCount(int longLookupHashCount) {

@SuppressWarnings("unchecked")
public T withLookupPageSize(int lookupPageSize) {
if (lookupPageSize % NativeIO.pageSize != 0) {
throw new IllegalArgumentException(
String.format(
"Illegal lookupPageSize %d; Must be a multiple of the host system page size: %d",
lookupPageSize, NativeIO.pageSize
)
);
}
this.lookupPageSize = lookupPageSize;
return (T) this;
}

@SuppressWarnings("unchecked")
public T withMetadataPageSize(int metadataPageSize) {
if (metadataPageSize % NativeIO.pageSize != 0){
throw new IllegalArgumentException(
String.format(
"Illegal metadataPageSize %d; Must be a multiple of the host system page size: %d",
metadataPageSize, NativeIO.pageSize
)
);
}
this.metadataPageSize = metadataPageSize;
return (T) this;
}
4 changes: 0 additions & 4 deletions src/main/java/com/upserve/uppend/blobs/FilePage.java
@@ -4,13 +4,10 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import static java.lang.Integer.min;

/**
* File backed implementation of Page
*/
public class FilePage implements Page {

private final FileChannel channel;
private final int pageSize;
private final long pageStart;
@@ -25,7 +22,6 @@ public class FilePage implements Page {
this.channel = channel;
this.pageStart = pageStart;
this.pageSize = pageSize;

}

@Override
61 changes: 61 additions & 0 deletions src/main/java/com/upserve/uppend/blobs/NativeIO.java
@@ -0,0 +1,61 @@
package com.upserve.uppend.blobs;

import jnr.ffi.*;
import jnr.ffi.types.size_t;
import org.slf4j.Logger;
import com.kenai.jffi.MemoryIO;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.*;

public class NativeIO {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final NativeC nativeC = LibraryLoader.create(NativeC.class).load("c");
public static final int pageSize = nativeC.getpagesize(); // 4096 on most Linux

public enum Advice {
// These seem to be fairly stable https://github.com/torvalds/linux
// TODO add to https://github.com/jnr/jnr-constants
Normal(0), Random(1), Sequential(2), WillNeed(3), DontNeed(4);
private final int value;
Advice(int val) {
this.value = val;
}
}

public interface NativeC {
int madvise(@size_t long address, @size_t long size, int advice);
int getpagesize();
}

static long alignedAddress(long address) {
return address & (- pageSize);
}

static long alignedSize(long address, int capacity) {
long end = address + capacity;
end = (end + pageSize - 1) & (-pageSize);
return end - alignedAddress(address);
}

public static void madvise(MappedByteBuffer buffer, Advice advice) throws IOException {

final long address = MemoryIO.getInstance().getDirectBufferAddress(buffer);
final int capacity = buffer.capacity();

long alignedAddress = alignedAddress(address);
long alignedSize = alignedSize(alignedAddress, capacity);

log.debug(
"Page size {}; Address: raw - {}, aligned - {}; Size: raw - {}, aligned - {}",
pageSize, address, alignedAddress, capacity, alignedSize
);
int val = nativeC.madvise(alignedAddress, alignedSize, advice.value);

if (val != 0) {
throw new IOException(String.format("System call madvise failed with code: %d", val));
}
}
}
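madvise requires a page-aligned start address, so the helpers round the buffer address down and the end of the buffer up to page boundaries. A standalone copy of that arithmetic with a worked example (the helpers themselves are package-private; a 4096-byte page size is assumed):

class AlignmentSketch {
    static final long PAGE = 4096; // assumed host page size

    static long alignedAddress(long address) {
        return address & (-PAGE); // round down to the page boundary
    }

    static long alignedSize(long address, int capacity) {
        long end = address + capacity;
        end = (end + PAGE - 1) & (-PAGE); // round the end up to the next boundary
        return end - alignedAddress(address); // whole pages spanned
    }

    public static void main(String[] args) {
        // A buffer starting one byte before a page boundary and crossing into the next page
        System.out.println(alignedAddress(8191));  // 4096
        System.out.println(alignedSize(8191, 10)); // 8192: the advised range spans two pages
    }
}

Both ends are rounded so the advised range is always expressed in whole pages.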
