diff --git a/app/controllers/Cluster.scala b/app/controllers/Cluster.scala index 7a3427c8a..85faed1f2 100644 --- a/app/controllers/Cluster.scala +++ b/app/controllers/Cluster.scala @@ -75,6 +75,14 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag case Success(_) => Valid } } + val validateSASLmechanism: Constraint[Option[String]] = Constraint("validate SASL mechanism") { stringOption => + Try { + stringOption.foreach(SASLmechanism.from) + } match { + case Failure(t) => Invalid(t.getMessage) + case Success(_) => Valid + } + } val clusterConfigForm = Form( mapping( @@ -114,6 +122,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag )(ClusterTuning.apply)(ClusterTuning.unapply) ) , "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol) + , "saslMechanism" -> optional(text).verifying(validateSASLmechanism) + , "jaasConfig" -> optional(text) )(ClusterConfig.apply)(ClusterConfig.customUnapply) ) @@ -156,6 +166,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag )(ClusterTuning.apply)(ClusterTuning.unapply) ) , "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol) + , "saslMechanism" -> optional(text).verifying(validateSASLmechanism) + , "jaasConfig" -> optional(text) )(ClusterOperation.apply)(ClusterOperation.customUnapply) ) @@ -176,6 +188,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag ,false ,Option(defaultTuning) ,PLAINTEXT + ,None + ,None ) } @@ -223,7 +237,9 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, - cc.securityProtocol.stringId + cc.securityProtocol.stringId, + cc.saslMechanism.map(_.stringId), + cc.jaasConfig )) })) } @@ -247,6 +263,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag clusterConfig.filterConsumers, clusterConfig.tuning, clusterConfig.securityProtocol.stringId, + clusterConfig.saslMechanism.map(_.stringId), + clusterConfig.jaasConfig, clusterConfig.logkafkaEnabled, clusterConfig.activeOffsetCacheEnabled, clusterConfig.displaySizeEnabled @@ -316,6 +334,8 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag clusterOperation.clusterConfig.filterConsumers, clusterOperation.clusterConfig.tuning, clusterOperation.clusterConfig.securityProtocol.stringId, + clusterOperation.clusterConfig.saslMechanism.map(_.stringId), + clusterOperation.clusterConfig.jaasConfig, clusterOperation.clusterConfig.logkafkaEnabled, clusterOperation.clusterConfig.activeOffsetCacheEnabled, clusterOperation.clusterConfig.displaySizeEnabled diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala index a0b39bb9c..08a0cf192 100644 --- a/app/kafka/manager/KafkaManager.scala +++ b/app/kafka/manager/KafkaManager.scala @@ -263,6 +263,8 @@ class KafkaManager(akkaConfig: Config) extends Logging { filterConsumers: Boolean, tuning: Option[ClusterTuning], securityProtocol: String, + saslMechanism: Option[String], + jaasConfig: Option[String], logkafkaEnabled: Boolean = false, activeOffsetCacheEnabled: Boolean = false, displaySizeEnabled: Boolean = false): Future[ApiError \/ Unit] = @@ -273,6 +275,8 @@ class KafkaManager(akkaConfig: Config) extends Logging { zkHosts, tuning = tuning, securityProtocol = securityProtocol, + saslMechanism = saslMechanism, + jaasConfig = jaasConfig, jmxEnabled = jmxEnabled, jmxUser = jmxUser, jmxPass = jmxPass, @@ -298,6 +302,8 @@ class KafkaManager(akkaConfig: Config) extends Logging { filterConsumers: Boolean, tuning: Option[ClusterTuning], securityProtocol: String, + saslMechanism: Option[String], + jaasConfig: Option[String], logkafkaEnabled: Boolean = false, activeOffsetCacheEnabled: Boolean = false, displaySizeEnabled: Boolean = false): Future[ApiError \/ Unit] = @@ -308,6 +314,8 @@ class KafkaManager(akkaConfig: Config) extends Logging { zkHosts, tuning = tuning, securityProtocol = securityProtocol, + saslMechanism = saslMechanism, + jaasConfig = jaasConfig, jmxEnabled = jmxEnabled, jmxUser = jmxUser, jmxPass = jmxPass, diff --git a/app/kafka/manager/actor/KafkaManagerActor.scala b/app/kafka/manager/actor/KafkaManagerActor.scala index 9751f11b3..c898a78e2 100644 --- a/app/kafka/manager/actor/KafkaManagerActor.scala +++ b/app/kafka/manager/actor/KafkaManagerActor.scala @@ -484,6 +484,7 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) && newConfig.jmxEnabled == currentConfig.jmxEnabled && newConfig.jmxUser == currentConfig.jmxUser && newConfig.jmxPass == currentConfig.jmxPass + && newConfig.jmxSsl == currentConfig.jmxSsl && newConfig.logkafkaEnabled == currentConfig.logkafkaEnabled && newConfig.pollConsumers == currentConfig.pollConsumers && newConfig.filterConsumers == currentConfig.filterConsumers @@ -491,6 +492,8 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig) && newConfig.displaySizeEnabled == currentConfig.displaySizeEnabled && newConfig.tuning == currentConfig.tuning && newConfig.securityProtocol == currentConfig.securityProtocol + && newConfig.saslMechanism == currentConfig.saslMechanism + && newConfig.jaasConfig == currentConfig.jaasConfig ) { //nothing changed false diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 5d17469db..5cb0ffe66 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -45,6 +45,7 @@ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} import org.apache.kafka.clients.consumer.ConsumerConfig._ import org.apache.kafka.clients.consumer.internals.ConsumerProtocol +import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.protocol.Errors import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG @@ -100,6 +101,14 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba } props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.clusterContext.config.securityProtocol.stringId) props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListStr) + if(config.clusterContext.config.saslMechanism.nonEmpty){ + props.put(SaslConfigs.SASL_MECHANISM, config.clusterContext.config.saslMechanism.get.stringId) + log.info(s"SASL Mechanism =${config.clusterContext.config.saslMechanism.get}") + } + if(config.clusterContext.config.jaasConfig.nonEmpty){ + props.put(SaslConfigs.SASL_JAAS_CONFIG, config.clusterContext.config.jaasConfig.get) + log.info(s"SASL JAAS config=${config.clusterContext.config.jaasConfig.get}") + } log.info(s"Creating admin client with security protocol=${config.clusterContext.config.securityProtocol.stringId} , broker list : $brokerListStr") AdminClient.create(props) } @@ -257,6 +266,14 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext cp => props.putAll(cp) } props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clusterContext.config.securityProtocol.stringId) + if(clusterContext.config.saslMechanism.nonEmpty){ + props.put(SaslConfigs.SASL_MECHANISM, clusterContext.config.saslMechanism.get.stringId) + info(s"SASL Mechanism =${clusterContext.config.saslMechanism.get}") + if(clusterContext.config.jaasConfig.nonEmpty){ + props.put(SaslConfigs.SASL_JAAS_CONFIG, clusterContext.config.jaasConfig.get) + info(s"SASL JAAS config=${clusterContext.config.jaasConfig.get}") + } + } Try { info("Constructing new kafka consumer client using these properties: ") props.asScala.foreach { @@ -1459,6 +1476,15 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom val port: Int = broker.endpoints(securityProtocol) consumerProperties.put(BOOTSTRAP_SERVERS_CONFIG, s"${broker.host}:$port") consumerProperties.put(SECURITY_PROTOCOL_CONFIG, securityProtocol.stringId) + // Use secure endpoint if available + if(kaConfig.clusterContext.config.saslMechanism.nonEmpty){ + consumerProperties.put(SaslConfigs.SASL_MECHANISM, kaConfig.clusterContext.config.saslMechanism.get.stringId) + log.info(s"SASL Mechanism =${kaConfig.clusterContext.config.saslMechanism.get}") + } + if(kaConfig.clusterContext.config.jaasConfig.nonEmpty){ + consumerProperties.put(SaslConfigs.SASL_JAAS_CONFIG, kaConfig.clusterContext.config.jaasConfig.get) + log.info(s"SASL JAAS config=${kaConfig.clusterContext.config.jaasConfig.get}") + } var kafkaConsumer: Option[KafkaConsumer[Any, Any]] = None try { kafkaConsumer = Option(new KafkaConsumer(consumerProperties)) diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index 1d24d5c6f..6c25cf79d 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -167,6 +167,8 @@ object ClusterConfig { , displaySizeEnabled: Boolean = false , tuning: Option[ClusterTuning] , securityProtocol: String + , saslMechanism: Option[String] + , jaasConfig: Option[String] ) : ClusterConfig = { val kafkaVersion = KafkaVersion(version) //validate cluster name @@ -190,15 +192,17 @@ object ClusterConfig { , displaySizeEnabled , tuning , SecurityProtocol(securityProtocol) + , saslMechanism.flatMap(SASLmechanism.from) + , jaasConfig ) } def customUnapply(cc: ClusterConfig) : Option[( - String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String)] = { + String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = { Some(( cc.name, cc.version.toString, cc.curatorConfig.zkConnect, cc.curatorConfig.zkMaxRetry, cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.pollConsumers, cc.filterConsumers, - cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId + cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId, cc.saslMechanism.map(_.stringId), cc.jaasConfig ) ) } @@ -242,6 +246,8 @@ object ClusterConfig { :: ("displaySizeEnabled" -> toJSON(config.displaySizeEnabled)) :: ("tuning" -> toJSON(config.tuning)) :: ("securityProtocol" -> toJSON(config.securityProtocol.stringId)) + :: ("saslMechanism" -> toJSON(config.saslMechanism.map(_.stringId))) + :: ("jaasConfig" -> toJSON(config.jaasConfig)) :: Nil) compact(render(json)).getBytes(StandardCharsets.UTF_8) } @@ -267,6 +273,9 @@ object ClusterConfig { val clusterTuning = fieldExtended[Option[ClusterTuning]]("tuning")(json) val securityProtocolString = fieldExtended[String]("securityProtocol")(json) val securityProtocol = securityProtocolString.map(SecurityProtocol.apply).getOrElse(PLAINTEXT) + val saslMechanismString = fieldExtended[Option[String]]("saslMechanism")(json) + val saslMechanism = saslMechanismString.map(_.flatMap(SASLmechanism.from)) + val jaasConfig = fieldExtended[Option[String]]("jaasConfig")(json) ClusterConfig.apply( name, @@ -282,7 +291,9 @@ object ClusterConfig { activeOffsetCacheEnabled.getOrElse(false), displaySizeEnabled.getOrElse(false), clusterTuning.getOrElse(None), - securityProtocol + securityProtocol, + saslMechanism.getOrElse(None), + jaasConfig.getOrElse(None) ) } @@ -413,6 +424,8 @@ case class ClusterConfig (name: String , displaySizeEnabled: Boolean , tuning: Option[ClusterTuning] , securityProtocol: SecurityProtocol + , saslMechanism: Option[SASLmechanism] + , jaasConfig: Option[String] ) sealed trait SecurityProtocol { @@ -446,3 +459,37 @@ object SecurityProtocol { val formSelectList : IndexedSeq[(String,String)] = typesMap.toIndexedSeq.map(t => (t._1,t._2.stringId)) def apply(s: String) : SecurityProtocol = typesMap(s.toUpperCase) } + +sealed trait SASLmechanism { + def stringId: String +} +case object SASL_MECHANISM_PLAIN extends SASLmechanism { + val stringId = "PLAIN" +} + +case object SASL_MECHANISM_GSSAPI extends SASLmechanism { + val stringId = "GSSAPI" +} + +case object SASL_MECHANISM_SCRAM256 extends SASLmechanism { + val stringId = "SCRAM-SHA-256" +} +case object SASL_MECHANISM_SCRAM512 extends SASLmechanism { + val stringId = "SCRAM-SHA-512" +} + +object SASLmechanism { + private[this] val typesMap: Map[String, SASLmechanism] = Map( + SASL_MECHANISM_PLAIN.stringId -> SASL_MECHANISM_PLAIN + , SASL_MECHANISM_GSSAPI.stringId -> SASL_MECHANISM_GSSAPI + , SASL_MECHANISM_SCRAM256.stringId -> SASL_MECHANISM_SCRAM256 + , SASL_MECHANISM_SCRAM512.stringId -> SASL_MECHANISM_SCRAM512 + ) + + val formSelectList : IndexedSeq[(String,String)] = IndexedSeq(("DEFAULT", "DEFAULT")) ++ typesMap.toIndexedSeq.map(t => (t._1,t._2.stringId)) + private def apply(s: String) : SASLmechanism = typesMap(s.toUpperCase) + def from(s: String) : Option[SASLmechanism] = s.toUpperCase match { + case "DEFAULT" => None + case other => Option(apply(other)) + } +} diff --git a/app/models/form/ClusterOperation.scala b/app/models/form/ClusterOperation.scala index ebfe57bac..558a0adac 100644 --- a/app/models/form/ClusterOperation.scala +++ b/app/models/form/ClusterOperation.scala @@ -5,7 +5,7 @@ package models.form -import kafka.manager.model.{ClusterConfig, ClusterTuning, SecurityProtocol} +import kafka.manager.model.{ClusterConfig, ClusterTuning, SASLmechanism, SecurityProtocol} /** * @author hiral @@ -46,17 +46,21 @@ object ClusterOperation { , displaySizeEnabled: Boolean , tuning: Option[ClusterTuning] , securityProtocol: String + , saslMechanism: Option[String] + , jaasConfig: Option[String] ): ClusterOperation = { ClusterOperation(operation,ClusterConfig(name, version, zkHosts, zkMaxRetry, jmxEnabled, jmxUser, jmxPass, jmxSsl, - pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol)) + pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol, saslMechanism, jaasConfig)) } - def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String)] = { + def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = { Option((co.op.toString, co.clusterConfig.name, co.clusterConfig.version.toString, co.clusterConfig.curatorConfig.zkConnect, co.clusterConfig.curatorConfig.zkMaxRetry, co.clusterConfig.jmxEnabled, co.clusterConfig.jmxUser, co.clusterConfig.jmxPass, co.clusterConfig.jmxSsl, co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled, - co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId)) + co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId, + co.clusterConfig.saslMechanism.map(_.stringId), + co.clusterConfig.jaasConfig)) } } diff --git a/app/views/cluster/addCluster.scala.html b/app/views/cluster/addCluster.scala.html index 56ab2a44d..6042e274d 100644 --- a/app/views/cluster/addCluster.scala.html +++ b/app/views/cluster/addCluster.scala.html @@ -56,6 +56,8 @@ @b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize") @b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays") @b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" ) + @b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL Mechanism (only applies to SASL based security)" ) + @b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config (only applies to SASL based security)") @b3.submit('class -> "submit-button btn btn-primary"){ Save } Cancel diff --git a/app/views/cluster/updateCluster.scala.html b/app/views/cluster/updateCluster.scala.html index 87a1ad288..5d3045247 100644 --- a/app/views/cluster/updateCluster.scala.html +++ b/app/views/cluster/updateCluster.scala.html @@ -59,6 +59,8 @@ @b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize") @b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays") @b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" ) + @b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL Mechanism (only applies to SASL based security)" ) + @b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config (only applies to SASL based security)") @b3.submit('class -> "submit-button btn btn-primary btn"){ Save } Cancel diff --git a/test/controller/api/TestKafkaStateCheck.scala b/test/controller/api/TestKafkaStateCheck.scala index b9de16471..f8e212ebb 100644 --- a/test/controller/api/TestKafkaStateCheck.scala +++ b/test/controller/api/TestKafkaStateCheck.scala @@ -76,7 +76,7 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M private[this] def createCluster() = { val future = kafkaManagerContext.get.getKafkaManager.addCluster( - testClusterName,"1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol="PLAINTEXT" + testClusterName,"1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None ) val result = Await.result(future,duration) result.toEither.left.foreach(apiError => sys.error(apiError.msg)) diff --git a/test/kafka/manager/TestBrokerViewCacheActor.scala b/test/kafka/manager/TestBrokerViewCacheActor.scala index 144da4a3d..b9aa67227 100644 --- a/test/kafka/manager/TestBrokerViewCacheActor.scala +++ b/test/kafka/manager/TestBrokerViewCacheActor.scala @@ -38,12 +38,12 @@ class TestBrokerViewCacheActor extends KafkaServerInTest with BaseTest { private[this] implicit val timeout: Timeout = 10.seconds private[this] var brokerViewCacheActor : Option[ActorRef] = None - private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism=None, jaasConfig=None) private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig) override protected def beforeAll(): Unit = { super.beforeAll() - val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism=None, jaasConfig=None) val clusterContext = ClusterContext(ClusterFeatures.from(clusterConfig), clusterConfig) val ksConfig = KafkaStateActorConfig(sharedCurator, "pinned-dispatcher", clusterContext, LongRunningPoolConfig(2,100), LongRunningPoolConfig(2,100), 5, 10000, None, KafkaManagedOffsetCacheConfig()) val props = Props(classOf[KafkaStateActor],ksConfig) diff --git a/test/kafka/manager/TestClusterManagerActor.scala b/test/kafka/manager/TestClusterManagerActor.scala index 49f27ed66..cd25403f5 100644 --- a/test/kafka/manager/TestClusterManagerActor.scala +++ b/test/kafka/manager/TestClusterManagerActor.scala @@ -46,7 +46,7 @@ class TestClusterManagerActor extends CuratorAwareTest with BaseTest { override protected def beforeAll(): Unit = { super.beforeAll() - val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism=None, jaasConfig=None) val curatorConfig = CuratorConfig(testServer.getConnectString) val config = ClusterManagerActorConfig( "pinned-dispatcher" diff --git a/test/kafka/manager/TestKafkaManager.scala b/test/kafka/manager/TestKafkaManager.scala index e96eb35e7..084798d31 100644 --- a/test/kafka/manager/TestKafkaManager.scala +++ b/test/kafka/manager/TestKafkaManager.scala @@ -9,10 +9,10 @@ import java.util.concurrent.atomic.AtomicBoolean import com.typesafe.config.{Config, ConfigFactory} import kafka.manager.features.KMDeleteTopicFeature -import kafka.manager.model.{Kafka_0_8_1_1, ActorModel} +import kafka.manager.model._ import kafka.manager.utils.CuratorAwareTest -import kafka.manager.model.ActorModel.{KafkaManagedConsumer, ZKManagedConsumer, TopicList} -import kafka.test.{NewKafkaManagedConsumer, SimpleProducer, HighLevelConsumer, SeededBroker} +import kafka.manager.model.ActorModel.{KafkaManagedConsumer, TopicList, ZKManagedConsumer} +import kafka.test.{HighLevelConsumer, NewKafkaManagedConsumer, SeededBroker, SimpleProducer} import scala.concurrent.Await import scala.concurrent.duration._ @@ -97,7 +97,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { simpleProducerThread.foreach(_.start()) Thread.sleep(1000) - //val future = kafkaManager.addCluster("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT") + //val future = kafkaManager.addCluster("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = "PLAIN", jaasConfig = None) //val result = Await.result(future,duration) //assert(result.isRight === true) //Thread.sleep(2000) @@ -125,7 +125,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("add cluster") { - val future = kafkaManager.addCluster("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT") + val future = kafkaManager.addCluster("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) val result = Await.result(future,duration) assert(result.isRight === true) Thread.sleep(2000) @@ -376,7 +376,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("update cluster zkhost") { - val future = kafkaManager.updateCluster("dev","1.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val future = kafkaManager.updateCluster("dev","1.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val result = Await.result(future,duration) assert(result.isRight === true) @@ -411,7 +411,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("update cluster version") { - val future = kafkaManager.updateCluster("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val future = kafkaManager.updateCluster("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val result = Await.result(future,duration) assert(result.isRight === true) Thread.sleep(2000) @@ -433,7 +433,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("update cluster logkafka enabled and activeOffsetCache enabled") { - val future = kafkaManager.updateCluster("dev","1.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val future = kafkaManager.updateCluster("dev","1.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val result = Await.result(future,duration) assert(result.isRight === true) @@ -447,6 +447,34 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { Thread.sleep(2000) } + test("update cluster security protocol and sasl mechanism") { + val future = kafkaManager.updateCluster("dev","1.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val result = Await.result(future,duration) + assert(result.isRight === true) + + Thread.sleep(2000) + + val future2 = kafkaManager.getClusterList + val result2 = Await.result(future2,duration) + assert(result2.isRight === true) + assert((result2.toOption.get.active.find(c => c.name == "dev").get.securityProtocol === SASL_PLAINTEXT) && + (result2.toOption.get.active.find(c => c.name == "dev").get.saslMechanism === Option(SASL_MECHANISM_PLAIN))) + Thread.sleep(2000) + + val future3 = kafkaManager.updateCluster("dev","1.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) + val result3 = Await.result(future3,duration) + assert(result3.isRight === true) + + Thread.sleep(2000) + + val future4 = kafkaManager.getClusterList + val result4 = Await.result(future4,duration) + assert(result4.isRight === true) + assert((result4.toOption.get.active.find(c => c.name == "dev").get.securityProtocol === PLAINTEXT) && + (result4.toOption.get.active.find(c => c.name == "dev").get.saslMechanism === None)) + Thread.sleep(2000) + } + /* test("get consumer list active mode") { val future = kafkaManager.getConsumerListExtended("dev") diff --git a/test/kafka/manager/TestKafkaManagerActor.scala b/test/kafka/manager/TestKafkaManagerActor.scala index 4416ec84c..b7d8a59db 100644 --- a/test/kafka/manager/TestKafkaManagerActor.scala +++ b/test/kafka/manager/TestKafkaManagerActor.scala @@ -68,7 +68,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("add cluster") { - val cc = ClusterConfig("dev","1.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val cc = ClusterConfig("dev","1.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMAddCluster(cc)) { result: KMCommandResult => result.result.get Thread.sleep(1000) @@ -79,7 +79,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster zkhost") { - val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -111,7 +111,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster version") { - val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -138,7 +138,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { println(result) result.msg.contains("dev") } - val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMAddCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(1000) @@ -155,7 +155,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster logkafka enabled") { - val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -168,7 +168,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { test("update cluster tuning") { val newTuning = getClusterTuning(3, 101, 11, 10000, 10000, 1) val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, - tuning = Option(newTuning), securityProtocol="PLAINTEXT" + tuning = Option(newTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None ) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get @@ -184,7 +184,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster security protocol") { - val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT") + val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) diff --git a/test/kafka/manager/TestKafkaStateActor.scala b/test/kafka/manager/TestKafkaStateActor.scala index 7134c9280..b7cc2d574 100644 --- a/test/kafka/manager/TestKafkaStateActor.scala +++ b/test/kafka/manager/TestKafkaStateActor.scala @@ -38,7 +38,7 @@ class TestKafkaStateActor extends KafkaServerInTest with BaseTest { override val kafkaServerZkPath = broker.getZookeeperConnectionString private[this] var kafkaStateActor : Option[ActorRef] = None private[this] implicit val timeout: Timeout = 10.seconds - private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism=None, jaasConfig=None) private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig) override protected def beforeAll(): Unit = { diff --git a/test/kafka/manager/TestLogkafkaStateActor.scala b/test/kafka/manager/TestLogkafkaStateActor.scala index 51056d0d4..281e385ed 100644 --- a/test/kafka/manager/TestLogkafkaStateActor.scala +++ b/test/kafka/manager/TestLogkafkaStateActor.scala @@ -37,7 +37,7 @@ class TestLogkafkaStateActor extends KafkaServerInTest with BaseTest { override val kafkaServerZkPath = broker.getZookeeperConnectionString private[this] var logkafkaStateActor : Option[ActorRef] = None private[this] implicit val timeout: Timeout = 10.seconds - private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism=None, jaasConfig=None) private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig) override protected def beforeAll(): Unit = { diff --git a/test/kafka/manager/TestLogkafkaViewCacheActor.scala b/test/kafka/manager/TestLogkafkaViewCacheActor.scala index ccb32c6e3..e24ff9293 100644 --- a/test/kafka/manager/TestLogkafkaViewCacheActor.scala +++ b/test/kafka/manager/TestLogkafkaViewCacheActor.scala @@ -39,12 +39,12 @@ class TestLogkafkaViewCacheActor extends KafkaServerInTest with BaseTest { private[this] implicit val timeout: Timeout = 10.seconds private[this] var logkafkaViewCacheActor : Option[ActorRef] = None - private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism=None, jaasConfig=None) private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig) override protected def beforeAll(): Unit = { super.beforeAll() - val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism=None, jaasConfig=None) val clusterContext = ClusterContext(ClusterFeatures.from(clusterConfig), clusterConfig) val props = Props(classOf[KafkaStateActor],sharedCurator, defaultClusterContext) diff --git a/test/kafka/manager/utils/TestClusterConfig.scala b/test/kafka/manager/utils/TestClusterConfig.scala index 6b86a4044..f465bdd6e 100644 --- a/test/kafka/manager/utils/TestClusterConfig.scala +++ b/test/kafka/manager/utils/TestClusterConfig.scala @@ -14,18 +14,18 @@ class TestClusterConfig extends FunSuite with Matchers { test("invalid name") { intercept[IllegalArgumentException] { - ClusterConfig("qa!","0.8.1.1","localhost",jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + ClusterConfig("qa!","0.8.1.1","localhost",jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) } } test("invalid kafka version") { intercept[IllegalArgumentException] { - ClusterConfig("qa","0.8.1","localhost:2181",jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + ClusterConfig("qa","0.8.1","localhost:2181",jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) } } test("serialize and deserialize 0.8.1.1") { - val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -33,7 +33,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.8.2.0 +jmx credentials") { - val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, jmxUser = Some("mario"), jmxPass = Some("rossi"), jmxSsl = false, pollConsumers = true, filterConsumers = true, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, jmxUser = Some("mario"), jmxPass = Some("rossi"), jmxSsl = false, pollConsumers = true, filterConsumers = true, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -41,7 +41,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.8.2.0") { - val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -49,7 +49,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.8.2.1") { - val cc = ClusterConfig("qa","0.8.2.1","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa","0.8.2.1","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -57,7 +57,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.8.2.2") { - val cc = ClusterConfig("qa","0.8.2.2","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa","0.8.2.2","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -65,7 +65,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("deserialize without version, jmxEnabled, and security protocol") { - val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val noverison = serialize.replace(""","kafkaVersion":"0.8.2.0"""","").replace(""","jmxEnabled":false""","").replace(""","jmxSsl":false""","") assert(!noverison.contains("kafkaVersion")) @@ -77,7 +77,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("deserialize from 0.8.2-beta as 0.8.2.0") { - val cc = ClusterConfig("qa","0.8.2-beta","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa","0.8.2-beta","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val noverison = serialize.replace(""","kafkaVersion":"0.8.2.0"""",""","kafkaVersion":"0.8.2-beta"""") val deserialize = ClusterConfig.deserialize(noverison) @@ -108,6 +108,8 @@ class TestClusterConfig extends FunSuite with Matchers { , Option(18) )) , securityProtocol = "PLAINTEXT" + , saslMechanism = None + , jaasConfig = None ) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) @@ -116,7 +118,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.10.0.0") { - val cc = ClusterConfig("qa", "0.10.0.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa", "0.10.0.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -124,7 +126,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.10.1.0") { - val cc = ClusterConfig("qa", "0.10.1.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa", "0.10.1.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -132,7 +134,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.10.1.1") { - val cc = ClusterConfig("qa", "0.10.1.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa", "0.10.1.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -140,7 +142,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.10.2.0") { - val cc = ClusterConfig("qa", "0.10.2.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa", "0.10.2.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -148,7 +150,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.10.2.1") { - val cc = ClusterConfig("qa", "0.10.2.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa", "0.10.2.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -156,7 +158,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.11.0.0") { - val cc = ClusterConfig("qa", "0.11.0.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa", "0.11.0.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -164,7 +166,7 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 0.11.0.2") { - val cc = ClusterConfig("qa", "0.11.0.2", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa", "0.11.0.2", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) @@ -172,7 +174,15 @@ class TestClusterConfig extends FunSuite with Matchers { } test("serialize and deserialize 1.0.0") { - val cc = ClusterConfig("qa", "1.0.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT") + val cc = ClusterConfig("qa", "1.0.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) + val serialize: String = ClusterConfig.serialize(cc) + val deserialize = ClusterConfig.deserialize(serialize) + assert(deserialize.isSuccess === true) + assert(cc == deserialize.get) + } + + test("serialize and deserialize 1.1.0") { + val cc = ClusterConfig("qa", "1.1.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) val serialize: String = ClusterConfig.serialize(cc) val deserialize = ClusterConfig.deserialize(serialize) assert(deserialize.isSuccess === true) diff --git a/test/kafka/manager/utils/TestCreateLogkafka.scala b/test/kafka/manager/utils/TestCreateLogkafka.scala index 696c41885..6f647beea 100644 --- a/test/kafka/manager/utils/TestCreateLogkafka.scala +++ b/test/kafka/manager/utils/TestCreateLogkafka.scala @@ -22,7 +22,7 @@ class TestCreateLogkafka extends CuratorAwareTest with BaseTest { import logkafka82.LogkafkaConfigErrors._ private[this] val adminUtils = new LogkafkaAdminUtils(Kafka_0_8_2_0) - private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig) private[this] val createLogkafkaLogkafkaId = "km-unit-test-logkafka-logkafka_id" private[this] val createLogkafkaLogPath = "/km-unit-test-logkafka-logpath" diff --git a/test/kafka/manager/utils/TestCreateTopic.scala b/test/kafka/manager/utils/TestCreateTopic.scala index f953de692..399efd3ea 100644 --- a/test/kafka/manager/utils/TestCreateTopic.scala +++ b/test/kafka/manager/utils/TestCreateTopic.scala @@ -20,7 +20,7 @@ import scala.concurrent.Future class TestCreateTopic extends CuratorAwareTest with BaseTest { private[this] val adminUtils = new AdminUtils(Kafka_0_8_2_0) - private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig) test("create topic with empty name") { diff --git a/test/kafka/manager/utils/TestReassignPartitions.scala b/test/kafka/manager/utils/TestReassignPartitions.scala index 1cfba3188..8e6187ae3 100644 --- a/test/kafka/manager/utils/TestReassignPartitions.scala +++ b/test/kafka/manager/utils/TestReassignPartitions.scala @@ -29,7 +29,7 @@ class TestReassignPartitions extends CuratorAwareTest with BaseTest { private[this] val brokerList = Set(1,2,3) - private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT") + private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig) private[this] def mytopic1 : TopicIdentity = getTopicIdentity("mytopic1")