Skip to content

Commit

Permalink
Support MIN/MAX FDB mutation and expose little-endian value serdes fo…
Browse files Browse the repository at this point in the history
…r Long and YearMonth
  • Loading branch information
nktpro committed Feb 7, 2024
1 parent 0ccbffd commit e2be969
Show file tree
Hide file tree
Showing 13 changed files with 200 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,30 @@ final class KvdbOperationFactory[BCF[A, B] <: ColumnFamily[A, B]] {
value = v
)
}

def mutateMin[CF[A, B] <: ColumnFamily[A, B], CF2 <: BCF[K, V], K, V](
column: CF[K, V] with CF2,
key: K,
value: V
): TransactionMutateMin = {
val (k, v) = column.serialize(key, value)
TransactionMutateMin(
columnId = column.id,
key = k,
value = v
)
}

def mutateMax[CF[A, B] <: ColumnFamily[A, B], CF2 <: BCF[K, V], K, V](
column: CF[K, V] with CF2,
key: K,
value: V
): TransactionMutateMax = {
val (k, v) = column.serialize(key, value)
TransactionMutateMax(
columnId = column.id,
key = k,
value = v
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ object KvdbWriteTransactionBuilder {
extends TransactionWrite
final case class TransactionMutateAdd(columnId: String, key: Array[Byte], value: Array[Byte])
extends TransactionWrite
final case class TransactionMutateMin(columnId: String, key: Array[Byte], value: Array[Byte])
extends TransactionWrite
final case class TransactionMutateMax(columnId: String, key: Array[Byte], value: Array[Byte])
extends TransactionWrite
}

final class KvdbWriteTransactionBuilder[BCF[A, B] <: ColumnFamily[A, B]] {
Expand Down Expand Up @@ -86,6 +90,22 @@ final class KvdbWriteTransactionBuilder[BCF[A, B] <: ColumnFamily[A, B]] {
add(factory.mutateAdd(column, key, value))
}

def mutateMin[CF[A, B] <: ColumnFamily[A, B], CF2 <: BCF[K, V], K, V](
column: CF[K, V] with CF2,
key: K,
value: V
): this.type = {
add(factory.mutateMin(column, key, value))
}

def mutateMax[CF[A, B] <: ColumnFamily[A, B], CF2 <: BCF[K, V], K, V](
column: CF[K, V] with CF2,
key: K,
value: V
): this.type = {
add(factory.mutateMax(column, key, value))
}

def nextVersion: Int = currentVersion.getAndIncrement()

def result: List[TransactionWrite] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@ package dev.chopsticks.kvdb.codec
import dev.chopsticks.kvdb.codec.ValueDeserializer.{GenericValueDeserializationException, ValueDeserializationResult}
import cats.syntax.either._

trait ValueSerdes[T] extends ValueSerializer[T] with ValueDeserializer[T]
trait ValueSerdes[T] extends ValueSerializer[T] with ValueDeserializer[T] {
def bimap[A](to: T => A)(from: A => T): ValueSerdes[A] = {
val that = this
new ValueSerdes[A] {
override def deserialize(bytes: Array[Byte]): ValueDeserializationResult[A] = that.deserialize(bytes).map(to)
override def serialize(value: A): Array[Byte] = that.serialize(from(value))
}
}
}

object ValueSerdes {
def apply[T](implicit f: ValueSerdes[T]): ValueSerdes[T] = f
Expand Down Expand Up @@ -39,5 +47,6 @@ object ValueSerdes {

implicit val byteArrayValueSerdes: ValueSerdes[Array[Byte]] =
ValueSerdes.create[Array[Byte]](identity, bytes => Right(bytes))

implicit val unitValueSerdes: ValueSerdes[Unit] = ValueSerdes.create[Unit](_ => Array.emptyByteArray, _ => Right(()))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package dev.chopsticks.kvdb.codec

import java.nio.{ByteBuffer, ByteOrder}
import java.time.YearMonth

package object little_endian {
implicit val longValueSerdes: ValueSerdes[Long] =
ValueSerdes.create[Long](
value => ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(value).array(),
bytes => Right(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong())
)
implicit val yearMonthValueSerdes: ValueSerdes[YearMonth] = longValueSerdes.bimap[YearMonth](v => {
if (v == 0) YearMonth.of(0, 1)
else {
val year = v / 100
val month = v - year * 100
YearMonth.of(year.toInt, month.toInt)
}
})(ym => ym.getYear.toLong * 100 + ym.getMonthValue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,29 @@ package dev.chopsticks.kvdb

import dev.chopsticks.kvdb.api.KvdbDatabaseApi

import java.time.YearMonth

object TestDatabase extends KvdbDefinition {
type TestDbCf[K, V] = BaseCf[K, V]

trait PlainCf extends TestDbCf[String, String]
trait LookupCf extends TestDbCf[String, String]
trait CounterCf extends TestDbCf[String, Long]
trait MinCf extends TestDbCf[String, YearMonth]
trait MaxCf extends TestDbCf[String, YearMonth]

sealed trait AnotherDbCf[K, V] extends ColumnFamily[K, V]
trait AnotherCf1 extends AnotherDbCf[String, String]
trait AnotherCf2 extends AnotherDbCf[String, Int]

type CfSet = PlainCf with LookupCf with CounterCf
type CfSet = PlainCf with LookupCf with CounterCf with MinCf with MaxCf

trait Materialization extends KvdbMaterialization[BaseCf, CfSet] {
def plain: PlainCf
def lookup: LookupCf
def counter: CounterCf
def min: MinCf
def max: MaxCf
}

type DbApi = KvdbDatabaseApi[TestDbCf]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,12 @@ final class ZFdbKeyspaceWriteApi[BCF[A, B] <: ColumnFamily[A, B], CF <: BCF[K, V
def mutateAdd(key: K, value: V): Unit = {
transact(operationFactory.mutateAdd(keyspace, key, value))
}

def mutateMin(key: K, value: V): Unit = {
transact(operationFactory.mutateMin(keyspace, key, value))
}

def mutateMax(key: K, value: V): Unit = {
transact(operationFactory.mutateMax(keyspace, key, value))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ final class FdbWriteApi[BCF[A, B] <: ColumnFamily[A, B]](

case TransactionMutateAdd(columnId, key, value) =>
tx.mutate(MutationType.ADD, dbContext.prefixKey(columnId, key), value)

case TransactionMutateMin(columnId, key, value) =>
tx.mutate(MutationType.MIN, dbContext.prefixKey(columnId, key), value)

case TransactionMutateMax(columnId, key, value) =>
tx.mutate(MutationType.MAX, dbContext.prefixKey(columnId, key), value)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,24 @@ package dev.chopsticks.kvdb.fdb

import dev.chopsticks.fp.ZAkkaApp.ZAkkaAppEnv
import dev.chopsticks.fp.iz_logging.IzLogging
import dev.chopsticks.kvdb.TestDatabase.{BaseCf, CfSet, CounterCf, LookupCf, PlainCf}
import dev.chopsticks.kvdb.codec.ValueSerdes
import dev.chopsticks.kvdb.TestDatabase.{BaseCf, CfSet, CounterCf, LookupCf, MaxCf, MinCf, PlainCf}
import dev.chopsticks.kvdb.util.KvdbIoThreadPool
import dev.chopsticks.kvdb.{ColumnFamilySet, KvdbDatabaseTest, TestDatabase}
import zio.{RManaged, ZIO, ZManaged}
import dev.chopsticks.kvdb.codec.primitive._

import java.nio.{ByteBuffer, ByteOrder}
import dev.chopsticks.kvdb.codec.little_endian.{longValueSerdes, yearMonthValueSerdes}
import java.util.UUID

object FdbDatabaseTest {
implicit val littleIndianLongValueSerdes: ValueSerdes[Long] =
ValueSerdes.create[Long](
value => ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(value).array(),
bytes => Right(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong())
)

object dbMaterialization extends TestDatabase.Materialization with FdbMaterialization[TestDatabase.BaseCf] {
object plain extends PlainCf
object lookup extends LookupCf
object counter extends CounterCf
object min extends MinCf
object max extends MaxCf

val columnFamilySet: ColumnFamilySet[BaseCf, CfSet] = {
ColumnFamilySet[BaseCf].of(plain).and(lookup).and(counter)
ColumnFamilySet[BaseCf].of(plain).and(lookup).and(counter).and(min).and(max)
}
//noinspection TypeAnnotation
override val keyspacesWithVersionstampKey = Set.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.scalatest.wordspec.AsyncWordSpecLike
import zio.{Promise, Task, UIO, ZIO, ZRef}

import java.nio.charset.StandardCharsets
import java.time.YearMonth
import java.util.concurrent.CompletableFuture
import java.util.concurrent.atomic.AtomicLong

Expand All @@ -27,6 +28,7 @@ final class SpecificFdbDatabaseTest

private lazy val defaultCf = dbMat.plain
private lazy val counterCf = dbMat.counter
private lazy val minCf = dbMat.min

private lazy val withDb =
createTestRunner(FdbDatabaseTest.managedDb) { effect =>
Expand Down Expand Up @@ -202,4 +204,64 @@ final class SpecificFdbDatabaseTest
}
}
}

"min mutation" should {
"work with YearMonth value" in withDb { db =>
for {
transaction <- UIO(ZFdbTransaction(db))
_ <- transaction.write { tx =>
Task {
val cf = tx.keyspace(minCf)

cf.mutateMin("foo", YearMonth.of(2024, 9))
cf.put("bar", YearMonth.of(2024, 6))
cf.mutateMin("bar", YearMonth.of(2024, 8))
}
}
currentFoo <- transaction
.read { tx =>
val cf = tx.keyspace(minCf)
cf.get(_ is "foo")
}
currentBar <- transaction
.read { tx =>
val cf = tx.keyspace(minCf)
cf.get(_ is "bar")
}
} yield {
currentFoo shouldEqual Some("foo" -> YearMonth.of(2024, 9))
currentBar shouldEqual Some("bar" -> YearMonth.of(2024, 6))
}
}
}

"max mutation" should {
"work with YearMonth value" in withDb { db =>
for {
transaction <- UIO(ZFdbTransaction(db))
_ <- transaction.write { tx =>
Task {
val cf = tx.keyspace(minCf)

cf.mutateMax("foo", YearMonth.of(2024, 9))
cf.put("bar", YearMonth.of(2024, 6))
cf.mutateMax("bar", YearMonth.of(2024, 8))
}
}
currentFoo <- transaction
.read { tx =>
val cf = tx.keyspace(minCf)
cf.get(_ is "foo")
}
currentBar <- transaction
.read { tx =>
val cf = tx.keyspace(minCf)
cf.get(_ is "bar")
}
} yield {
currentFoo shouldEqual Some("foo" -> YearMonth.of(2024, 9))
currentBar shouldEqual Some("bar" -> YearMonth.of(2024, 8))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,9 @@ final class LmdbDatabase[BCF[A, B] <: ColumnFamily[A, B], +CFS <: BCF[_, _]] pri
case TransactionDeleteRange(columnId, fromKey, toKey) =>
val _ =
doDeleteRange(txn, refs.getKvdbi(columnFamilyWithId(columnId).get), fromKey, toKey)
case _: TransactionMutateAdd =>
???
case _: TransactionMutateAdd => ???
case _: TransactionMutateMin => ???
case _: TransactionMutateMax => ???
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,27 @@ package dev.chopsticks.kvdb.lmdb

import dev.chopsticks.fp.ZAkkaApp.ZAkkaAppEnv
import dev.chopsticks.kvdb.KvdbDatabase.KvdbClientOptions
import dev.chopsticks.kvdb.TestDatabase.{BaseCf, CfSet, CounterCf, LookupCf, PlainCf}
import dev.chopsticks.kvdb.codec.ValueSerdes
import dev.chopsticks.kvdb.TestDatabase.{BaseCf, CfSet, CounterCf, LookupCf, MaxCf, MinCf, PlainCf}
import dev.chopsticks.kvdb.codec.little_endian.{longValueSerdes, yearMonthValueSerdes}
import dev.chopsticks.kvdb.util.{KvdbIoThreadPool, KvdbTestSuite}
import dev.chopsticks.kvdb.{ColumnFamilySet, KvdbDatabaseTest, TestDatabase}
import eu.timepit.refined.auto._
import eu.timepit.refined.types.string.NonEmptyString
import squants.information.InformationConversions._
import zio.ZManaged
import dev.chopsticks.kvdb.codec.primitive._

import java.nio.{ByteBuffer, ByteOrder}
import scala.concurrent.duration._

object LmdbDatabaseTest {
implicit val littleIndianLongValueSerdes: ValueSerdes[Long] =
ValueSerdes.create[Long](
value => ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(value).array(),
bytes => Right(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong())
)

object dbMaterialization extends TestDatabase.Materialization {
object plain extends PlainCf
object lookup extends LookupCf
object counter extends CounterCf
object min extends MinCf
object max extends MaxCf

val columnFamilySet: ColumnFamilySet[BaseCf, CfSet] = {
ColumnFamilySet[BaseCf] of plain and lookup and counter
ColumnFamilySet[BaseCf] of plain and lookup and counter and min and max
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,8 +763,9 @@ final class RocksdbDatabase[BCF[A, B] <: ColumnFamily[A, B], +CFS <: BCF[_, _]]
toKey
)

case _: TransactionMutateAdd =>
???
case _: TransactionMutateAdd => ???
case _: TransactionMutateMin => ???
case _: TransactionMutateMax => ???
}

val writeOptions = newWriteOptions()
Expand Down Expand Up @@ -829,6 +830,8 @@ final class RocksdbDatabase[BCF[A, B] <: ColumnFamily[A, B], +CFS <: BCF[_, _]]
)

case _: TransactionMutateAdd => ???
case _: TransactionMutateMin => ???
case _: TransactionMutateMax => ???
}

try {
Expand Down
Loading

0 comments on commit e2be969

Please sign in to comment.