Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…o#471.  update add/update cluster page, add sasl.machenisam and sasl.jaas.config inputs.
  • Loading branch information
steven.wang committed Jul 16, 2018
1 parent 8dcdbf8 commit 466f059
Show file tree
Hide file tree
Showing 13 changed files with 114 additions and 17 deletions.
22 changes: 21 additions & 1 deletion app/controllers/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
case Success(_) => Valid
}
}
val validateSASLmechanism: Constraint[String] = Constraint("validate SASL mechanism") { string =>
Try {
SASLmechanism(string)
} match {
case Failure(t) => Invalid(t.getMessage)
case Success(_) => Valid
}
}

val clusterConfigForm = Form(
mapping(
Expand Down Expand Up @@ -114,6 +122,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
)(ClusterTuning.apply)(ClusterTuning.unapply)
)
, "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
, "saslMechanism" -> nonEmptyText.verifying(validateSASLmechanism)
, "jaasConfig" -> optional(text)
)(ClusterConfig.apply)(ClusterConfig.customUnapply)
)

Expand Down Expand Up @@ -156,6 +166,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
)(ClusterTuning.apply)(ClusterTuning.unapply)
)
, "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
, "saslMechanism" -> nonEmptyText.verifying(validateSASLmechanism)
, "jaasConfig" -> optional(text)
)(ClusterOperation.apply)(ClusterOperation.customUnapply)
)

Expand All @@ -176,6 +188,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
,false
,Option(defaultTuning)
,PLAINTEXT
,PLAIN
,None
)
}

Expand Down Expand Up @@ -223,7 +237,9 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
cc.activeOffsetCacheEnabled,
cc.displaySizeEnabled,
cc.tuning,
cc.securityProtocol.stringId
cc.securityProtocol.stringId,
cc.saslMechanism.stringId,
cc.jaasConfig
))
}))
}
Expand All @@ -247,6 +263,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
clusterConfig.filterConsumers,
clusterConfig.tuning,
clusterConfig.securityProtocol.stringId,
clusterConfig.saslMechanism.stringId,
clusterConfig.jaasConfig,
clusterConfig.logkafkaEnabled,
clusterConfig.activeOffsetCacheEnabled,
clusterConfig.displaySizeEnabled
Expand Down Expand Up @@ -316,6 +334,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
clusterOperation.clusterConfig.filterConsumers,
clusterOperation.clusterConfig.tuning,
clusterOperation.clusterConfig.securityProtocol.stringId,
clusterOperation.clusterConfig.saslMechanism.stringId,
clusterOperation.clusterConfig.jaasConfig,
clusterOperation.clusterConfig.logkafkaEnabled,
clusterOperation.clusterConfig.activeOffsetCacheEnabled,
clusterOperation.clusterConfig.displaySizeEnabled
Expand Down
8 changes: 8 additions & 0 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
securityProtocol: String,
saslMechanism: String,
jaasConfig: Option[String],
logkafkaEnabled: Boolean = false,
activeOffsetCacheEnabled: Boolean = false,
displaySizeEnabled: Boolean = false): Future[ApiError \/ Unit] =
Expand All @@ -273,6 +275,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
zkHosts,
tuning = tuning,
securityProtocol = securityProtocol,
saslMechanism = saslMechanism,
jaasConfig = jaasConfig,
jmxEnabled = jmxEnabled,
jmxUser = jmxUser,
jmxPass = jmxPass,
Expand All @@ -298,6 +302,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
securityProtocol: String,
saslMechanism: String,
jaasConfig: Option[String],
logkafkaEnabled: Boolean = false,
activeOffsetCacheEnabled: Boolean = false,
displaySizeEnabled: Boolean = false): Future[ApiError \/ Unit] =
Expand All @@ -308,6 +314,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
zkHosts,
tuning = tuning,
securityProtocol = securityProtocol,
saslMechanism = saslMechanism,
jaasConfig = jaasConfig,
jmxEnabled = jmxEnabled,
jmxUser = jmxUser,
jmxPass = jmxPass,
Expand Down
19 changes: 17 additions & 2 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import org.apache.kafka.clients.consumer.ConsumerConfig._
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.protocol.Errors

