Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I added sasl.mechanism, and sasl.jaas.config for each cluster. and fix some bugs [#528, #513,#502,#479,#477,#471] #532

Merged
merged 5 commits into from
Jul 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion app/controllers/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
case Success(_) => Valid
}
}
val validateSASLmechanism: Constraint[Option[String]] = Constraint("validate SASL mechanism") { stringOption =>
Try {
stringOption.foreach(SASLmechanism.from)
} match {
case Failure(t) => Invalid(t.getMessage)
case Success(_) => Valid
}
}

val clusterConfigForm = Form(
mapping(
Expand Down Expand Up @@ -114,6 +122,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
)(ClusterTuning.apply)(ClusterTuning.unapply)
)
, "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
, "saslMechanism" -> optional(text).verifying(validateSASLmechanism)
, "jaasConfig" -> optional(text)
)(ClusterConfig.apply)(ClusterConfig.customUnapply)
)

Expand Down Expand Up @@ -156,6 +166,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
)(ClusterTuning.apply)(ClusterTuning.unapply)
)
, "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
, "saslMechanism" -> optional(text).verifying(validateSASLmechanism)
, "jaasConfig" -> optional(text)
)(ClusterOperation.apply)(ClusterOperation.customUnapply)
)

Expand All @@ -176,6 +188,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
,false
,Option(defaultTuning)
,PLAINTEXT
,None
,None
)
}

