Skip to content

Commit

Permalink
Applying code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Ievgenii Shepeliuk committed Sep 10, 2017
1 parent 39b1b76 commit 3345e96
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 63 deletions.
53 changes: 53 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
= ConstructR-ZooKeeper

image:https://travis-ci.org/typesafehub/constructr-zookeeper.svg?branch=master[Build Status,link=https://travis-ci.org/typesafehub/constructr-zookeeper]

This library enables to use https://zookeeper.apache.org/[ZooKeeper] as cluster coordinator in a https://github.com/hseeberger/constructr[ConstructR] based node.

https://github.com/hseeberger/constructr[ConstructR] aims at cluster bootstrapping (construction) by using a coordination service and provides etcd as the default one. By means of this library, you will be able to use https://zookeeper.apache.org/[ZooKeeper] as coordination service instead.

You will need to add the following dependency in your `build.sbt` in addition to the core ConstructR ones:

[source]
----
libraryDependencies += "com.lightbend.constructr" %% "constructr-coordination-zookeeper" % "0.3.3"
----

== Configuration

Check https://github.com/hseeberger/constructr#coordination[this section] in ConstructR for general information about configuration.

Check link:constructr-coordination-zookeeper/src/main/resources/reference.conf[reference.conf] for ZooKeeper related configuration.

=== Configuring ZK cluster nodes

The default configuration tries to establish a connection to ZooKeeper on `localhost:2181`.

Override the `constructr.coordination.nodes` configuration to specify another ZooKeeper node:

[source]
----
constructr.coordination.nodes = ["10.10.10.10:2181"]
----

The format per node `ip:port`.

You are also able to connect to a multi-node cluster by specifying multiple nodes, separated by a comma:

[source]
----
constructr.coordination.nodes = ["10.10.10.10:2181", "10.10.10.11:2181", "10.10.10.12:2181"]
----

Additionally, comma separated connection string format is supported

[source]
----
constructr.coordination.nodes = "10.10.10.10:2181,10.10.10.11:2181,10.10.10.12:2181"
----

== Testing

Run tests:

