From 01ea55a8b17fa870f995748cffe31ae280f71fca Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Thu, 30 May 2013 23:02:18 +0200 Subject: [PATCH] Added support for a configurable stand-alone REST server. --- README.md | 25 ++++----- TODO.md | 6 +++ .../crdt/{convergent => }/RestServer.scala | 52 +++++++++---------- ...ConvergentReplicatedDataTypeDatabase.scala | 4 +- .../akka/crdt/convergent/LevelDbStorage.scala | 2 +- 5 files changed, 47 insertions(+), 42 deletions(-) rename src/main/scala/akka/crdt/{convergent => }/RestServer.scala (90%) diff --git a/README.md b/README.md index a747a58..09261e3 100644 --- a/README.md +++ b/README.md @@ -25,13 +25,24 @@ CvRDTs are _state-based_ and do not require a fully reliable broadcast since eve CmRDT are _operations-based_ and do require a fully reliable broadcast since only the events are stored and a CmRDT is brought up to its current state by replaying the event log. This implementation is based on a persistent transaction log realised through the [eventsourced](https://github.com/eligosource/eventsourced) library. +## REST Server + +You can run the REST server in two different ways. + +1. Run it in stand-alone mode from the command line. Here you run it by invoking ``sbt run`` in the project root directory. You configure it through JVM options, which will override the default configuration values. Example: ``sbt run -Dakka.crdt.rest-server.port=9999`` + +2. Embedded in an Akka application. Just create the extension using ``val storage = ConvergentReplicatedDataTypeDatabase(system)`` and off you go. The REST server will be started automatically if the ``akka.crdt.rest-server.run`` is set to ``on``. + + +Each CRDT has a read-only JSON view representation which is used in the REST API for querying data. For details on the REST API and the different JSON views see the section with the different CRDT descriptions below. + ## Scala Server API You can create the ``ConvergentReplicatedDataTypeDatabase`` Extension like this (from within an actor): val storage = ConvergentReplicatedDataTypeDatabase(context.system) -Get (or create a new) CRDT ``g-counter`` by id: +Get (or create a new) CRDT by id: val nrOfUsers = storage.getOrCreate[GCounter]("users") @@ -43,20 +54,10 @@ Store the updated CRDT: storage update updatedNrOfUsers -Shut down the ``ConvergentReplicatedDataTypeDatabase`` Extension: +Shut down the database: storage.shutdown() - -Although it will also be shut down automatically when the actor system is shut down. -## REST Server - -The REST server will be started automatically by the ``ConvergentReplicatedDataTypeDatabase`` Extension if the ``akka.crdt.rest-server.run`` is set to ``on``. - -There is also a demo server you can run by invoking ``sbt run`` in the project root directory or by other means starting up the ``akka.crdt.convergent.DemoRestServer`` main class. If needed we can create fully fledged configurable command line server as well for those that just want a REST based CRDT storage and are not using it from within/alongside an Akka application. - -Each CRDT has a read-only JSON view representation which is used in the REST API for querying data. For details on the REST API and the different JSON views see the section with the different CRDT descriptions below. - ## Convergent Replicated Data Types (CvRDTs) **State-based** diff --git a/TODO.md b/TODO.md index 4996202..2f74b7a 100644 --- a/TODO.md +++ b/TODO.md @@ -22,6 +22,12 @@ Write docs for: * More background and concepts around CRDT, CAP and CALM * Explain difference between CvRDT and CmRDT +## Query API + +Support in both Scala and REST API: + +* def keys(): Set[String] +* def values(): Iterator[CRDT] ## CvRDTs (state-based) diff --git a/src/main/scala/akka/crdt/convergent/RestServer.scala b/src/main/scala/akka/crdt/RestServer.scala similarity index 90% rename from src/main/scala/akka/crdt/convergent/RestServer.scala rename to src/main/scala/akka/crdt/RestServer.scala index 30517ff..0cc7297 100644 --- a/src/main/scala/akka/crdt/convergent/RestServer.scala +++ b/src/main/scala/akka/crdt/RestServer.scala @@ -2,41 +2,33 @@ * Copyright (C) 2009-2013 Typesafe Inc. */ -package akka.crdt.convergent +package akka.crdt -import com.typesafe.config.ConfigFactory +import akka.crdt.convergent._ import akka.actor._ import scala.concurrent.{ Future, future, ExecutionContext } import scala.util.{ Success, Failure } import scala.util.control.NonFatal -import play.api.libs.json.{ Json, JsValue } -import Json.{ toJson, parse, stringify } +import play.api.libs.json.Json +import play.api.libs.json.Json.{ toJson, parse, stringify } +import com.typesafe.config.ConfigFactory import unfiltered.Async import unfiltered.request._ import unfiltered.response._ import unfiltered.netty._ import unfiltered.util._ -import QParams._ +import unfiltered.request.QParams._ /** - * Used to run as a main server in demos etc. Starts up on port 9000 on 0.0.0.0. + * Main REST server. Starts up on port 9000 on 0.0.0.0 by default. Configure it to run on other port and address. * - * POST: - *
- * 		curl -i -H "Accept: application/json" -X POST -d "node=darkstar" -d "delta=1" http://127.0.0.1:9000/g-counter/jonas
- * 
- * - * GET: - *
- * 		curl -i -H "Accept: application/json" http://127.0.0.1:9000/g-counter/jonas
- * 
+ * Run using ``sbt run -Dakka.crdt.rest-server.port=9999`` + * or as a regular main class ``java -Dakka.crdt.rest-server.port=9999 -cp ... akka.crdt.RestServer``. */ -object DemoRestServer { +object RestServer { def main(args: Array[String]): Unit = { - - // FIXME make config configurable for RestServer - even if it is only for demo purposes, to be able to change ports, debug level etc. - val config = ConfigFactory.parseString(""" + val config = ConfigFactory.defaultOverrides.withFallback(ConfigFactory.parseString(""" akka { actor.provider = akka.cluster.ClusterActorRefProvider loglevel = INFO @@ -55,21 +47,25 @@ object DemoRestServer { port = 9000 } } - """) + """)) + val system = ActorSystem("crdt", config) val storage = ConvergentReplicatedDataTypeDatabase(system) + println(s""" - ===================================================================================== - ★ ★ ★ CRDT Database Server listening on port: ${config.getInt("akka.crdt.rest-server.port")}. Press any key to exit... ★ ★ ★ - =====================================================================================""") - System.in.read() - storage.shutdown() - system.shutdown() + ======================================================================================= + ★ ★ ★ CRDT Database Server listening on port: ${config.getInt("akka.crdt.rest-server.port")}. Press Control-C to exit... ★ ★ ★ + =======================================================================================""") + + Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { + def run = { + storage.shutdown() + system.shutdown() + } + })) } } -// FIXME Create a generic configurable server to run in Akka Microkernel - /** * Rest server for CRDT storage. * diff --git a/src/main/scala/akka/crdt/convergent/ConvergentReplicatedDataTypeDatabase.scala b/src/main/scala/akka/crdt/convergent/ConvergentReplicatedDataTypeDatabase.scala index 50e7701..030eddd 100644 --- a/src/main/scala/akka/crdt/convergent/ConvergentReplicatedDataTypeDatabase.scala +++ b/src/main/scala/akka/crdt/convergent/ConvergentReplicatedDataTypeDatabase.scala @@ -18,6 +18,7 @@ import scala.concurrent.duration._ import java.util.UUID import akka.cluster.ClusterEvent._ import akka.cluster.Member +import akka.crdt.RestServer object ConvergentReplicatedDataTypeDatabase extends ExtensionId[ConvergentReplicatedDataTypeDatabase] @@ -125,12 +126,13 @@ class ConvergentReplicatedDataTypeDatabase(sys: ExtendedActorSystem) extends Ext } def shutdown(): Unit = { - log.info("Shutting down ConvergentReplicatedDataTypeDatabase") + log.info("Shutting down ConvergentReplicatedDataTypeDatabase...") restServer foreach { _.shutdown() } system.stop(subscriber) system.stop(publisher) system.stop(clusterListener) storage.destroy() + log.info("ConvergentReplicatedDataTypeDatabase shut down successfully") } private def publish(json: JsValue): Unit = publisher ! json diff --git a/src/main/scala/akka/crdt/convergent/LevelDbStorage.scala b/src/main/scala/akka/crdt/convergent/LevelDbStorage.scala index 25e2590..01cfdcc 100644 --- a/src/main/scala/akka/crdt/convergent/LevelDbStorage.scala +++ b/src/main/scala/akka/crdt/convergent/LevelDbStorage.scala @@ -103,7 +103,7 @@ class LevelDbStorage( } override def destroy(): Unit = { - log.info("Destroying LevelDB storage") + log.info("Destroying LevelDB storage(s)") databases foreach { case (filename, _) ⇒ factory.destroy(new File(filename), new Options)