Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Add support for task operations (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
jwang47 authored and Andrew Stevenson committed Dec 21, 2017
1 parent 333c3d4 commit 25899b9
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 63 deletions.
123 changes: 83 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ dealing with configuration expect or produce data in .properties
style: `key=value` lines and comments start with a `#`.

connect-cli 1.0.5
Usage: connect-cli [ps|get|rm|create|run|diff|status|plugins|describe|validate|restart|pause|resume] [options] [<connector-name>]
Usage: connect-cli [ps|get|rm|create|run|diff|status|plugins|describe|validate|restart|pause|resume] [options] [<connector-name>] [<task-id>]

--help
prints this usage text
Expand All @@ -37,46 +37,54 @@ style: `key=value` lines and comments start with a `#`.
-f <value> | --format <value>
Format of the config, default is PROPERTIES. Valid options are 'properties' and 'json'.

Command: ps
list active connectors names.

Command: get
get the configuration of the specified connector.

Command: rm
remove the specified connector.

Command: create
create the specified connector with the config from stdin; the connector cannot already exist.

Command: run
create or update the specified connector with the config from stdin.

Command: diff
diff the specified connector with the config from stdin.

Command: status
get connector and it's task(s) state(s).

Command: plugins
list the available connector class plugins on the classpath.

Command: describe
list the configurations for a connector class plugin on the classpath.

Command: pause
pause the specified connector.

Command: restart
restart the specified connector.

Command: resume
resume the specified connector.
Command: ps
list active connectors names.
Command: get
get the configuration of the specified connector.
Command: rm
remove the specified connector.
Command: create
create the specified connector with the config from stdin; the connector cannot already exist.
Command: run
create or update the specified connector with the config from stdin.
Command: diff
diff the specified connector with the config from stdin.
Command: status
get connector and it's task(s) state(s).
Command: plugins
list the available connector class plugins on the classpath.
Command: describe
list the configurations for a connector class plugin on the classpath.
Command: pause
pause the specified connector.
Command: restart
restart the specified connector.
Command: resume
resume the specified connector.
Command: validate
validate the connector config from stdin against a connector class plugin on the classpath.
Command: task_ps
list the tasks belonging to a connector.
Command: task_status
get the status of a connector task.
Command: task_restart
restart the specified connector task.

Command: validate
validate the connector config from stdin against a connector class plugin on the classpath.
You can override the default endpoint by setting an environment variable `KAFKA_CONNECT_REST` i.e.

export KAFKA_CONNECT_REST="http://myserver:myport"
Expand Down Expand Up @@ -364,6 +372,41 @@ Example:
taskState: RUNNING
workerId: 10.0.0.9:8083

List all Tasks of a Connector
-----------------------------

Command: `task_ps`

Example:

bin/connect-cli tasks_ps cassandra-sink
- cassandra-sink task 0
connector.class: com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
bootstrap.servers: kafka-broker1:6667,kafka-broker2:6667,kafka-broker3:6667
producer.schema.registry.url:http://schema-registry:8081
...
Get the status of a Connector Task
----------------------------------

Command: `task_status`

Example:

bin/connect-cli task_status cassandra-sink 0
taskId: 0
taskState: RUNNING
workerId: 10.0.0.9:8083

Restart a Connector Task
------------------------

Command: `task_restart`

Example:

bin/connect-cli task_restart cassandra-sink 0

Misc
====

Expand Down
20 changes: 18 additions & 2 deletions src/main/scala/com/datamountaineer/connect/tools/Cli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import scala.collection.JavaConverters
/** Enumeration of CLI commands */
object AppCommand extends Enumeration {
type AppCommand = Value
val NONE, LIST_ACTIVE, GET, DELETE, CREATE, RUN, DIFF, STATUS, PLUGINS, DESCRIBE, RESTART, PAUSE, RESUME, VALIDATE = Value
val NONE, LIST_ACTIVE, GET, DELETE, CREATE, RUN, DIFF, STATUS, PLUGINS, DESCRIBE, RESTART, PAUSE, RESUME, VALIDATE,
TASK_LIST, TASK_STATUS, TASK_RESTART = Value
}
import com.datamountaineer.connect.tools.AppCommand._

Expand All @@ -37,8 +38,11 @@ object Defaults {
* @param url the url of the REST service, defaults to Defaults.BaseUrl
* @param format the format of the config, defaults to Defaults.Format (can be "PROPERTIES" or "JSON")
* @param connectorName an optional connector name that is the subject of the command
* @param taskId an optional task id that is the subject of the command
*/
case class Arguments(cmd: AppCommand = NONE, url: String = Defaults.BaseUrl, format: Formats = Defaults.Format, connectorName: Option[String] = None)
case class Arguments(cmd: AppCommand = NONE, url: String = Defaults.BaseUrl, format: Formats = Defaults.Format,
connectorName: Option[String] = None,
taskId: Option[Int] = None)

/** Performs the action contained in the Arguments on RestKafkaConnectApi */
object ExecuteCommand {
Expand All @@ -55,6 +59,7 @@ object ExecuteCommand {
val fmt = new PropertiesFormatter()
val cmd = cfg.cmd
lazy val connectorName = cfg.connectorName.get
lazy val taskId = cfg.taskId.get

lazy val configuration = coherentConfig(configToMap(allStdIn.toSeq, cfg.format), connectorName, cmd)

Expand All @@ -72,6 +77,9 @@ object ExecuteCommand {
case PAUSE => api.connectorPause(connectorName).map(fmt.connectorStatus).map(Some(_))
case RESTART => api.connectorRestart(connectorName).map(fmt.connectorStatus).map(Some(_))
case RESUME => api.connectorResume(connectorName).map(fmt.connectorStatus).map(Some(_))
case TASK_LIST => api.tasks(connectorName).map(fmt.tasks).map(Some(_))
case TASK_STATUS => api.taskStatus(connectorName, taskId).map(fmt.taskStatus).map(Some(_))
case TASK_RESTART => api.taskRestart(connectorName, taskId).map(_ => None)
}
res.recover{
case ApiErrorException(e) => Some(e)
Expand Down Expand Up @@ -170,14 +178,22 @@ object Cli {
cmd("restart") action { (_,c) => c.copy(cmd = RESTART) } text "restart the specified connector.\n" children()
cmd("resume") action { (_,c) => c.copy(cmd = RESUME) } text "resume the specified connector.\n" children()
cmd("validate") action { (_,c) => c.copy(cmd = VALIDATE) } text "validate the connector config from stdin against a connector class plugin on the classpath.\n" children()
cmd("task_ps") action { (_,c) => c.copy(cmd = TASK_LIST) } text "list the tasks belonging to a connector.\n" children()
cmd("task_status") action { (_,c) => c.copy(cmd = TASK_STATUS) } text "get the status of a connector task.\n" children()
cmd("task_restart") action { (_,c) => c.copy(cmd = TASK_RESTART) } text "restart the specified connector task.\n" children()

arg[String]("<connector-name>") optional() action { (x, c) =>
c.copy(connectorName = Some(x))
} text ("connector name")

arg[Int]("<task-id>") optional() action { (x, c) =>
c.copy(taskId = Some(x))
} text ("task id")

checkConfig { c =>
if (c.cmd == NONE) failure("Command expected.")
else if ((c.cmd != LIST_ACTIVE && c.cmd != PLUGINS && c.cmd != DESCRIBE) && c.connectorName.isEmpty) failure("Please specify the connector-name")
else if ((c.cmd == TASK_LIST || c.cmd == TASK_STATUS || c.cmd == TASK_RESTART) && c.connectorName.isEmpty) failure("Please specify the task-id")
else success
}
}.parse(args, Arguments())
Expand Down
4 changes: 4 additions & 0 deletions src/main/scala/com/datamountaineer/connect/tools/Domain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ case class ConnectorStatus(state:String,worker_id:String, trace:Option[String])
case class TaskStatus(id:Int, state:String,worker_id:String,trace:Option[String])
case class ConnectorTaskStatus(name:String, connector: ConnectorStatus, tasks: List[TaskStatus])
case class ConnectorPlugins(`class` :String, `type`: String, version: Option[String])
case class TaskInfo(id:TaskId, config:Map[String,String])
case class TaskId(connector:String, task:Int)

case class Definition(name: String, `type`: String, required: Boolean, default_value: Option[String],
importance: Option[String], group: Option[String], display_name: Option[String],
Expand All @@ -35,6 +37,8 @@ object MyJsonProtocol extends DefaultJsonProtocol {
implicit val errormsg = jsonFormat2(ErrorMessage)
implicit val connectorstatus = jsonFormat3(ConnectorStatus)
implicit val taskstatus = jsonFormat4(TaskStatus)
implicit val taskid = jsonFormat2(TaskId)
implicit val taskinfo = jsonFormat2(TaskInfo)
implicit val connectortaskstatus = jsonFormat3(ConnectorTaskStatus)
implicit val connectorplugins = jsonFormat3(ConnectorPlugins)
implicit val values = jsonFormat5(Values)
Expand Down
30 changes: 26 additions & 4 deletions src/main/scala/com/datamountaineer/connect/tools/Formatters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ trait Formatter {
def connectorStatus(s:ConnectorTaskStatus): String
def connectorPlugins(s: Seq[ConnectorPlugins]) : String
def connectorPluginsValidate(s: ConnectorPluginsValidate, validate: Boolean, props: Map[String, String] = Map.empty) : String
def tasks(tasks: List[TaskInfo]): String
def taskStatus(t:TaskStatus): String
}

/** A collection of methods that translate the output of the API into a string representation that pleases the human eye. */
Expand All @@ -57,7 +59,8 @@ class HumanFormatter extends Formatter {
def connectorStatus(s:ConnectorTaskStatus): String = ???
def connectorPlugins(s: Seq[ConnectorPlugins]): String = ???
def connectorPluginsValidate(s: ConnectorPluginsValidate, validate: Boolean, props: Map[String, String] = Map.empty) : String = ???

def tasks(tasks: List[TaskInfo]): String = ???
def taskStatus(t:TaskStatus): String = ???
}

/** A collection of methods that translate the output of the API into a string representation that is compatible with the .properties format. */
Expand Down Expand Up @@ -100,24 +103,43 @@ class PropertiesFormatter extends Formatter {
|#entries only in provided config:
|${diff.entriesOnlyOnRight.asScala.mkString("\n")}""".stripMargin

def trace(t:Option[String], indent:String="") =
private def trace(t:Option[String], indent:String="") =
t match {
case Some(trace) => s"${indent}trace: ${Console.RED} ${trace}\n ${Console.RESET}"
case None => ""
}

def taskStatus(t:TaskStatus) =
override def taskStatus(t:TaskStatus) =
s"taskId: ${t.id}\n" +
s"taskState: ${if (t.state.equals("RUNNING")) Console.GREEN else Console.RED}${t.state}${Console.RESET}\n" + trace(t.trace," ") +
s"workerId: ${t.worker_id}\n"

private def childTaskStatus(t:TaskStatus) =
s" - taskId: ${t.id}\n" +
s" taskState: ${if (t.state.equals("RUNNING")) Console.GREEN else Console.RED}${t.state}${Console.RESET}\n" + trace(t.trace," ") +
s" workerId: ${t.worker_id}\n"

private def childTaskInfo(t:TaskInfo) =
s" - ${t.id.connector} task ${t.id.task}\n" +
s"${childTaskInfoConfig(t.config)}\n\n"

private def childTaskInfoConfig(config: Map[String, String]) =
config.map(e => (" " + e._1, e._2))
.map(_.productIterator.mkString(": "))
.mkString("\n")

override def tasks(tasks: List[TaskInfo]) =
s"""
|${tasks.map(childTaskInfo).mkString("")}
""".stripMargin

override def connectorStatus(s:ConnectorTaskStatus): String =
s"connectorState: ${if (s.connector.state.equals("RUNNING")) Console.GREEN else Console.RED} ${s.connector.state}${Console.RESET}\n"+
s"workerId: ${s.connector.worker_id}\n" +
trace(s.connector.trace) +
s"numberOfTasks: ${s.tasks.length}\n"+
s"tasks:\n"+
s"${s.tasks.map(taskStatus).mkString("")}"
s"${s.tasks.map(childTaskStatus).mkString("")}"

override def connectorPlugins(s: Seq[ConnectorPlugins]): String = {
s.map(s => s"Class name: ${s.`class`}, Type: ${s.`type`}, Version: ${s.version.getOrElse("")}").mkString("\n")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ trait KafkaConnectApi {
def connectorPause(name: String) : Try[ConnectorTaskStatus]
def connectorRestart(name: String) : Try[ConnectorTaskStatus]
def connectorResume(name: String) : Try[ConnectorTaskStatus]

def taskRestart(connector: String, taskId: Int) : Try[Unit]
}

/** Kafka Connect Api interface */
Expand Down Expand Up @@ -206,7 +208,7 @@ class RestKafkaConnectApi(baseUrl: java.net.URI, httpClient: HttpClient = Scalaj
def diffConnector(name: String, config: Map[String,String]) : Try[(Map[String, String], Map[String, String], MapDifference[String,String])] = {
println("Validating connector properties before diffing")
connectorPluginsValidate(name, config)
println(s"Connector properties valid. Diffing connector $name against provided config")
println(s"Connector properties valid. Diffing connector ${name} against provided config")
connectorInfo(name).map(info => {
(info.config, config, Maps.difference[String,String](info.config.asJava, config.asJava))
})
Expand Down Expand Up @@ -275,4 +277,19 @@ class RestKafkaConnectApi(baseUrl: java.net.URI, httpClient: HttpClient = Scalaj
Try(req[ConnectorTaskStatus](s"/connectors/${name}/status").get)
}

def tasks(connector: String): Try[List[TaskInfo]] = {
import MyJsonProtocol._
Try(req[List[TaskInfo]](s"/connectors/$connector/tasks").get)
}

def taskStatus(connector: String, taskId: Int): Try[TaskStatus] = {
import MyJsonProtocol._
Try(req[TaskStatus](s"/connectors/$connector/tasks/$taskId/status").get)
}

def taskRestart(connector: String, taskId: Int): Try[Unit] = {
import MyJsonProtocol._
Try(req[Unit](s"/connectors/$connector/tasks/$taskId/restart", "POST"))
}

}
Loading

0 comments on commit 25899b9

Please sign in to comment.