Expand Down Expand Up @@ -223,7 +237,9 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
cc.activeOffsetCacheEnabled,
cc.displaySizeEnabled,
cc.tuning,
cc.securityProtocol.stringId
cc.securityProtocol.stringId,
cc.saslMechanism.map(_.stringId),
cc.jaasConfig
))
}))
}
Expand All @@ -247,6 +263,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
clusterConfig.filterConsumers,
clusterConfig.tuning,
clusterConfig.securityProtocol.stringId,
clusterConfig.saslMechanism.map(_.stringId),
clusterConfig.jaasConfig,
clusterConfig.logkafkaEnabled,
clusterConfig.activeOffsetCacheEnabled,
clusterConfig.displaySizeEnabled
Expand Down Expand Up @@ -316,6 +334,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
clusterOperation.clusterConfig.filterConsumers,
clusterOperation.clusterConfig.tuning,
clusterOperation.clusterConfig.securityProtocol.stringId,
clusterOperation.clusterConfig.saslMechanism.map(_.stringId),
clusterOperation.clusterConfig.jaasConfig,
clusterOperation.clusterConfig.logkafkaEnabled,
clusterOperation.clusterConfig.activeOffsetCacheEnabled,
clusterOperation.clusterConfig.displaySizeEnabled
Expand Down
8 changes: 8 additions & 0 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
securityProtocol: String,
saslMechanism: Option[String],
jaasConfig: Option[String],
logkafkaEnabled: Boolean = false,
activeOffsetCacheEnabled: Boolean = false,
displaySizeEnabled: Boolean = false): Future[ApiError \/ Unit] =
Expand All @@ -273,6 +275,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
zkHosts,
tuning = tuning,
securityProtocol = securityProtocol,
saslMechanism = saslMechanism,
jaasConfig = jaasConfig,
jmxEnabled = jmxEnabled,
jmxUser = jmxUser,
jmxPass = jmxPass,
Expand All @@ -298,6 +302,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
securityProtocol: String,
saslMechanism: Option[String],
jaasConfig: Option[String],
logkafkaEnabled: Boolean = false,
activeOffsetCacheEnabled: Boolean = false,
displaySizeEnabled: Boolean = false): Future[ApiError \/ Unit] =
Expand All @@ -308,6 +314,8 @@ class KafkaManager(akkaConfig: Config) extends Logging {
zkHosts,
tuning = tuning,
securityProtocol = securityProtocol,
saslMechanism = saslMechanism,
jaasConfig = jaasConfig,
jmxEnabled = jmxEnabled,
jmxUser = jmxUser,
jmxPass = jmxPass,
Expand Down
3 changes: 3 additions & 0 deletions app/kafka/manager/actor/KafkaManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,16 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
&& newConfig.jmxEnabled == currentConfig.jmxEnabled
&& newConfig.jmxUser == currentConfig.jmxUser
&& newConfig.jmxPass == currentConfig.jmxPass
&& newConfig.jmxSsl == currentConfig.jmxSsl
&& newConfig.logkafkaEnabled == currentConfig.logkafkaEnabled
&& newConfig.pollConsumers == currentConfig.pollConsumers
&& newConfig.filterConsumers == currentConfig.filterConsumers
&& newConfig.activeOffsetCacheEnabled == currentConfig.activeOffsetCacheEnabled
&& newConfig.displaySizeEnabled == currentConfig.displaySizeEnabled
&& newConfig.tuning == currentConfig.tuning
&& newConfig.securityProtocol == currentConfig.securityProtocol
&& newConfig.saslMechanism == currentConfig.saslMechanism
&& newConfig.jaasConfig == currentConfig.jaasConfig
) {
//nothing changed
false
Expand Down
26 changes: 26 additions & 0 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import org.apache.kafka.clients.consumer.ConsumerConfig._
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG

Expand Down Expand Up @@ -100,6 +101,14 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
}
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.clusterContext.config.securityProtocol.stringId)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListStr)
if(config.clusterContext.config.saslMechanism.nonEmpty){
props.put(SaslConfigs.SASL_MECHANISM, config.clusterContext.config.saslMechanism.get.stringId)
log.info(s"SASL Mechanism =${config.clusterContext.config.saslMechanism.get}")
}
if(config.clusterContext.config.jaasConfig.nonEmpty){
props.put(SaslConfigs.SASL_JAAS_CONFIG, config.clusterContext.config.jaasConfig.get)
log.info(s"SASL JAAS config=${config.clusterContext.config.jaasConfig.get}")
}
log.info(s"Creating admin client with security protocol=${config.clusterContext.config.securityProtocol.stringId} , broker list : $brokerListStr")
AdminClient.create(props)
}
Expand Down Expand Up @@ -257,6 +266,14 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
cp => props.putAll(cp)
}
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clusterContext.config.securityProtocol.stringId)
if(clusterContext.config.saslMechanism.nonEmpty){
props.put(SaslConfigs.SASL_MECHANISM, clusterContext.config.saslMechanism.get.stringId)
info(s"SASL Mechanism =${clusterContext.config.saslMechanism.get}")
if(clusterContext.config.jaasConfig.nonEmpty){
props.put(SaslConfigs.SASL_JAAS_CONFIG, clusterContext.config.jaasConfig.get)
info(s"SASL JAAS config=${clusterContext.config.jaasConfig.get}")
}
}
Try {
info("Constructing new kafka consumer client using these properties: ")
props.asScala.foreach {
Expand Down Expand Up @@ -1459,6 +1476,15 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
val port: Int = broker.endpoints(securityProtocol)
consumerProperties.put(BOOTSTRAP_SERVERS_CONFIG, s"${broker.host}:$port")
consumerProperties.put(SECURITY_PROTOCOL_CONFIG, securityProtocol.stringId)
// Use secure endpoint if available
if(kaConfig.clusterContext.config.saslMechanism.nonEmpty){
consumerProperties.put(SaslConfigs.SASL_MECHANISM, kaConfig.clusterContext.config.saslMechanism.get.stringId)
log.info(s"SASL Mechanism =${kaConfig.clusterContext.config.saslMechanism.get}")
}
if(kaConfig.clusterContext.config.jaasConfig.nonEmpty){
consumerProperties.put(SaslConfigs.SASL_JAAS_CONFIG, kaConfig.clusterContext.config.jaasConfig.get)
log.info(s"SASL JAAS config=${kaConfig.clusterContext.config.jaasConfig.get}")
}
var kafkaConsumer: Option[KafkaConsumer[Any, Any]] = None
try {
kafkaConsumer = Option(new KafkaConsumer(consumerProperties))
Expand Down
53 changes: 50 additions & 3 deletions app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ object ClusterConfig {
, displaySizeEnabled: Boolean = false
, tuning: Option[ClusterTuning]
, securityProtocol: String
, saslMechanism: Option[String]
, jaasConfig: Option[String]
) : ClusterConfig = {
val kafkaVersion = KafkaVersion(version)
//validate cluster name
Expand All @@ -190,15 +192,17 @@ object ClusterConfig {
, displaySizeEnabled
, tuning
, SecurityProtocol(securityProtocol)
, saslMechanism.flatMap(SASLmechanism.from)
, jaasConfig
)
}

def customUnapply(cc: ClusterConfig) : Option[(
String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String)] = {
String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
Some((
cc.name, cc.version.toString, cc.curatorConfig.zkConnect, cc.curatorConfig.zkMaxRetry,
cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.pollConsumers, cc.filterConsumers,
cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId
cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId, cc.saslMechanism.map(_.stringId), cc.jaasConfig
)
)
}
Expand Down Expand Up @@ -242,6 +246,8 @@ object ClusterConfig {
:: ("displaySizeEnabled" -> toJSON(config.displaySizeEnabled))
:: ("tuning" -> toJSON(config.tuning))
:: ("securityProtocol" -> toJSON(config.securityProtocol.stringId))
:: ("saslMechanism" -> toJSON(config.saslMechanism.map(_.stringId)))
:: ("jaasConfig" -> toJSON(config.jaasConfig))
:: Nil)
compact(render(json)).getBytes(StandardCharsets.UTF_8)
}
Expand All @@ -267,6 +273,9 @@ object ClusterConfig {
val clusterTuning = fieldExtended[Option[ClusterTuning]]("tuning")(json)
val securityProtocolString = fieldExtended[String]("securityProtocol")(json)
val securityProtocol = securityProtocolString.map(SecurityProtocol.apply).getOrElse(PLAINTEXT)
val saslMechanismString = fieldExtended[Option[String]]("saslMechanism")(json)
val saslMechanism = saslMechanismString.map(_.flatMap(SASLmechanism.from))
val jaasConfig = fieldExtended[Option[String]]("jaasConfig")(json)

ClusterConfig.apply(
name,
Expand All @@ -282,7 +291,9 @@ object ClusterConfig {
activeOffsetCacheEnabled.getOrElse(false),
displaySizeEnabled.getOrElse(false),
clusterTuning.getOrElse(None),
securityProtocol
securityProtocol,
saslMechanism.getOrElse(None),
jaasConfig.getOrElse(None)
)
}

Expand Down Expand Up @@ -413,6 +424,8 @@ case class ClusterConfig (name: String
, displaySizeEnabled: Boolean
, tuning: Option[ClusterTuning]
, securityProtocol: SecurityProtocol
, saslMechanism: Option[SASLmechanism]
, jaasConfig: Option[String]
)

sealed trait SecurityProtocol {
Expand Down Expand Up @@ -446,3 +459,37 @@ object SecurityProtocol {
val formSelectList : IndexedSeq[(String,String)] = typesMap.toIndexedSeq.map(t => (t._1,t._2.stringId))
def apply(s: String) : SecurityProtocol = typesMap(s.toUpperCase)
}

sealed trait SASLmechanism {
def stringId: String
}
case object SASL_MECHANISM_PLAIN extends SASLmechanism {
val stringId = "PLAIN"
}

case object SASL_MECHANISM_GSSAPI extends SASLmechanism {
val stringId = "GSSAPI"
}

case object SASL_MECHANISM_SCRAM256 extends SASLmechanism {
val stringId = "SCRAM-SHA-256"
}
case object SASL_MECHANISM_SCRAM512 extends SASLmechanism {
val stringId = "SCRAM-SHA-512"
}

object SASLmechanism {
private[this] val typesMap: Map[String, SASLmechanism] = Map(
SASL_MECHANISM_PLAIN.stringId -> SASL_MECHANISM_PLAIN
, SASL_MECHANISM_GSSAPI.stringId -> SASL_MECHANISM_GSSAPI
, SASL_MECHANISM_SCRAM256.stringId -> SASL_MECHANISM_SCRAM256
, SASL_MECHANISM_SCRAM512.stringId -> SASL_MECHANISM_SCRAM512
)

val formSelectList : IndexedSeq[(String,String)] = IndexedSeq(("DEFAULT", "DEFAULT")) ++ typesMap.toIndexedSeq.map(t => (t._1,t._2.stringId))
private def apply(s: String) : SASLmechanism = typesMap(s.toUpperCase)
def from(s: String) : Option[SASLmechanism] = s.toUpperCase match {
case "DEFAULT" => None
case other => Option(apply(other))
}
}
12 changes: 8 additions & 4 deletions app/models/form/ClusterOperation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package models.form

import kafka.manager.model.{ClusterConfig, ClusterTuning, SecurityProtocol}
import kafka.manager.model.{ClusterConfig, ClusterTuning, SASLmechanism, SecurityProtocol}

/**
* @author hiral
Expand Down Expand Up @@ -46,17 +46,21 @@ object ClusterOperation {
, displaySizeEnabled: Boolean
, tuning: Option[ClusterTuning]
, securityProtocol: String
, saslMechanism: Option[String]
, jaasConfig: Option[String]
): ClusterOperation = {
ClusterOperation(operation,ClusterConfig(name, version, zkHosts, zkMaxRetry, jmxEnabled, jmxUser, jmxPass, jmxSsl,
pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol))
pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol, saslMechanism, jaasConfig))
}

def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String)] = {
def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
Option((co.op.toString, co.clusterConfig.name, co.clusterConfig.version.toString,
co.clusterConfig.curatorConfig.zkConnect, co.clusterConfig.curatorConfig.zkMaxRetry,
co.clusterConfig.jmxEnabled, co.clusterConfig.jmxUser, co.clusterConfig.jmxPass, co.clusterConfig.jmxSsl,
co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled,
co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId))
co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId,
co.clusterConfig.saslMechanism.map(_.stringId),
co.clusterConfig.jaasConfig))
}
}

Expand Down
2 changes: 2 additions & 0 deletions app/views/cluster/addCluster.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
@b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize")
@b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays")
@b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" )
@b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL Mechanism (only applies to SASL based security)" )
@b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config (only applies to SASL based security)")
@b3.submit('class -> "submit-button btn btn-primary"){ Save }
<a href="@routes.Application.index()" class="cancel-button btn btn-default" role="button">Cancel</a>
</fieldset>
Expand Down
2 changes: 2 additions & 0 deletions app/views/cluster/updateCluster.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
@b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize")
@b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays")
@b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" )
@b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL Mechanism (only applies to SASL based security)" )
@b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config (only applies to SASL based security)")
@b3.submit('class -> "submit-button btn btn-primary btn"){ Save }
<a href="@routes.Application.index()" class="cancel-button btn btn-default" role="button">Cancel</a>
</fieldset>
Expand Down
2 changes: 1 addition & 1 deletion test/controller/api/TestKafkaStateCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M

private[this] def createCluster() = {
val future = kafkaManagerContext.get.getKafkaManager.addCluster(
testClusterName,"1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol="PLAINTEXT"
testClusterName,"1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None
)
val result = Await.result(future,duration)
result.toEither.left.foreach(apiError => sys.error(apiError.msg))
Expand Down
Loading