$ sbt test
45 changes: 0 additions & 45 deletions README.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec}
import akka.stream.ActorMaterializer
import akka.testkit.TestDuration
import akka.util.Timeout
import com.lightbend.constructr.coordination.zookeeper.ZookeeperNodes.Nodes
import com.typesafe.config.ConfigFactory
import de.heikoseeberger.constructr.ConstructrExtension
import de.heikoseeberger.constructr.coordination.Coordination
Expand Down Expand Up @@ -68,7 +69,7 @@ abstract class MultiNodeZookeeperConstructrBaseSpec(coordinationPort: Int, clust
implicit val mat: ActorMaterializer = ActorMaterializer()

private val zookeeperClient = CuratorFrameworkFactory.builder()
.connectString(system.settings.config.getString("constructr.coordination.nodes"))
.connectString(system.settings.config.getString(Nodes))
.retryPolicy(new RetryNTimes(0, 0))
.build()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import scala.concurrent.{ Future, blocking }
private object ZookeeperCoordination {

object Converters {

implicit class InstantOps(instant: Instant) {
def encode: Array[Byte] = {
val bytes = java.nio.ByteBuffer.allocate(java.lang.Long.BYTES).putLong(instant.toEpochMilli).array()
Expand Down Expand Up @@ -70,9 +69,7 @@ private object ZookeeperCoordination {
def decodeNode: Address =
AddressFromURIString(new String(Base64.getUrlDecoder.decode(s), UTF_8))
}

}

}

/**
Expand All @@ -88,14 +85,14 @@ private object ZookeeperCoordination {
* The TTL in milliseconds represents the time elapsed since 1970-01-01T00:00:00 UTC.
* Because TTL value is always converted into the UTC time zone, it can be safely used across different time zones.
*/
final class ZookeeperCoordination(clusterName: String, actorSystem: ActorSystem) extends Coordination with ZookeeperNodes {
final class ZookeeperCoordination(clusterName: String, system: ActorSystem) extends Coordination with ZookeeperNodes {

import ZookeeperCoordination.Converters._

private implicit val ec = actorSystem.dispatcher
private implicit val ec = system.dispatcher

private val RootPath =
actorSystem.settings.config.getString("constructr.coordination.zookeeper.rootpath")
system.settings.config.getString("constructr.coordination.zookeeper.rootpath")

private val BasePath = s"$RootPath/$clusterName"
private val NodesPath = s"$BasePath/nodes"
Expand All @@ -105,7 +102,7 @@ final class ZookeeperCoordination(clusterName: String, actorSystem: ActorSystem)

private val client =
CuratorFrameworkFactory.builder()
.connectString(nodesConnectionString(actorSystem))
.connectString(nodesConnectionString(system))
.retryPolicy(new RetryNTimes(0, 0))
.build()

Expand All @@ -114,11 +111,11 @@ final class ZookeeperCoordination(clusterName: String, actorSystem: ActorSystem)

private def run(): Unit = {
def shutdown(): Unit = {
actorSystem.log.info("Zookeeper client closes connection to nodes [{}]..", nodesConnectionString(actorSystem))
system.log.info("Zookeeper client closes connection to nodes [{}]..", nodesConnectionString(system))
client.close()
}

actorSystem.log.info("Zookeeper client tries to establish a connection to nodes [{}]..", nodesConnectionString(actorSystem))
system.log.info("Zookeeper client tries to establish a connection to nodes [{}]..", nodesConnectionString(system))
client.start()
client.blockUntilConnected()
sys.addShutdownHook(shutdown())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,28 @@
package com.lightbend.constructr.coordination.zookeeper

import akka.actor.ActorSystem
import com.lightbend.constructr.coordination.zookeeper.ZookeeperNodes.Nodes
import com.typesafe.config.ConfigException.WrongType

import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.util.Try

object ZookeeperNodes {
val Nodes : String = "constructr.coordination.nodes"
}

/**
* Helper for extracting Zookeeper nodes configuration from {@link akka.actor.ActorSystem ActorSystem} settings.
*
* First, tries to get comma-saparated list of nodes from `String` settings,
* if not found then falls back to parsing `List` of strings.
*/
trait ZookeeperNodes {
def nodesConnectionString(actorSystem: ActorSystem): String = {
def nodesConnectionString(system: ActorSystem): String = {
Try(
actorSystem.settings.config.getString("constructr.coordination.nodes")
system.settings.config.getString(Nodes)
).recover {
case ex: WrongType => actorSystem.settings.config.getStringList("constructr.coordination.nodes").asScala.mkString(",")
case ex: WrongType => system.settings.config.getStringList(Nodes).asScala.mkString(",")
}.get
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package com.lightbend.constructr.coordination.zookeeper

import akka.actor.ActorSystem
import com.lightbend.constructr.coordination.zookeeper.ZookeeperNodes.Nodes
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpec }
import org.scalatest.{Matchers, WordSpec}

class ZookeeperNodesSpec extends WordSpec with Matchers with ZookeeperNodes {

Expand All @@ -31,9 +32,9 @@ class ZookeeperNodesSpec extends WordSpec with Matchers with ZookeeperNodes {
)
val actorSystem = ActorSystem("default", config)

config.getStringList("constructr.coordination.nodes").size() shouldBe 2
config.getStringList("constructr.coordination.nodes") should contain("host1:2181")
config.getStringList("constructr.coordination.nodes") should contain("host2:2181")
config.getStringList(Nodes).size() shouldBe 2
config.getStringList(Nodes) should contain("host1:2181")
config.getStringList(Nodes) should contain("host2:2181")

nodesConnectionString(actorSystem) shouldBe "host1:2181,host2:2181"
}
Expand All @@ -46,7 +47,7 @@ class ZookeeperNodesSpec extends WordSpec with Matchers with ZookeeperNodes {
)
val actorSystem = ActorSystem("default", config)

config.getString("constructr.coordination.nodes") shouldBe "host1:2181,host2:2181"
config.getString(Nodes) shouldBe "host1:2181,host2:2181"
nodesConnectionString(actorSystem) shouldBe "host1:2181,host2:2181"
}
}
Expand Down

0 comments on commit 3345e96

Please sign in to comment.