Skip to content

Commit

Permalink
Added support for a configurable stand-alone REST server.
Browse files Browse the repository at this point in the history
  • Loading branch information
jboner committed May 30, 2013
1 parent ad865a1 commit 01ea55a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 42 deletions.
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,24 @@ CvRDTs are _state-based_ and do not require a fully reliable broadcast since eve

CmRDT are _operations-based_ and do require a fully reliable broadcast since only the events are stored and a CmRDT is brought up to its current state by replaying the event log. This implementation is based on a persistent transaction log realised through the [eventsourced](https://github.com/eligosource/eventsourced) library.

## REST Server

You can run the REST server in two different ways.

1. Run it in stand-alone mode from the command line. Here you run it by invoking ``sbt run`` in the project root directory. You configure it through JVM options, which will override the default configuration values. Example: ``sbt run -Dakka.crdt.rest-server.port=9999``

2. Embedded in an Akka application. Just create the extension using ``val storage = ConvergentReplicatedDataTypeDatabase(system)`` and off you go. The REST server will be started automatically if the ``akka.crdt.rest-server.run`` is set to ``on``.


Each CRDT has a read-only JSON view representation which is used in the REST API for querying data. For details on the REST API and the different JSON views see the section with the different CRDT descriptions below.

## Scala Server API

You can create the ``ConvergentReplicatedDataTypeDatabase`` Extension like this (from within an actor):

val storage = ConvergentReplicatedDataTypeDatabase(context.system)

Get (or create a new) CRDT ``g-counter`` by id:
Get (or create a new) CRDT by id:

val nrOfUsers = storage.getOrCreate[GCounter]("users")

Expand All @@ -43,20 +54,10 @@ Store the updated CRDT:

storage update updatedNrOfUsers

Shut down the ``ConvergentReplicatedDataTypeDatabase`` Extension:
Shut down the database:

storage.shutdown()

Although it will also be shut down automatically when the actor system is shut down.

## REST Server

The REST server will be started automatically by the ``ConvergentReplicatedDataTypeDatabase`` Extension if the ``akka.crdt.rest-server.run`` is set to ``on``.

There is also a demo server you can run by invoking ``sbt run`` in the project root directory or by other means starting up the ``akka.crdt.convergent.DemoRestServer`` main class. If needed we can create fully fledged configurable command line server as well for those that just want a REST based CRDT storage and are not using it from within/alongside an Akka application.

Each CRDT has a read-only JSON view representation which is used in the REST API for querying data. For details on the REST API and the different JSON views see the section with the different CRDT descriptions below.

## Convergent Replicated Data Types (CvRDTs)

**State-based**
Expand Down
6 changes: 6 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ Write docs for:
* More background and concepts around CRDT, CAP and CALM
* Explain difference between CvRDT and CmRDT

## Query API

Support in both Scala and REST API:

* def keys(): Set[String]
* def values(): Iterator[CRDT]

## CvRDTs (state-based)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,33 @@
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.crdt.convergent
package akka.crdt

import com.typesafe.config.ConfigFactory
import akka.crdt.convergent._
import akka.actor._
import scala.concurrent.{ Future, future, ExecutionContext }
import scala.util.{ Success, Failure }
import scala.util.control.NonFatal
import play.api.libs.json.{ Json, JsValue }
import Json.{ toJson, parse, stringify }
import play.api.libs.json.Json
import play.api.libs.json.Json.{ toJson, parse, stringify }
import com.typesafe.config.ConfigFactory
import unfiltered.Async
import unfiltered.request._
import unfiltered.response._
import unfiltered.netty._
import unfiltered.util._
import QParams._
import unfiltered.request.QParams._

/**
* Used to run as a main server in demos etc. Starts up on port 9000 on 0.0.0.0.
* Main REST server. Starts up on port 9000 on 0.0.0.0 by default. Configure it to run on other port and address.
*
* POST:
* <pre>
* curl -i -H "Accept: application/json" -X POST -d "node=darkstar" -d "delta=1" http://127.0.0.1:9000/g-counter/jonas
* </pre>
*
* GET:
* <pre>
* curl -i -H "Accept: application/json" http://127.0.0.1:9000/g-counter/jonas
* </pre>
* Run using ``sbt run -Dakka.crdt.rest-server.port=9999``
* or as a regular main class ``java -Dakka.crdt.rest-server.port=9999 -cp ... akka.crdt.RestServer``.
*/
object DemoRestServer {
object RestServer {

def main(args: Array[String]): Unit = {

// FIXME make config configurable for RestServer - even if it is only for demo purposes, to be able to change ports, debug level etc.
val config = ConfigFactory.parseString("""
val config = ConfigFactory.defaultOverrides.withFallback(ConfigFactory.parseString("""
akka {
actor.provider = akka.cluster.ClusterActorRefProvider
loglevel = INFO
Expand All @@ -55,21 +47,25 @@ object DemoRestServer {
port = 9000
}
}
""")
"""))

val system = ActorSystem("crdt", config)
val storage = ConvergentReplicatedDataTypeDatabase(system)

println(s"""
=====================================================================================
★ ★ ★ CRDT Database Server listening on port: ${config.getInt("akka.crdt.rest-server.port")}. Press any key to exit... ★ ★ ★
=====================================================================================""")
System.in.read()
storage.shutdown()
system.shutdown()
=======================================================================================
★ ★ ★ CRDT Database Server listening on port: ${config.getInt("akka.crdt.rest-server.port")}. Press Control-C to exit... ★ ★ ★
=======================================================================================""")

Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
def run = {
storage.shutdown()
system.shutdown()
}
}))
}
}

// FIXME Create a generic configurable server to run in Akka Microkernel

/**
* Rest server for CRDT storage.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import scala.concurrent.duration._
import java.util.UUID
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.crdt.RestServer

object ConvergentReplicatedDataTypeDatabase
extends ExtensionId[ConvergentReplicatedDataTypeDatabase]
Expand Down Expand Up @@ -125,12 +126,13 @@ class ConvergentReplicatedDataTypeDatabase(sys: ExtendedActorSystem) extends Ext
}

def shutdown(): Unit = {
log.info("Shutting down ConvergentReplicatedDataTypeDatabase")
log.info("Shutting down ConvergentReplicatedDataTypeDatabase...")
restServer foreach { _.shutdown() }
system.stop(subscriber)
system.stop(publisher)
system.stop(clusterListener)
storage.destroy()
log.info("ConvergentReplicatedDataTypeDatabase shut down successfully")
}

private def publish(json: JsValue): Unit = publisher ! json
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/akka/crdt/convergent/LevelDbStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class LevelDbStorage(
}

override def destroy(): Unit = {
log.info("Destroying LevelDB storage")
log.info("Destroying LevelDB storage(s)")
databases foreach {
case (filename, _)
factory.destroy(new File(filename), new Options)
Expand Down

0 comments on commit 01ea55a

Please sign in to comment.