Skip to content

Commit

Permalink
Merge pull request #33 from upserve/release_process
Browse files Browse the repository at this point in the history
Release process
  • Loading branch information
bfulton authored Nov 7, 2017
2 parents 40759ec + e0652ce commit 515ea11
Show file tree
Hide file tree
Showing 13 changed files with 207 additions and 47 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ sudo: false
jdk:
- oraclejdk8
script: ./gradlew clean build
after_success:
- bash <(curl -s https://codecov.io/bash)
37 changes: 17 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,20 +1,12 @@
Uppend: an append-only, key-multivalue store
============================================
[![Build Status](https://travis-ci.com/upserve/uppend.svg?token=***REMOVED***&branch=add_travis_yml)](https://travis-ci.com/upserve/uppend)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.upserve/uppend/badge.svg)](https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.upserve%22%20AND%20a%3Auppend)

Uppend is an append-only, key-multivalue store, suitable for creating analytics
views from event streams.

Benefits:

* Optimized to be I/O constrained on modest hardware

Tradeoffs:

* Individual values are immutable
* Assumes a single writer process
[![Build Status](https://img.shields.io/travis/upserve/uppend/master.svg?style=flat-square)](https://travis-ci.org/upserve/uppend)
[![Release Artifact](https://img.shields.io/maven-central/v/com.upserve/uppend.svg?style=flat-square)](https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.upserve%22%20AND%20a%3Auppend)
[![Test Coverage](https://img.shields.io/codecov/c/github/upserveuppend/master.svg?style=flat-square)](https://codecov.io/github/upserve/uppend?branch=master)

Uppend is an append-only, key-multivalue store which is suitable for streaming
event aggregation. It assumes a single writer process, and appended values are
immutable.

Use
---
Expand All @@ -25,22 +17,27 @@ Maven:
<dependency>
<groupId>com.upserve</groupId>
<artifactId>uppend</artifactId>
<version>1.0.0</version>
<version>0.0.1</version>
</dependency>
```

Gradle:
```gradle
compile 'com.upserve:uppend:1.0.0'
compile 'com.upserve:uppend:0.0.1'
```

Hello world:

```java
//import com.upserve.uppend.*
DB db = DBMaker.memoryDB().make();
ConcurrentMap map = db.hashMap("map").make();
map.put("something", "here");
AppendOnlyStore db = Uppend.fileStore("build/tmp-db").build();

db.append("my-partition", "my-key", "value-1".getBytes());
db.append("my-partition", "my-key", "value-2".getBytes());

String values = db.readSequential("my-partition", "my-key")
.map(String::new)
.collect(Collectors.joining(", "));
// value-1, value-2
```

Development
Expand Down
68 changes: 67 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
buildscript {
repositories {
jcenter()
}
dependencies {
classpath 'com.bmuschko:gradle-nexus-plugin:2.3.1'
}
}

plugins {
id 'com.palantir.git-version' version '0.9.1'
id 'nebula.lint' version '6.7.0'
Expand All @@ -9,7 +18,7 @@ version gitVersion()

group 'com.upserve'

description = """Uppend: fast, append-only key-value store"""
description = """Uppend: fast, append-only key-multivalue store"""

task wrapper(type: Wrapper) {
gradleVersion = '3.3'
Expand All @@ -19,6 +28,8 @@ gradleLint.rules += 'unused-dependency'

apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'jacoco'
apply plugin: 'com.bmuschko.nexus'

sourceCompatibility = 1.8
targetCompatibility = 1.8
Expand Down Expand Up @@ -73,3 +84,58 @@ task fatJar(type: Jar) {
from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
with jar
}

jacocoTestReport {
reports {
xml.enabled = true
html.enabled = true
}
}

check.dependsOn jacocoTestReport

modifyPom {
project {
name 'Uppend'
description 'Uppend is an append-only, key-multivalue store.'
url 'https://github.com/upserve/uppend'
inceptionYear '2017'

scm {
url 'https://github.com/upserve/uppend'
connection 'scm:https://upserve@github.com/upserve/uppend.git'
developerConnection 'scm:git://github.com/upserve/uppend.git'
}

licenses {
license {
name 'The MIT License'
url 'https://opensource.org/licenses/MIT'
distribution 'repo'
}
}

developers {
developer {
id 'bfulton'
name 'Bright Fulton'
url 'https://github.com/bfulton'
}
developer {
id 'dstuebe'
name 'David Stuebe'
url 'https://github.com/dstuebe'
}
developer {
id 'jazzdan'
name 'Dan Miller'
url 'https://github.com/jazzdan'
}
developer {
id 'kbarrette'
name 'Keith Barrette'
url 'https://github.com/kbarrette'
}
}
}
}
16 changes: 14 additions & 2 deletions src/main/java/com/upserve/uppend/AppendOnlyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,27 @@ public interface AppendOnlyStore extends AutoCloseable, Flushable {
void flush();

/**
* Read byte arrays that have been stored under a given key
* Read byte arrays that have been stored under a given key in parallel
*
* @param partition the partition under which to retrieve
* @param key the key under which to retrieve
* @throws IllegalArgumentException if partition is invalid
* @return a stream of the stored byte arrays
* @return a parallel stream of the stored byte arrays
*/
Stream<byte[]> read(String partition, String key);

/**
* Read byte arrays that have been stored under a given key in the order
* they were stored in
*
* @param partition the partition under which to retrieve
* @param key the key under which to retrieve
* @throws IllegalArgumentException if partition is invalid
* @return a stream of the stored byte arrays in storage order
*/
Stream<byte[]> readSequential(String partition, String key);


/**
* Read the last byte array that was stored under a given key
*
Expand Down
18 changes: 17 additions & 1 deletion src/main/java/com/upserve/uppend/BlockedLongs.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.LongStream;

Expand All @@ -29,6 +30,8 @@ public class BlockedLongs implements AutoCloseable, Flushable {
private final FileChannel blocksPos;
private final MappedByteBuffer posBuf;

private final AtomicBoolean outDirty;

public BlockedLongs(Path file, int valuesPerBlock) {
this.file = file;

Expand Down Expand Up @@ -74,13 +77,16 @@ public BlockedLongs(Path file, int valuesPerBlock) {
} catch (IOException e) {
throw new UncheckedIOException("unable to init blocks pos file: " + posFile, e);
}

outDirty = new AtomicBoolean(false);
}

public synchronized long allocate() {
log.trace("allocating {} bytes in {}", blockSize, file);
synchronized (posBuf) {
long pos = getPos();
putPos(pos + blockSize);
outDirty.set(true);
return pos;
}
}
Expand All @@ -106,12 +112,16 @@ public synchronized void append(long pos, long val) {
writeLong(valuePos, val);
writeLong(pos, numValuesLong + 1);
}
outDirty.set(true);
}
log.trace("appended value {} to {} at {}", val, file, pos);
}

public LongStream values(long pos) {
log.trace("streaming values from {} at {}", file, pos);
if (outDirty.get()) {
flush();
}
if (pos >= getPos()) {
return LongStream.empty();
}
Expand Down Expand Up @@ -148,6 +158,9 @@ public LongStream values(long pos) {

public long lastValue(long pos) {
log.trace("reading last value from {} at {}", file, pos);
if (outDirty.get()) {
flush();
}
if (pos >= getPos()) {
return -1;
}
Expand All @@ -173,12 +186,13 @@ public long lastValue(long pos) {
}
}

public void clear() {
public synchronized void clear() {
log.debug("clearing {}", file);
try {
blocks.truncate(0);
putPos(0);
Arrays.fill(pages, null);
outDirty.set(true);
} catch (IOException e) {
throw new UncheckedIOException("unable to clear", e);
}
Expand All @@ -195,6 +209,7 @@ public synchronized void close() throws Exception {
blocks.close();
posBuf.force();
blocksPos.close();
outDirty.set(false);
}

@Override
Expand All @@ -206,6 +221,7 @@ public synchronized void flush() {
}
}
posBuf.force();
outDirty.set(false);
}

private ByteBuffer readBlock(long pos) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/upserve/uppend/FileAppendOnlyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public Stream<byte[]> read(String partition, String key) {
}

@Override
public Stream<byte[]> readSequential(String partition, String key) {
return blockValues(partition, key)
.mapToObj(blobs::read);
}

public byte[] readLast(String partition, String key) {
long pos = blockLastValue(partition, key);
if (pos == -1) {
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/upserve/uppend/Uppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.upserve.uppend.cli.Cli;

import java.io.*;
import java.nio.file.*;

public final class Uppend {
public static final String VERSION;
Expand All @@ -19,8 +20,12 @@ public final class Uppend {
private Uppend() {
}

public static FileAppendOnlyStoreBuilder fileAppendOnlyStore() {
return new FileAppendOnlyStoreBuilder();
public static FileAppendOnlyStoreBuilder fileStore(String path) {
return fileStore(Paths.get(path));
}

public static FileAppendOnlyStoreBuilder fileStore(Path path) {
return new FileAppendOnlyStoreBuilder().withDir(path);
}

public static void main(String ... args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ public Benchmark(BenchmarkMode mode, Path path, int maxPartitions, int maxKeys,
log.warn("Location already exists: appending to {}", path);
}

testInstance = new FileAppendOnlyStoreBuilder()
.withDir(path)
testInstance = Uppend.fileStore(path)
.withLongLookupHashSize(hashSize)
.withLongLookupWriteCacheSize(cachesize)
.withFlushDelaySeconds(flushDelaySeconds)
Expand Down
22 changes: 20 additions & 2 deletions src/main/java/com/upserve/uppend/lookup/LongLookup.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class LongLookup implements AutoCloseable, Flushable {
public static final int DEFAULT_WRITE_CACHE_SIZE = DEFAULT_HASH_SIZE * 1;

private final Path dir;
private final int hashSize;
private final int hashBytes;
private final int hashFinalByteMask;
private final HashFunction hashFunction;
Expand Down Expand Up @@ -62,7 +61,6 @@ public LongLookup(Path dir, int hashSize, int writeCacheSize) {
throw new UncheckedIOException("unable to mkdirs: " + dir, e);
}

this.hashSize = hashSize;
String hashBinaryString = Integer.toBinaryString(hashSize - 1);
hashBytes = (hashBinaryString.length() + 7) / 8;
hashFinalByteMask = (1 << (hashBinaryString.length() % 8)) - 1;
Expand Down Expand Up @@ -95,11 +93,25 @@ protected synchronized boolean removeEldestEntry(Map.Entry<Path, LookupData> eld
};
}

/**
* Get the value associated with the given partition and key
*
* @param partition the partition to look up
* @param key the key to look up
* @return the value for the partition and key, or -1 if not found
*/
public long get(String partition, String key) {
validatePartition(partition);

LookupKey lookupKey = new LookupKey(key);
Path lenPath = hashAndLengthPath(partition, lookupKey);

LookupData data = loadFromWriteCacheIfExists(lenPath);
if (data != null) {
long value = data.get(lookupKey);
return value == Long.MIN_VALUE ? -1 : value;
}

Path metaPath = lenPath.resolve("meta");
if (!Files.exists(metaPath)) {
log.trace("no metadata for key {} at path {}", key, metaPath);
Expand Down Expand Up @@ -129,6 +141,12 @@ private LookupData loadFromWriteCache(Path lenPath) {
}
}

private LookupData loadFromWriteCacheIfExists(Path lenPath) {
synchronized (writeCache) {
return writeCache.get(lenPath);
}
}

public long putIfNotExists(String partition, String key, LongSupplier allocateLongFunc) {
validatePartition(partition);
LookupKey lookupKey = new LookupKey(key);
Expand Down
Loading

0 comments on commit 515ea11

Please sign in to comment.