Skip to content

Commit

Permalink
Merge pull request #532 from kingmorning/master
Browse files Browse the repository at this point in the history
I added sasl.mechanism, and sasl.jaas.config for each cluster. and fix some bugs [#528, #513,#502,#479,#477,#471]
  • Loading branch information
patelh authored Jul 28, 2018
2 parents cc17614 + f779d1f commit e27328c
Show file tree
Hide file tree
Showing 20 changed files with 201 additions and 51 deletions.
22 changes: 21 additions & 1 deletion app/controllers/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
case Success(_) => Valid
}
}
val validateSASLmechanism: Constraint[Option[String]] = Constraint("validate SASL mechanism") { stringOption =>
Try {
stringOption.foreach(SASLmechanism.from)
} match {
case Failure(t) => Invalid(t.getMessage)
case Success(_) => Valid
}
}

val clusterConfigForm = Form(
mapping(
Expand Down Expand Up @@ -114,6 +122,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
)(ClusterTuning.apply)(ClusterTuning.unapply)
)
, "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
, "saslMechanism" -> optional(text).verifying(validateSASLmechanism)
, "jaasConfig" -> optional(text)
)(ClusterConfig.apply)(ClusterConfig.customUnapply)
)

Expand Down Expand Up @@ -156,6 +166,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
)(ClusterTuning.apply)(ClusterTuning.unapply)
)
, "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
, "saslMechanism" -> optional(text).verifying(validateSASLmechanism)
, "jaasConfig" -> optional(text)
)(ClusterOperation.apply)(ClusterOperation.customUnapply)
)

Expand All @@ -176,6 +188,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
,false
,Option(defaultTuning)
,PLAINTEXT
,None
,None
)
}

