Skip to content

Commit

Permalink
Merge pull request #37 from upserve/add_increment_api
Browse files Browse the repository at this point in the history
add counter store
  • Loading branch information
bfulton authored Nov 8, 2017
2 parents 4562403 + 62aeac8 commit 8f771d0
Show file tree
Hide file tree
Showing 18 changed files with 594 additions and 80 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ compile 'com.upserve:uppend:0.0.1'
Hello world:

```java
AppendOnlyStore db = Uppend.fileStore("build/tmp-db").build();
AppendOnlyStore db = Uppend.store("build/tmp-db").build();

db.append("my-partition", "my-key", "value-1".getBytes());
db.append("my-partition", "my-key", "value-2".getBytes());
Expand Down
21 changes: 11 additions & 10 deletions src/main/java/com/upserve/uppend/AppendOnlyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
import java.util.stream.Stream;

/**
* Defines the minimum interface required to add byte arrays under a key, and to
* retrieve them. Note the expectation that the byte arrays are appended to the
* value, which can be thought of as an ever-growing list of byte arrays.
* Add byte arrays under a key and partition, and retrieve them. Note the
* expectation that the byte arrays are appended to the value, which is an
* ever-growing list of byte arrays.
*/
public interface AppendOnlyStore extends AutoCloseable, Flushable {
/**
* Append a byte array under a given key
* Append a byte array under a given partition and key
*
* @param partition the partition to store under
* @param key the key to store under
Expand All @@ -27,7 +27,8 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable {
void flush();

/**
* Read byte arrays that have been stored under a given key in parallel
* Read byte arrays that have been stored under a given partition and key in
* parallel
*
* @param partition the partition under which to retrieve
* @param key the key under which to retrieve
Expand All @@ -37,8 +38,8 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable {
Stream<byte[]> read(String partition, String key);

/**
* Read byte arrays that have been stored under a given key in the order
* they were stored in
* Read byte arrays that have been stored under a given partition and key in
* the order they were stored
*
* @param partition the partition under which to retrieve
* @param key the key under which to retrieve
Expand All @@ -49,7 +50,7 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable {


/**
* Read the last byte array that was stored under a given key
* Read the last byte array that was stored under a given partition and key
*
* @param partition the partition under which to retrieve
* @param key the key under which to retrieve
Expand All @@ -59,7 +60,7 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable {
byte[] readLast(String partition, String key);

/**
* Enumerate the keys in the data store
* Enumerate the keys for a given partition
*
* @param partition the partition under which to retrieve
* @throws IllegalArgumentException if partition is invalid
Expand All @@ -68,7 +69,7 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable {
Stream<String> keys(String partition);

/**
* Enumerate the partition in the data store
* Enumerate the partitions in the data store
*
* @return a stream of string partition
*/
Expand Down
81 changes: 81 additions & 0 deletions src/main/java/com/upserve/uppend/CounterStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.upserve.uppend;

import java.io.Flushable;
import java.util.stream.Stream;

/**
* Keep counters for partitioned keys.
*/
public interface CounterStore extends AutoCloseable, Flushable {
/**
* Set the counter under a given partition and key, to the given value
*
* @param partition the partition to increment under
* @param key the key to increment under
* @param value the value to set
* @throws IllegalArgumentException if partition is invalid
* @return the old value of the counter or 0 if it was previously unset
*/
long set(String partition, String key, long value);

/**
* Increment by 1 the counter under a given partition and key, whose value
* is initialized to 0
*
* @param partition the partition to increment under
* @param key the key to increment under
* @throws IllegalArgumentException if partition is invalid
* @return the new value of the counter
*/
long increment(String partition, String key);

/**
* Increment by a given amount the counter under a given partition and key,
* whose value is initialized to 0
*
* @param partition the partition to increment under
* @param key the key to increment under
* @param delta the amount to add to the current value
* @throws IllegalArgumentException if partition is invalid
* @return the new value of the counter
*/
long increment(String partition, String key, long delta);

/**
* Flush any pending appends to durable storage. Will not return until
* the flush is completed.
*/
@Override
void flush();

/**
* Get the value for a given partition and key
*
* @param partition the partition to get
* @param key the key to get
* @throws IllegalArgumentException if partition is invalid
* @return the value for the given partition and key, or 0 if not found
*/
long get(String partition, String key);

/**
* Enumerate the keys for a given partition
*
* @param partition the partition under which to retrieve
* @throws IllegalArgumentException if partition is invalid
* @return a stream of string keys
*/
Stream<String> keys(String partition);

/**
* Enumerate the partitions in the data store
*
* @return a stream of string partition
*/
Stream<String> partitions();

/**
* Remove all keys and values from the store.
*/
void clear();
}
5 changes: 5 additions & 0 deletions src/main/java/com/upserve/uppend/CounterStoreBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.upserve.uppend;

public interface CounterStoreBuilder<T extends CounterStore> {
T build();
}
102 changes: 102 additions & 0 deletions src/main/java/com/upserve/uppend/FileCounterStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.upserve.uppend;

import com.upserve.uppend.lookup.LongLookup;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.nio.file.*;
import java.util.stream.Stream;

@Slf4j
public class FileCounterStore implements CounterStore, Flushable {
/**
* DEFAULT_FLUSH_DELAY_SECONDS is the number of seconds to wait between
* automatically flushing writes.
*/
public static final int DEFAULT_FLUSH_DELAY_SECONDS = FileAppendOnlyStore.DEFAULT_FLUSH_DELAY_SECONDS;

private final Path dir;
private final LongLookup lookup;

public FileCounterStore(Path dir) {
this(
dir,
LongLookup.DEFAULT_HASH_SIZE,
LongLookup.DEFAULT_WRITE_CACHE_SIZE,
DEFAULT_FLUSH_DELAY_SECONDS
);
}

public FileCounterStore(Path dir, int longLookupHashSize, int longLookupWriteCacheSize, int flushDelaySeconds) {
try {
Files.createDirectories(dir);
} catch (IOException e) {
throw new UncheckedIOException("unable to mkdirs: " + dir, e);
}

this.dir = dir;
lookup = new LongLookup(
dir.resolve("inc-lookup"),
longLookupHashSize,
longLookupWriteCacheSize
);
AutoFlusher.register(flushDelaySeconds, this);
}

@Override
public long set(String partition, String key, long value) {
log.trace("setting {}={} in partition '{}'", key, value, partition);
return lookup.put(partition, key, value);
}

@Override
public long increment(String partition, String key) {
return increment(partition, key, 1);
}

@Override
public long increment(String partition, String key, long delta) {
log.trace("incrementing by {} key '{}' in partition '{}'", delta, key, partition);
return lookup.increment(partition, key, delta);
}

@Override
public void flush() {
log.info("flushing {}", dir);
lookup.flush();
log.info("flushed {}", dir);
}

@Override
public long get(String partition, String key) {
long val = lookup.get(partition, key);
return val == -1 ? 0 : val;
}

@Override
public Stream<String> keys(String partition) {
return lookup.keys(partition);
}

@Override
public Stream<String> partitions() {
return lookup.partitions();
}

@Override
public void clear() {
log.trace("clearing");
lookup.clear();
}

@Override
public void close() throws Exception {
log.info("closing: " + dir);
AutoFlusher.deregister(this);
try {
lookup.close();
} catch (Exception e) {
log.error("unable to close lookup", e);
}
}
}
49 changes: 49 additions & 0 deletions src/main/java/com/upserve/uppend/FileCounterStoreBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.upserve.uppend;

import com.upserve.uppend.lookup.LongLookup;
import lombok.extern.slf4j.Slf4j;

import java.nio.file.Path;

@Slf4j
public class FileCounterStoreBuilder implements CounterStoreBuilder<FileCounterStore> {
private Path dir;
private int longLookupHashSize = LongLookup.DEFAULT_HASH_SIZE;
private int longLookupWriteCacheSize = LongLookup.DEFAULT_WRITE_CACHE_SIZE;
private int flushDelaySeconds = FileCounterStore.DEFAULT_FLUSH_DELAY_SECONDS;

public FileCounterStoreBuilder withDir(Path dir) {
this.dir = dir;
return this;
}

public FileCounterStoreBuilder withLongLookupHashSize(int longLookupHashSize) {
this.longLookupHashSize = longLookupHashSize;
return this;
}

public FileCounterStoreBuilder withLongLookupWriteCacheSize(int longLookupWriteCacheSize) {
this.longLookupWriteCacheSize = longLookupWriteCacheSize;
return this;
}

public FileCounterStoreBuilder withFlushDelaySeconds(int flushDelaySeconds) {
this.flushDelaySeconds = flushDelaySeconds;
return this;
}

@Override
public FileCounterStore build() {
log.info("building FileCounterStore from builder: {}", this);
return new FileCounterStore(dir, longLookupHashSize, longLookupWriteCacheSize, flushDelaySeconds);
}

@Override
public String toString() {
return "FileCounterStoreBuilder{" +
"dir=" + dir +
", longLookupHashSize=" + longLookupHashSize +
", longLookupWriteCacheSize=" + longLookupWriteCacheSize +
", flushDelaySeconds=" + flushDelaySeconds +
'}';
}}
15 changes: 12 additions & 3 deletions src/main/java/com/upserve/uppend/Uppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,23 @@ public final class Uppend {
private Uppend() {
}

public static FileAppendOnlyStoreBuilder fileStore(String path) {
return fileStore(Paths.get(path));
public static FileAppendOnlyStoreBuilder store(String path) {
return store(Paths.get(path));
}

public static FileAppendOnlyStoreBuilder fileStore(Path path) {
public static FileAppendOnlyStoreBuilder store(Path path) {
return new FileAppendOnlyStoreBuilder().withDir(path);
}

public static FileCounterStoreBuilder counterStore(String path) {
return counterStore(Paths.get(path));
}

public static FileCounterStoreBuilder counterStore(Path path) {
return new FileCounterStoreBuilder().withDir(path);
}


public static void main(String ... args) throws Exception {
Cli.main(args);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Benchmark(BenchmarkMode mode, Path path, int maxPartitions, int maxKeys,
log.warn("Location already exists: appending to {}", path);
}

testInstance = Uppend.fileStore(path)
testInstance = Uppend.store(path)
.withLongLookupHashSize(hashSize)
.withLongLookupWriteCacheSize(cachesize)
.withFlushDelaySeconds(flushDelaySeconds)
Expand Down
Loading

0 comments on commit 8f771d0

Please sign in to comment.