/**
Expand Down Expand Up @@ -99,7 +100,12 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
}
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.clusterContext.config.securityProtocol.stringId)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListStr)
log.info(s"Creating admin client with security protocol=${config.clusterContext.config.securityProtocol.stringId} , broker list : $brokerListStr")
props.put(SaslConfigs.SASL_MECHANISM, config.clusterContext.config.saslMechanism.stringId)
if(!config.clusterContext.config.jaasConfig.isEmpty){
props.put(SaslConfigs.SASL_JAAS_CONFIG,config.clusterContext.config.jaasConfig.get);
log.info(s"SASL JAAS config=${config.clusterContext.config.jaasConfig}");
}
log.info(s"Creating admin client with security protocol=${config.clusterContext.config.securityProtocol.stringId} , mechanism=${config.clusterContext.config.saslMechanism.stringId}, broker list : $brokerListStr")
AdminClient.create(props)
}

Expand Down Expand Up @@ -256,6 +262,10 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
cp => props.putAll(cp)
}
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clusterContext.config.securityProtocol.stringId)
props.put(SaslConfigs.SASL_MECHANISM, clusterContext.config.saslMechanism.stringId)
if(!clusterContext.config.jaasConfig.isEmpty){
props.put(SaslConfigs.SASL_JAAS_CONFIG, clusterContext.config.jaasConfig.get);
}
Try {
info("Constructing new kafka consumer client using these properties: ")
props.asScala.foreach {
Expand Down Expand Up @@ -1453,7 +1463,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
}

val tpList = broker2TopicPartitionMap(broker)
val port: Int = broker.endpoints(PLAINTEXT)
val port: Int = broker.endpoints(kaConfig.clusterContext.config.securityProtocol)
val consumerProperties = kaConfig.consumerProperties.getOrElse(getDefaultConsumerProperties(s"${broker.host}:$port"))
var kafkaConsumer: Option[KafkaConsumer[Any, Any]] = None
try {
Expand Down Expand Up @@ -1508,6 +1518,11 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
properties.put(GROUP_ID_CONFIG, getClass.getCanonicalName)
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,kaConfig.clusterContext.config.securityProtocol.stringId)
properties.put(SaslConfigs.SASL_MECHANISM,kaConfig.clusterContext.config.saslMechanism.stringId)
if(!kaConfig.clusterContext.config.jaasConfig.isEmpty){
properties.put(SaslConfigs.SASL_JAAS_CONFIG,kaConfig.clusterContext.config.jaasConfig.get);
}
properties
}
}
Expand Down
49 changes: 46 additions & 3 deletions app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ object ClusterConfig {
, displaySizeEnabled: Boolean = false
, tuning: Option[ClusterTuning]
, securityProtocol: String
, saslMechanism: String
, jaasConfig: Option[String]
) : ClusterConfig = {
val kafkaVersion = KafkaVersion(version)
//validate cluster name
Expand All @@ -190,15 +192,17 @@ object ClusterConfig {
, displaySizeEnabled
, tuning
, SecurityProtocol(securityProtocol)
, SASLmechanism(saslMechanism)
, jaasConfig
)
}

def customUnapply(cc: ClusterConfig) : Option[(
String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String)] = {
String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, String, Option[String])] = {
Some((
cc.name, cc.version.toString, cc.curatorConfig.zkConnect, cc.curatorConfig.zkMaxRetry,
cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.pollConsumers, cc.filterConsumers,
cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId
cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId, cc.saslMechanism.stringId, cc.jaasConfig
)
)
}
Expand Down Expand Up @@ -242,6 +246,8 @@ object ClusterConfig {
:: ("displaySizeEnabled" -> toJSON(config.displaySizeEnabled))
:: ("tuning" -> toJSON(config.tuning))
:: ("securityProtocol" -> toJSON(config.securityProtocol.stringId))
:: ("saslMechanism" -> toJSON(config.saslMechanism.stringId))
:: ("jaasConfig" -> toJSON(config.jaasConfig))
:: Nil)
compact(render(json)).getBytes(StandardCharsets.UTF_8)
}
Expand All @@ -267,6 +273,9 @@ object ClusterConfig {
val clusterTuning = fieldExtended[Option[ClusterTuning]]("tuning")(json)
val securityProtocolString = fieldExtended[String]("securityProtocol")(json)
val securityProtocol = securityProtocolString.map(SecurityProtocol.apply).getOrElse(PLAINTEXT)
val saslMechanismString = fieldExtended[String]("saslMechanism")(json)
val saslMechanism = saslMechanismString.map(SASLmechanism.apply).getOrElse(PLAIN)
val jaasConfig = fieldExtended[Option[String]]("jaasConfig")(json)

ClusterConfig.apply(
name,
Expand All @@ -282,7 +291,9 @@ object ClusterConfig {
activeOffsetCacheEnabled.getOrElse(false),
displaySizeEnabled.getOrElse(false),
clusterTuning.getOrElse(None),
securityProtocol
securityProtocol,
saslMechanism,
jaasConfig.getOrElse(None)
)
}

Expand Down Expand Up @@ -413,6 +424,8 @@ case class ClusterConfig (name: String
, displaySizeEnabled: Boolean
, tuning: Option[ClusterTuning]
, securityProtocol: SecurityProtocol
, saslMechanism: SASLmechanism
, jaasConfig: Option[String]
)

sealed trait SecurityProtocol {
Expand Down Expand Up @@ -446,3 +459,33 @@ object SecurityProtocol {
val formSelectList : IndexedSeq[(String,String)] = typesMap.toIndexedSeq.map(t => (t._1,t._2.stringId))
def apply(s: String) : SecurityProtocol = typesMap(s.toUpperCase)
}

sealed trait SASLmechanism {
def stringId: String
}
case object PLAIN extends SASLmechanism {
val stringId = "PLAIN"
}

case object GSSAPI extends SASLmechanism {
val stringId = "GSSAPI"
}

case object SCRAM256 extends SASLmechanism {
val stringId = "SCRAM-SHA-256"
}
case object SCRAM512 extends SASLmechanism {
val stringId = "SCRAM-SHA-512"
}

object SASLmechanism {
private[this] val typesMap: Map[String, SASLmechanism] = Map(
PLAIN.stringId -> PLAIN
, GSSAPI.stringId -> GSSAPI
, SCRAM256.stringId -> SCRAM256
, SCRAM512.stringId -> SCRAM512
)

val formSelectList : IndexedSeq[(String,String)] = typesMap.toIndexedSeq.map(t => (t._1,t._2.stringId))
def apply(s: String) : SASLmechanism = typesMap(s.toUpperCase)
}
12 changes: 8 additions & 4 deletions app/models/form/ClusterOperation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package models.form

import kafka.manager.model.{ClusterConfig, ClusterTuning, SecurityProtocol}
import kafka.manager.model.{ClusterConfig, ClusterTuning, SASLmechanism, SecurityProtocol}

/**
* @author hiral
Expand Down Expand Up @@ -46,17 +46,21 @@ object ClusterOperation {
, displaySizeEnabled: Boolean
, tuning: Option[ClusterTuning]
, securityProtocol: String
, saslMechanism: String
, jaasConfig: Option[String]
): ClusterOperation = {
ClusterOperation(operation,ClusterConfig(name, version, zkHosts, zkMaxRetry, jmxEnabled, jmxUser, jmxPass, jmxSsl,
pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol))
pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol, saslMechanism, jaasConfig))
}

def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String)] = {
def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, String, Option[String])] = {
Option((co.op.toString, co.clusterConfig.name, co.clusterConfig.version.toString,
co.clusterConfig.curatorConfig.zkConnect, co.clusterConfig.curatorConfig.zkMaxRetry,
co.clusterConfig.jmxEnabled, co.clusterConfig.jmxUser, co.clusterConfig.jmxPass, co.clusterConfig.jmxSsl,
co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled,
co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId))
co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId,
co.clusterConfig.saslMechanism.stringId,
co.clusterConfig.jaasConfig))
}
}

Expand Down
2 changes: 2 additions & 0 deletions app/views/cluster/addCluster.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
@b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize")
@b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays")
@b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" )
@b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL mechanism" )
@b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config")
@b3.submit('class -> "submit-button btn btn-primary"){ Save }
<a href="@routes.Application.index()" class="cancel-button btn btn-default" role="button">Cancel</a>
</fieldset>
Expand Down
3 changes: 3 additions & 0 deletions app/views/cluster/clusterList.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<input type="hidden" name="kafkaVersion" value="@cluster.version.toString">
<input type="hidden" name="zkHosts" value="@cluster.curatorConfig.zkConnect">
<input type="hidden" name="securityProtocol" value="@cluster.securityProtocol.stringId">
<input type="hidden" name="saslMechanism" value="@cluster.saslMechanism.stringId">
<input type="hidden" name="operation" value="Disable">
@b3.submit('class -> "btn btn-warning ops-button"){ Disable }
}
Expand All @@ -39,6 +40,7 @@
<input type="hidden" name="kafkaVersion" value="@cluster.version.toString">
<input type="hidden" name="zkHosts" value="@cluster.curatorConfig.zkConnect">
<input type="hidden" name="securityProtocol" value="@cluster.securityProtocol.stringId">
<input type="hidden" name="saslMechanism" value="@cluster.saslMechanism.stringId">
<input type="hidden" name="operation" value="Enable">
@b3.submit('class -> "btn btn-success ops-button"){ Enable }
}
Expand All @@ -47,6 +49,7 @@
<input type="hidden" name="kafkaVersion" value="@cluster.version.toString">
<input type="hidden" name="zkHosts" value="@cluster.curatorConfig.zkConnect">
<input type="hidden" name="securityProtocol" value="@cluster.securityProtocol.stringId">
<input type="hidden" name="saslMechanism" value="@cluster.saslMechanism.stringId">
<input type="hidden" name="operation" value="Delete">
@b3.submit('class -> "btn btn-danger ops-button"){ Delete }
}
Expand Down
2 changes: 2 additions & 0 deletions app/views/cluster/updateCluster.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
@b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize")
@b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays")
@b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" )
@b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL mechanism" )
@b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config")
@b3.submit('class -> "submit-button btn btn-primary btn"){ Save }
<a href="@routes.Application.index()" class="cancel-button btn btn-default" role="button">Cancel</a>
</fieldset>
Expand Down
4 changes: 2 additions & 2 deletions test/kafka/manager/TestBrokerViewCacheActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ class TestBrokerViewCacheActor extends KafkaServerInTest with BaseTest {
private[this] implicit val timeout: Timeout = 10.seconds

private[this] var brokerViewCacheActor : Option[ActorRef] = None
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism="PLAIN", jaasConfig=None)
private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig)

override protected def beforeAll(): Unit = {
super.beforeAll()
val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism="PLAIN", jaasConfig=None)
val clusterContext = ClusterContext(ClusterFeatures.from(clusterConfig), clusterConfig)
val ksConfig = KafkaStateActorConfig(sharedCurator, "pinned-dispatcher", clusterContext, LongRunningPoolConfig(2,100), LongRunningPoolConfig(2,100), 5, 10000, None, KafkaManagedOffsetCacheConfig())
val props = Props(classOf[KafkaStateActor],ksConfig)
Expand Down
Loading

0 comments on commit 466f059

Please sign in to comment.