Expand Down Expand Up @@ -223,7 +237,9 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
cc.activeOffsetCacheEnabled,
cc.displaySizeEnabled,
cc.tuning,
cc.securityProtocol.stringId
cc.securityProtocol.stringId,
cc.saslMechanism.map(_.stringId),
cc.jaasConfig
))
}))
}
Expand All @@ -247,6 +263,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
clusterConfig.filterConsumers,
clusterConfig.tuning,
clusterConfig.securityProtocol.stringId,
clusterConfig.saslMechanism.map(_.stringId),
clusterConfig.jaasConfig,
clusterConfig.logkafkaEnabled,
clusterConfig.activeOffsetCacheEnabled,
clusterConfig.displaySizeEnabled
Expand Down Expand Up @@ -316,6 +334,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
clusterOperation.clusterConfig.filterConsumers,
clusterOperation.clusterConfig.tuning,
clusterOperation.clusterConfig.securityProtocol.stringId,
clusterOperation.clusterConfig.saslMechanism.map(_.stringId),
clusterOperation.clusterConfig.jaasConfig,
clusterOperation.clusterConfig.logkafkaEnabled,
clusterOperation.clusterConfig.activeOffsetCacheEnabled,
clusterOperation.clusterConfig.displaySizeEnabled
Expand Down
8 changes: 8 additions & 0 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
securityProtocol: String,
saslMechanism: Option[String],
jaasConfig: Option[String],
logkafkaEnabled: Boolean = false,
activeOffsetCacheEnabled: Boolean = false,
displaySizeEnabled: Boolean = false): Future[ApiError \/ Unit] =
Expand All @@ -273,6 +275,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
zkHosts,
tuning = tuning,
securityProtocol = securityProtocol,
saslMechanism = saslMechanism,
jaasConfig = jaasConfig,
jmxEnabled = jmxEnabled,
jmxUser = jmxUser,
jmxPass = jmxPass,
Expand All @@ -298,6 +302,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
securityProtocol: String,
saslMechanism: Option[String],
jaasConfig: Option[String],
logkafkaEnabled: Boolean = false,
activeOffsetCacheEnabled: Boolean = false,
displaySizeEnabled: Boolean = false): Future[ApiError \/ Unit] =
Expand All @@ -308,6 +314,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
zkHosts,
tuning = tuning,
securityProtocol = securityProtocol,
saslMechanism = saslMechanism,
jaasConfig = jaasConfig,
jmxEnabled = jmxEnabled,
jmxUser = jmxUser,
jmxPass = jmxPass,
Expand Down
3 changes: 3 additions & 0 deletions app/kafka/manager/actor/KafkaManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,16 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
&& newConfig.jmxEnabled == currentConfig.jmxEnabled
&& newConfig.jmxUser == currentConfig.jmxUser
&& newConfig.jmxPass == currentConfig.jmxPass
&& newConfig.jmxSsl == currentConfig.jmxSsl
&& newConfig.logkafkaEnabled == currentConfig.logkafkaEnabled
&& newConfig.pollConsumers == currentConfig.pollConsumers
&& newConfig.filterConsumers == currentConfig.filterConsumers
&& newConfig.activeOffsetCacheEnabled == currentConfig.activeOffsetCacheEnabled
&& newConfig.displaySizeEnabled == currentConfig.displaySizeEnabled
&& newConfig.tuning == currentConfig.tuning
&& newConfig.securityProtocol == currentConfig.securityProtocol
&& newConfig.saslMechanism == currentConfig.saslMechanism
&& newConfig.jaasConfig == currentConfig.jaasConfig
) {
//nothing changed
false
Expand Down
26 changes: 26 additions & 0 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import org.apache.kafka.clients.consumer.ConsumerConfig._
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG

Expand Down Expand Up @@ -100,6 +101,14 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
}
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.clusterContext.config.securityProtocol.stringId)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListStr)
if(config.clusterContext.config.saslMechanism.nonEmpty){
props.put(SaslConfigs.SASL_MECHANISM, config.clusterContext.config.saslMechanism.get.stringId)
log.info(s"SASL Mechanism =${config.clusterContext.config.saslMechanism.get}")
}
if(config.clusterContext.config.jaasConfig.nonEmpty){
props.put(SaslConfigs.SASL_JAAS_CONFIG, config.clusterContext.config.jaasConfig.get)
log.info(s"SASL JAAS config=${config.clusterContext.config.jaasConfig.get}")
}
log.info(s"Creating admin client with security protocol=${config.clusterContext.config.securityProtocol.stringId} , broker list : $brokerListStr")
AdminClient.create(props)
}
Expand Down Expand Up @@ -257,6 +266,14 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
cp => props.putAll(cp)
}
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clusterContext.config.securityProtocol.stringId)
if(clusterContext.config.saslMechanism.nonEmpty){
props.put(SaslConfigs.SASL_MECHANISM, clusterContext.config.saslMechanism.get.stringId)
info(s"SASL Mechanism =${clusterContext.config.saslMechanism.get}")
if(clusterContext.config.jaasConfig.nonEmpty){
props.put(SaslConfigs.SASL_JAAS_CONFIG, clusterContext.config.jaasConfig.get)
info(s"SASL JAAS config=${clusterContext.config.jaasConfig.get}")
}
}
Try {
info("Constructing new kafka consumer client using these properties: ")
props.asScala.foreach {
Expand Down Expand Up @@ -1459,6 +1476,15 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
val port: Int = broker.endpoints(securityProtocol)
consumerProperties.put(BOOTSTRAP_SERVERS_CONFIG, s"${broker.host}:$port")
consumerProperties.put(SECURITY_PROTOCOL_CONFIG, securityProtocol.stringId)
// Use secure endpoint if available
if(kaConfig.clusterContext.config.saslMechanism.nonEmpty){
consumerProperties.put(SaslConfigs.SASL_MECHANISM, kaConfig.clusterContext.config.saslMechanism.get.stringId)
log.info(s"SASL Mechanism =${kaConfig.clusterContext.config.saslMechanism.get}")
}
if(kaConfig.clusterContext.config.jaasConfig.nonEmpty){
consumerProperties.put(SaslConfigs.SASL_JAAS_CONFIG, kaConfig.clusterContext.config.jaasConfig.get)
log.info(s"SASL JAAS config=${kaConfig.clusterContext.config.jaasConfig.get}")
}
var kafkaConsumer: Option[KafkaConsumer[Any, Any]] = None
try {
kafkaConsumer = Option(new KafkaConsumer(consumerProperties))
Expand Down
53 changes: 50 additions & 3 deletions app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ object ClusterConfig {
, displaySizeEnabled: Boolean = false
, tuning: Option[ClusterTuning]
, securityProtocol: String
, saslMechanism: Option[String]
, jaasConfig: Option[String]
) : ClusterConfig = {
val kafkaVersion = KafkaVersion(version)
//validate cluster name
Expand All @@ -190,15 +192,17 @@ object ClusterConfig {
, displaySizeEnabled
, tuning
, SecurityProtocol(securityProtocol)
, saslMechanism.flatMap(SASLmechanism.from)
, jaasConfig
)
}

def customUnapply(cc: ClusterConfig) : Option[(
String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String)] = {
String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
Some((
cc.name, cc.version.toString, cc.curatorConfig.zkConnect, cc.curatorConfig.zkMaxRetry,
cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.pollConsumers, cc.filterConsumers,
cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId
cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId, cc.saslMechanism.map(_.stringId), cc.jaasConfig
)
)
}
Expand Down Expand Up @@ -242,6 +246,8 @@ object ClusterConfig {
:: ("displaySizeEnabled" -> toJSON(config.displaySizeEnabled))
:: ("tuning" -> toJSON(config.tuning))
:: ("securityProtocol" -> toJSON(config.securityProtocol.stringId))
:: ("saslMechanism" -> toJSON(config.saslMechanism.map(_.stringId)))
:: ("jaasConfig" -> toJSON(config.jaasConfig))
:: Nil)
compact(render(json)).getBytes(StandardCharsets.UTF_8)
}
Expand All @@ -267,6 +273,9 @@ object ClusterConfig {
val clusterTuning = fieldExtended[Option[ClusterTuning]]("tuning")(json)
val securityProtocolString = fieldExtended[String]("securityProtocol")(json)
val securityProtocol = securityProtocolString.map(SecurityProtocol.apply).getOrElse(PLAINTEXT)
val saslMechanismString = fieldExtended[Option[String]]("saslMechanism")(json)
val saslMechanism = saslMechanismString.map(_.flatMap(SASLmechanism.from))
val jaasConfig = fieldExtended[Option[String]]("jaasConfig")(json)

ClusterConfig.apply(
name,
Expand All @@ -282,7 +291,9 @@ object ClusterConfig {
activeOffsetCacheEnabled.getOrElse(false),
displaySizeEnabled.getOrElse(false),
clusterTuning.getOrElse(None),
securityProtocol
securityProtocol,
saslMechanism.getOrElse(None),
jaasConfig.getOrElse(None)
)
}

Expand Down Expand Up @@ -413,6 +424,8 @@ case class ClusterConfig (name: String
, displaySizeEnabled: Boolean
, tuning: Option[ClusterTuning]
, securityProtocol: SecurityProtocol
, saslMechanism: Option[SASLmechanism]
, jaasConfig: Option[String]
)

sealed trait SecurityProtocol {
Expand Down Expand Up @@ -446,3 +459,37 @@ object SecurityProtocol {
val formSelectList : IndexedSeq[(String,String)] = typesMap.toIndexedSeq.map(t => (t._1,t._2.stringId))
def apply(s: String) : SecurityProtocol = typesMap(s.toUpperCase)
}

sealed trait SASLmechanism {
def stringId: String
}
case object SASL_MECHANISM_PLAIN extends SASLmechanism {
val stringId = "PLAIN"
}

case object SASL_MECHANISM_GSSAPI extends SASLmechanism {
val stringId = "GSSAPI"
}

case object SASL_MECHANISM_SCRAM256 extends SASLmechanism {
val stringId = "SCRAM-SHA-256"
}
case object SASL_MECHANISM_SCRAM512 extends SASLmechanism {
val stringId = "SCRAM-SHA-512"
}

object SASLmechanism {
private[this] val typesMap: Map[String, SASLmechanism] = Map(
SASL_MECHANISM_PLAIN.stringId -> SASL_MECHANISM_PLAIN
, SASL_MECHANISM_GSSAPI.stringId -> SASL_MECHANISM_GSSAPI
, SASL_MECHANISM_SCRAM256.stringId -> SASL_MECHANISM_SCRAM256
, SASL_MECHANISM_SCRAM512.stringId -> SASL_MECHANISM_SCRAM512
)

val formSelectList : IndexedSeq[(String,String)] = IndexedSeq(("DEFAULT", "DEFAULT")) ++ typesMap.toIndexedSeq.map(t => (t._1,t._2.stringId))
private def apply(s: String) : SASLmechanism = typesMap(s.toUpperCase)
def from(s: String) : Option[SASLmechanism] = s.toUpperCase match {
case "DEFAULT" => None
case other => Option(apply(other))
}
}
12 changes: 8 additions & 4 deletions app/models/form/ClusterOperation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package models.form

import kafka.manager.model.{ClusterConfig, ClusterTuning, SecurityProtocol}
import kafka.manager.model.{ClusterConfig, ClusterTuning, SASLmechanism, SecurityProtocol}

/**
* @author hiral
Expand Down Expand Up @@ -46,17 +46,21 @@ object ClusterOperation {
, displaySizeEnabled: Boolean
, tuning: Option[ClusterTuning]
, securityProtocol: String
, saslMechanism: Option[String]
, jaasConfig: Option[String]
): ClusterOperation = {
ClusterOperation(operation,ClusterConfig(name, version, zkHosts, zkMaxRetry, jmxEnabled, jmxUser, jmxPass, jmxSsl,
pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol))
pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol, saslMechanism, jaasConfig))
}

def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String)] = {
def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
Option((co.op.toString, co.clusterConfig.name, co.clusterConfig.version.toString,
co.clusterConfig.curatorConfig.zkConnect, co.clusterConfig.curatorConfig.zkMaxRetry,
co.clusterConfig.jmxEnabled, co.clusterConfig.jmxUser, co.clusterConfig.jmxPass, co.clusterConfig.jmxSsl,
co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled,
co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId))
co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId,
co.clusterConfig.saslMechanism.map(_.stringId),
co.clusterConfig.jaasConfig))
}
}

Expand Down
2 changes: 2 additions & 0 deletions app/views/cluster/addCluster.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
@b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize")
@b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays")
@b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" )
@b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL Mechanism (only applies to SASL based security)" )
@b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config (only applies to SASL based security)")
@b3.submit('class -> "submit-button btn btn-primary"){ Save }
<a href="@routes.Application.index()" class="cancel-button btn btn-default" role="button">Cancel</a>
</fieldset>
Expand Down
2 changes: 2 additions & 0 deletions app/views/cluster/updateCluster.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
@b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize")
@b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays")
@b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" )
@b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL Mechanism (only applies to SASL based security)" )
@b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config (only applies to SASL based security)")
@b3.submit('class -> "submit-button btn btn-primary btn"){ Save }
<a href="@routes.Application.index()" class="cancel-button btn btn-default" role="button">Cancel</a>
</fieldset>
Expand Down
2 changes: 1 addition & 1 deletion test/controller/api/TestKafkaStateCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M

private[this] def createCluster() = {
val future = kafkaManagerContext.get.getKafkaManager.addCluster(
testClusterName,"1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol="PLAINTEXT"
testClusterName,"1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None
)
val result = Await.result(future,duration)
result.toEither.left.foreach(apiError => sys.error(apiError.msg))
Expand Down
Loading

0 comments on commit e27328c

Please sign in to comment.