diff --git a/README.md b/README.md index c0bbb572..cf4d1be7 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ compile 'com.upserve:uppend:0.0.1' Hello world: ```java -AppendOnlyStore db = Uppend.fileStore("build/tmp-db").build(); +AppendOnlyStore db = Uppend.store("build/tmp-db").build(); db.append("my-partition", "my-key", "value-1".getBytes()); db.append("my-partition", "my-key", "value-2".getBytes()); diff --git a/src/main/java/com/upserve/uppend/AppendOnlyStore.java b/src/main/java/com/upserve/uppend/AppendOnlyStore.java index d25347db..bc0181c3 100644 --- a/src/main/java/com/upserve/uppend/AppendOnlyStore.java +++ b/src/main/java/com/upserve/uppend/AppendOnlyStore.java @@ -4,13 +4,13 @@ import java.util.stream.Stream; /** - * Defines the minimum interface required to add byte arrays under a key, and to - * retrieve them. Note the expectation that the byte arrays are appended to the - * value, which can be thought of as an ever-growing list of byte arrays. + * Add byte arrays under a key and partition, and retrieve them. Note the + * expectation that the byte arrays are appended to the value, which is an + * ever-growing list of byte arrays. 
*/ public interface AppendOnlyStore extends AutoCloseable, Flushable { /** - * Append a byte array under a given key + * Append a byte array under a given partition and key * * @param partition the partition to store under * @param key the key to store under @@ -27,7 +27,8 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable { void flush(); /** - * Read byte arrays that have been stored under a given key in parallel + * Read byte arrays that have been stored under a given partition and key in + * parallel * * @param partition the partition under which to retrieve * @param key the key under which to retrieve @@ -37,8 +38,8 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable { Stream read(String partition, String key); /** - * Read byte arrays that have been stored under a given key in the order - * they were stored in + * Read byte arrays that have been stored under a given partition and key in + * the order they were stored * * @param partition the partition under which to retrieve * @param key the key under which to retrieve @@ -49,7 +50,7 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable { /** - * Read the last byte array that was stored under a given key + * Read the last byte array that was stored under a given partition and key * * @param partition the partition under which to retrieve * @param key the key under which to retrieve @@ -59,7 +60,7 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable { byte[] readLast(String partition, String key); /** - * Enumerate the keys in the data store + * Enumerate the keys for a given partition * * @param partition the partition under which to retrieve * @throws IllegalArgumentException if partition is invalid @@ -68,7 +69,7 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable { Stream keys(String partition); /** - * Enumerate the partition in the data store + * Enumerate the partitions in the data store * * @return a stream of string 
partition */ diff --git a/src/main/java/com/upserve/uppend/CounterStore.java b/src/main/java/com/upserve/uppend/CounterStore.java new file mode 100644 index 00000000..186b66f2 --- /dev/null +++ b/src/main/java/com/upserve/uppend/CounterStore.java @@ -0,0 +1,81 @@ +package com.upserve.uppend; + +import java.io.Flushable; +import java.util.stream.Stream; + +/** + * Keep counters for partitioned keys. + */ +public interface CounterStore extends AutoCloseable, Flushable { + /** + * Set the counter under a given partition and key, to the given value + * + * @param partition the partition to set under + * @param key the key to set under + * @param value the value to set + * @throws IllegalArgumentException if partition is invalid + * @return the old value of the counter or 0 if it was previously unset + */ + long set(String partition, String key, long value); + + /** + * Increment by 1 the counter under a given partition and key, whose value + * is initialized to 0 + * + * @param partition the partition to increment under + * @param key the key to increment under + * @throws IllegalArgumentException if partition is invalid + * @return the new value of the counter + */ + long increment(String partition, String key); + + /** + * Increment by a given amount the counter under a given partition and key, + * whose value is initialized to 0 + * + * @param partition the partition to increment under + * @param key the key to increment under + * @param delta the amount to add to the current value + * @throws IllegalArgumentException if partition is invalid + * @return the new value of the counter + */ + long increment(String partition, String key, long delta); + + /** + * Flush any pending changes to durable storage. Will not return until + * the flush is completed. 
+ */ + @Override + void flush(); + + /** + * Get the value for a given partition and key + * + * @param partition the partition to get + * @param key the key to get + * @throws IllegalArgumentException if partition is invalid + * @return the value for the given partition and key, or 0 if not found + */ + long get(String partition, String key); + + /** + * Enumerate the keys for a given partition + * + * @param partition the partition under which to retrieve + * @throws IllegalArgumentException if partition is invalid + * @return a stream of string keys + */ + Stream keys(String partition); + + /** + * Enumerate the partitions in the data store + * + * @return a stream of string partitions + */ + Stream partitions(); + + /** + * Remove all keys and values from the store. + */ + void clear(); +} \ No newline at end of file diff --git a/src/main/java/com/upserve/uppend/CounterStoreBuilder.java b/src/main/java/com/upserve/uppend/CounterStoreBuilder.java new file mode 100644 index 00000000..536da176 --- /dev/null +++ b/src/main/java/com/upserve/uppend/CounterStoreBuilder.java @@ -0,0 +1,5 @@ +package com.upserve.uppend; + +public interface CounterStoreBuilder { + T build(); +} diff --git a/src/main/java/com/upserve/uppend/FileCounterStore.java b/src/main/java/com/upserve/uppend/FileCounterStore.java new file mode 100644 index 00000000..86b7d6ff --- /dev/null +++ b/src/main/java/com/upserve/uppend/FileCounterStore.java @@ -0,0 +1,102 @@ +package com.upserve.uppend; + +import com.upserve.uppend.lookup.LongLookup; +import lombok.extern.slf4j.Slf4j; + +import java.io.*; +import java.nio.file.*; +import java.util.stream.Stream; + +@Slf4j +public class FileCounterStore implements CounterStore, Flushable { + /** + * DEFAULT_FLUSH_DELAY_SECONDS is the number of seconds to wait between + * automatically flushing writes. 
+ */ + public static final int DEFAULT_FLUSH_DELAY_SECONDS = FileAppendOnlyStore.DEFAULT_FLUSH_DELAY_SECONDS; + + private final Path dir; + private final LongLookup lookup; + + public FileCounterStore(Path dir) { + this( + dir, + LongLookup.DEFAULT_HASH_SIZE, + LongLookup.DEFAULT_WRITE_CACHE_SIZE, + DEFAULT_FLUSH_DELAY_SECONDS + ); + } + + public FileCounterStore(Path dir, int longLookupHashSize, int longLookupWriteCacheSize, int flushDelaySeconds) { + try { + Files.createDirectories(dir); + } catch (IOException e) { + throw new UncheckedIOException("unable to mkdirs: " + dir, e); + } + + this.dir = dir; + lookup = new LongLookup( + dir.resolve("inc-lookup"), + longLookupHashSize, + longLookupWriteCacheSize + ); + AutoFlusher.register(flushDelaySeconds, this); + } + + @Override + public long set(String partition, String key, long value) { + log.trace("setting {}={} in partition '{}'", key, value, partition); + return lookup.put(partition, key, value); + } + + @Override + public long increment(String partition, String key) { + return increment(partition, key, 1); + } + + @Override + public long increment(String partition, String key, long delta) { + log.trace("incrementing by {} key '{}' in partition '{}'", delta, key, partition); + return lookup.increment(partition, key, delta); + } + + @Override + public void flush() { + log.info("flushing {}", dir); + lookup.flush(); + log.info("flushed {}", dir); + } + + @Override + public long get(String partition, String key) { + long val = lookup.get(partition, key); + return val == -1 ? 
0 : val; + } + + @Override + public Stream keys(String partition) { + return lookup.keys(partition); + } + + @Override + public Stream partitions() { + return lookup.partitions(); + } + + @Override + public void clear() { + log.trace("clearing"); + lookup.clear(); + } + + @Override + public void close() throws Exception { + log.info("closing: " + dir); + AutoFlusher.deregister(this); + try { + lookup.close(); + } catch (Exception e) { + log.error("unable to close lookup", e); + } + } +} diff --git a/src/main/java/com/upserve/uppend/FileCounterStoreBuilder.java b/src/main/java/com/upserve/uppend/FileCounterStoreBuilder.java new file mode 100644 index 00000000..0d3b51e9 --- /dev/null +++ b/src/main/java/com/upserve/uppend/FileCounterStoreBuilder.java @@ -0,0 +1,49 @@ +package com.upserve.uppend; + +import com.upserve.uppend.lookup.LongLookup; +import lombok.extern.slf4j.Slf4j; + +import java.nio.file.Path; + +@Slf4j +public class FileCounterStoreBuilder implements CounterStoreBuilder { + private Path dir; + private int longLookupHashSize = LongLookup.DEFAULT_HASH_SIZE; + private int longLookupWriteCacheSize = LongLookup.DEFAULT_WRITE_CACHE_SIZE; + private int flushDelaySeconds = FileCounterStore.DEFAULT_FLUSH_DELAY_SECONDS; + + public FileCounterStoreBuilder withDir(Path dir) { + this.dir = dir; + return this; + } + + public FileCounterStoreBuilder withLongLookupHashSize(int longLookupHashSize) { + this.longLookupHashSize = longLookupHashSize; + return this; + } + + public FileCounterStoreBuilder withLongLookupWriteCacheSize(int longLookupWriteCacheSize) { + this.longLookupWriteCacheSize = longLookupWriteCacheSize; + return this; + } + + public FileCounterStoreBuilder withFlushDelaySeconds(int flushDelaySeconds) { + this.flushDelaySeconds = flushDelaySeconds; + return this; + } + + @Override + public FileCounterStore build() { + log.info("building FileCounterStore from builder: {}", this); + return new FileCounterStore(dir, longLookupHashSize, 
longLookupWriteCacheSize, flushDelaySeconds); + } + + @Override + public String toString() { + return "FileCounterStoreBuilder{" + + "dir=" + dir + + ", longLookupHashSize=" + longLookupHashSize + + ", longLookupWriteCacheSize=" + longLookupWriteCacheSize + + ", flushDelaySeconds=" + flushDelaySeconds + + '}'; + }} diff --git a/src/main/java/com/upserve/uppend/Uppend.java b/src/main/java/com/upserve/uppend/Uppend.java index 21161c17..875aa625 100644 --- a/src/main/java/com/upserve/uppend/Uppend.java +++ b/src/main/java/com/upserve/uppend/Uppend.java @@ -20,14 +20,23 @@ public final class Uppend { private Uppend() { } - public static FileAppendOnlyStoreBuilder fileStore(String path) { - return fileStore(Paths.get(path)); + public static FileAppendOnlyStoreBuilder store(String path) { + return store(Paths.get(path)); } - public static FileAppendOnlyStoreBuilder fileStore(Path path) { + public static FileAppendOnlyStoreBuilder store(Path path) { return new FileAppendOnlyStoreBuilder().withDir(path); } + public static FileCounterStoreBuilder counterStore(String path) { + return counterStore(Paths.get(path)); + } + + public static FileCounterStoreBuilder counterStore(Path path) { + return new FileCounterStoreBuilder().withDir(path); + } + + public static void main(String ... 
args) throws Exception { Cli.main(args); } diff --git a/src/main/java/com/upserve/uppend/cli/benchmark/Benchmark.java b/src/main/java/com/upserve/uppend/cli/benchmark/Benchmark.java index 5f018cae..b97217ae 100644 --- a/src/main/java/com/upserve/uppend/cli/benchmark/Benchmark.java +++ b/src/main/java/com/upserve/uppend/cli/benchmark/Benchmark.java @@ -32,7 +32,7 @@ public Benchmark(BenchmarkMode mode, Path path, int maxPartitions, int maxKeys, log.warn("Location already exists: appending to {}", path); } - testInstance = Uppend.fileStore(path) + testInstance = Uppend.store(path) .withLongLookupHashSize(hashSize) .withLongLookupWriteCacheSize(cachesize) .withFlushDelaySeconds(flushDelaySeconds) diff --git a/src/main/java/com/upserve/uppend/lookup/LongLookup.java b/src/main/java/com/upserve/uppend/lookup/LongLookup.java index 02e87bb0..7c2af598 100644 --- a/src/main/java/com/upserve/uppend/lookup/LongLookup.java +++ b/src/main/java/com/upserve/uppend/lookup/LongLookup.java @@ -121,40 +121,28 @@ public long get(String partition, String key) { return metadata.readData(lenPath.resolve("data"), lookupKey); } - public void put(String partition, String key, long value) { + public long put(String partition, String key, long value) { validatePartition(partition); LookupKey lookupKey = new LookupKey(key); Path lenPath = hashAndLengthPath(partition, lookupKey); - loadFromWriteCache(lenPath).put(lookupKey, value); - } - - private LookupData loadFromWriteCache(Path lenPath) { - synchronized (writeCache) { - return writeCache.computeIfAbsent(lenPath, path -> { - log.trace("cache loading {}", lenPath); - return new LookupData( - parseKeyLengthFromPath(lenPath), - lenPath.resolve("data"), - lenPath.resolve("meta") - ); - }); - } - } - - private LookupData loadFromWriteCacheIfExists(Path lenPath) { - synchronized (writeCache) { - return writeCache.get(lenPath); - } + return loadFromWriteCache(lenPath).put(lookupKey, value); } public long putIfNotExists(String partition, String key, 
LongSupplier allocateLongFunc) { validatePartition(partition); LookupKey lookupKey = new LookupKey(key); Path lenPath = hashAndLengthPath(partition, lookupKey); - //noinspection ConstantConditions return loadFromWriteCache(lenPath).putIfNotExists(lookupKey, allocateLongFunc); } + public long increment(String partition, String key, long delta) { + validatePartition(partition); + LookupKey lookupKey = new LookupKey(key); + Path lenPath = hashAndLengthPath(partition, lookupKey); + return loadFromWriteCache(lenPath).increment(lookupKey, delta); + + } + public Stream keys(String partition) { validatePartition(partition); @@ -179,6 +167,9 @@ public Stream keys(String partition) { public Stream partitions() { Stream files; try { + if (!Files.exists(dir)) { + return Stream.empty(); + } files = Files.walk(dir, 1); } catch (IOException e) { throw new UncheckedIOException("could not walk dir " + dir, e); @@ -249,6 +240,25 @@ public void clear() { } } + private LookupData loadFromWriteCache(Path lenPath) { + synchronized (writeCache) { + return writeCache.computeIfAbsent(lenPath, path -> { + log.trace("cache loading {}", lenPath); + return new LookupData( + parseKeyLengthFromPath(lenPath), + lenPath.resolve("data"), + lenPath.resolve("meta") + ); + }); + } + } + + private LookupData loadFromWriteCacheIfExists(Path lenPath) { + synchronized (writeCache) { + return writeCache.get(lenPath); + } + } + private Path hashAndLengthPath(String partition, LookupKey key) { log.trace("getting from {}: {}", dir, key); byte[] hash = hashFunction.hashString(key.string(), Charsets.UTF_8).asBytes(); diff --git a/src/main/java/com/upserve/uppend/lookup/LookupData.java b/src/main/java/com/upserve/uppend/lookup/LookupData.java index 43fbb4ad..ab7d6b1b 100644 --- a/src/main/java/com/upserve/uppend/lookup/LookupData.java +++ b/src/main/java/com/upserve/uppend/lookup/LookupData.java @@ -1,7 +1,6 @@ package com.upserve.uppend.lookup; import com.upserve.uppend.util.*; -import 
it.unimi.dsi.fastutil.Function; import it.unimi.dsi.fastutil.objects.*; import lombok.extern.slf4j.Slf4j; @@ -10,8 +9,8 @@ import java.nio.channels.*; import java.nio.file.*; import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.LongSupplier; +import java.util.concurrent.atomic.*; +import java.util.function.*; import java.util.stream.*; @Slf4j @@ -21,9 +20,10 @@ public class LookupData implements AutoCloseable, Flushable { private final int keyLength; private final Path path; private final Path metadataPath; - - private FileChannel chan; - private DataOutputStream out; + private final Supplier recordBufSupplier; + private final Supplier longValueBufSupplier; + private final FileChannel chan; + private final AtomicLong chanSize; private Object2LongSortedMap mem; private Object2IntLinkedOpenHashMap memOrder; @@ -43,13 +43,15 @@ public LookupData(int keyLength, Path path, Path metadataPath) { } } + recordBufSupplier = ThreadLocalByteBuffers.threadLocalByteBufferSupplier(keyLength + 8); + longValueBufSupplier = ThreadLocalByteBuffers.threadLocalByteBufferSupplier(8); + try { chan = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE); - chan.position(chan.size()); + chanSize = new AtomicLong(chan.size()); } catch (IOException e) { throw new UncheckedIOException("can't open file: " + path, e); } - out = new DataOutputStream(new BufferedOutputStream(Channels.newOutputStream(chan), 8192)); try { init(); @@ -71,17 +73,33 @@ public synchronized long get(LookupKey key) { return mem.getLong(key); } - public synchronized void put(LookupKey key, long value) { + /** + * Set the value associated with the given key and return the prior value + * + * @param key the key whose value to set + * @param value the new value set + * @return the old value associated with the key, or {@code Long.MIN_VALUE} + * if the entry didn't exist yet + */ + public synchronized long put(LookupKey key, long 
value) { log.trace("putting {}={} in {}", key, value, path); - long existingValue = mem.put(key, value); + final long existingValue = mem.put(key, value); + if (existingValue != Long.MIN_VALUE) { - throw new IllegalStateException("can't put same key ('" + key + "') twice: new value = " + value + ", existing value = " + existingValue); - } - int pos = memOrder.size(); - if (memOrder.put(key, pos) != Integer.MIN_VALUE) { - throw new IllegalStateException("encountered repeated mem order key at pos " + pos + ": " + key); + int index = memOrder.getInt(key); + if (index == Integer.MIN_VALUE) { + throw new IllegalStateException("unknown index order for existing key: " + key); + } + set(index, value); + } else { + int index = memOrder.size(); + if (memOrder.put(key, index) != Integer.MIN_VALUE) { + throw new IllegalStateException("encountered repeated mem order key at index " + index + ": " + key); + } + append(key, value); } - append(key, value); + + return existingValue; } public synchronized long putIfNotExists(LookupKey key, long value) { @@ -90,9 +108,9 @@ public synchronized long putIfNotExists(LookupKey key, long value) { if (existingValue != Long.MIN_VALUE) { return existingValue; } - int pos = memOrder.size(); - if (memOrder.put(key, pos) != Integer.MIN_VALUE) { - throw new IllegalStateException("encountered repeated mem order key at pos " + pos + ": " + key); + int index = memOrder.size(); + if (memOrder.put(key, index) != Integer.MIN_VALUE) { + throw new IllegalStateException("encountered repeated mem order key at index " + index + ": " + key); } append(key, value); return value; @@ -109,30 +127,66 @@ public synchronized long putIfNotExists(LookupKey key, LongSupplier allocateLong long newValue = allocateLongFunc.getAsLong(); long existingValue = putIfNotExists(key, newValue); if (existingValue != newValue) { - log.warn("lost race to allocate, wasted new value " + newValue + " for key: " + key); + throw new IllegalStateException("race while putting (if not 
exists) " + key + "= in " + path); } return existingValue; } + public synchronized long increment(LookupKey key, long delta) { + log.trace("incrementing {} by {} in {}", key, delta, path); + long value = mem.getLong(key); + if (value == Long.MIN_VALUE) { + value = delta; + long existingValue = putIfNotExists(key, value); + if (existingValue != value) { + throw new IllegalStateException("race while incrementing new key " + key + " by " + delta + " in " + path); + } + } else { + int index = memOrder.getInt(key); + if (index == Integer.MIN_VALUE) { + throw new IllegalStateException("unknown index order for existing key: " + key); + } + value += delta; + set(index, value); + } + return value; + } + private void append(LookupKey key, long value) { byte[] keyBytes = key.bytes(); if (keyBytes.length != keyLength) { throw new IllegalStateException("unexpected key length: expected " + keyLength + ", got " + keyBytes.length); } try { - out.write(keyBytes); - out.writeLong(value); + long pos = chanSize.getAndAdd(keyLength + 8); + ByteBuffer buf = recordBufSupplier.get(); + buf.put(keyBytes); + buf.putLong(value); + buf.flip(); + chan.write(buf, pos); } catch (IOException e) { throw new UncheckedIOException("unable to write key: " + key, e); } } + private void set(int index, long value) { + try { + ByteBuffer buf = longValueBufSupplier.get(); + buf.putLong(value); + buf.flip(); + chan.write(buf, index * (keyLength + 8) + keyLength); + } catch (IOException e) { + throw new UncheckedIOException("unable to write value (" + value + ") at index: " + index, e); + } + } + + @Override public synchronized void close() throws IOException { log.trace("closing lookup data at {} (~{} entries)", path, mem.size()); if (isClosed.compareAndSet(false, true)) { flush(); - out.close(); + chan.close(); log.trace("closed lookup data at {}", path); } else { log.warn("lookup data already closed: " + path, new RuntimeException("was closed") /* get stack */); @@ -142,7 +196,7 @@ public synchronized 
void close() throws IOException { @Override public synchronized void flush() throws IOException { log.trace("flushing lookup and metadata at {}", metadataPath); - out.flush(); + chan.force(true); LookupMetadata metadata = generateMetadata(); metadata.writeTo(metadataPath); log.trace("flushed lookup and metadata at {}: {}", metadataPath, metadata); diff --git a/src/test/java/com/upserve/uppend/AppendOnlyStoreTest.java b/src/test/java/com/upserve/uppend/AppendOnlyStoreTest.java index 9dc9ff12..9f08f803 100644 --- a/src/test/java/com/upserve/uppend/AppendOnlyStoreTest.java +++ b/src/test/java/com/upserve/uppend/AppendOnlyStoreTest.java @@ -1,23 +1,17 @@ package com.upserve.uppend; import com.upserve.uppend.lookup.LongLookup; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.junit.rules.ExpectedException; -import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.*; import static org.junit.Assert.*; public abstract class AppendOnlyStoreTest { protected abstract AppendOnlyStore newStore(); - protected AppendOnlyStore store = newStore(); + private AppendOnlyStore store; @Rule public ExpectedException thrown = ExpectedException.none(); @@ -227,12 +221,12 @@ public void testReadWriteEmpty() { tester(1, 0); } - public void tester(int number, int size){ + private void tester(int number, int size){ String key = "foobar"; String partition = "partition"; byte[] bytes; - ArrayList inputBytes = new ArrayList(); + ArrayList inputBytes = new ArrayList<>(); for(int i=0; i