
Adding TLS support on the deployed Kafka cluster #487

Merged
merged 15 commits into master from tls-support on Jul 2, 2018

Conversation


@ppatierno ppatierno commented Jun 11, 2018

Type of change

  • Enhancement / new feature

Description

This is a WIP PR for adding TLS support.
It provides a library for generating certificates and secrets, and the CO mechanics for using it in order to add TLS support to the cluster.
As a WIP it still has some TODOs and missing pieces. The PR is open in order to discuss the approach.

Checklist

Please go through this checklist and make sure all applicable tasks have been done

  • Write tests
  • Make sure all tests pass
  • Update documentation
  • Check RBAC rights for Kubernetes / OpenShift roles
  • Try your changes from Pod inside your Kubernetes and OpenShift cluster, not just locally
  • Reference relevant issue(s) and close them after merging

@ppatierno ppatierno requested review from tombentley and scholzj June 11, 2018 14:42
@ppatierno (Member Author)

@tombentley @scholzj note that part of this PR is the library you already reviewed in the previous PR, which was closed automatically when I changed the branch name this morning. So please concentrate on the CO stuff :-)

@tombentley tombentley left a comment (Member)

Looks good so far. The devil's in those TODOs though :-)

@@ -34,7 +35,7 @@
private final String metricsCmJson = "{\"animal\":\"wombat\"}";
private final String configurationJson = "{\"foo\":\"bar\"}";
private final ConfigMap cm = ResourceUtils.createKafkaClusterConfigMap(namespace, cluster, replicas, image, healthDelay, healthTimeout, metricsCmJson, configurationJson);
private final KafkaCluster kc = KafkaCluster.fromConfigMap(cm);
private final KafkaCluster kc = KafkaCluster.fromDescription(cm, Collections.EMPTY_LIST);
Member

Minor point of style: I usually use Collections.emptyList(), which lets the compiler infer the type arguments for the resulting list, and avoids raw types leaking into the code.
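For illustration, a minimal sketch of the difference (class and field names are made up for the example):

[source,java]
import java.util.Collections;
import java.util.List;

class EmptyListExample {
    // Raw type leaks in: EMPTY_LIST is declared as a raw java.util.List,
    // so assigning it to List<String> needs an unchecked suppression.
    @SuppressWarnings("unchecked")
    private final List<String> raw = Collections.EMPTY_LIST;

    // Type-safe: the compiler infers the type argument from the assignment target.
    private final List<String> typed = Collections.emptyList();
}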

@ppatierno (Member Author) commented Jun 11, 2018

@tombentley I know that the devil is elsewhere, but I needed a starting point for opening the PR ... I'm circling the devil so I can attack it at the right moment :P

@ppatierno (Member Author)

@scholzj @tombentley added the logic for certificates and secrets generation, up to exposing them to the brokers through the related volumes. Maybe it's better to have another review step for the overall solution. I'll fix conflicts with master later.

}

protected void setEncryptionEnabled(boolean isEncryptionEnabled) {
this.isEncryptionEnabled = isEncryptionEnabled;
Member

Just wondering if there's any use case for enabling encryption only on the interbroker side or only on the clients side, and not the other. But I can't think of one.

@@ -317,6 +402,52 @@ public ConfigMap generateMetricsConfigMap() {
}
}

public Secret generateClientsCASecret() {
Member

Can we javadoc these generate*() methods to the extent that just from looking at the code we know how the CAs and certs are related?

Member Author

Yes it was on the plan ;)

return createSecret(name + "-clients-ca", data);
}

public Secret generateClientsPublicSecret() {
Member

'Public secret' is an oxymoron. I think something like generateClientsPublicKeySecret() would be less jarring.


Labels labels = Labels.forCluster(assemblyName);
List<Secret> secrets = secretOperations.list(namespace, labels);
secrets.add(secretOperations.get(namespace, "internal-ca"));
Member

I don't understand what's going on here, maybe a comment is needed: I assume the "internal-ca" isn't labelled with the assemblyName (and so has to be added to the list explicitly), but I don't understand why.

Member Author

The "internal-ca" CA certificate (key + cert) is related to the all clusters deployed by the operator. I mean that if you deploy a kafka and a kafka-connect cluster we are going to use the same CA for signing certificates related to internal communication (i.e. a kafka-connect connector connecting to the kafka cluster).
One doubt that I have now is that the same CA cert could be used even by different Kafka clusters deployed by the CO which should not be the right thing to do. @scholzj wdyt ?
I was thinking about a specific label for that but I didn't come up with a good proposal.

} else {
handler.handle(createResult);
log.error(certResult.cause().toString());
Member

No need to release the lock in this branch?

private Secret clientsCASecret;
private Secret clientsPublicSecret;
private Secret brokersClientsSecret;
private Secret brokersInternalSecret;
Member

I don't think you can do this. The KafkaAssemblyOperator is stateless, and while the lock prevents it running concurrently on the same cluster, I don't think there's anything preventing it running simultaneously across different clusters.

I think you need to make the chained compose() calls pass this data via the (mostly currently ignored) lambda arguments. You could use a simple Java bean (hell, I'm not sure even getters are really necessary) to do that. It could be immutable in the above fields, but would need to be mutable in diffs and scale. That way you could just pass the one thing through the chain of compose()s. Wdyt?

Member Author

If I have got your idea ...

First of all, all the above fields plus service, headlessService and so on seem not to be needed, because they come from kafka.generateXXXXX and can be produced inside the compose() chain.
It seems to me that just the kafka instance is needed.
getKafkaCluster could return a Future<KafkaCluster> in order to have the kafka instance available to the first compose for calling kafka.getName() and kafka.getReplicas().
The first compose could return a Future<MyBean> where MyBean carries the kafka instance and the result of the reconcile in the first compose; in this way the kafka instance can be used in the next compose and so on.
Did I get your point correctly?
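For illustration, a minimal, self-contained sketch of the bean-through-compose() pattern being discussed (class and method names are made up; the real operator would build the description from the ConfigMap and Secrets and call the real reconcile operations):

[source,java]
import io.vertx.core.Future;

class ComposeChainSketch {

    // Stand-in for the Kafka cluster description carried through the chain.
    static class Desc {
        final String name;
        final int replicas;
        Desc(String name, int replicas) { this.name = name; this.replicas = replicas; }
    }

    Future<Desc> getKafkaClusterDescription() {
        // Would parse the ConfigMap and Secrets in the real operator.
        return Future.succeededFuture(new Desc("my-cluster", 3));
    }

    Future<Void> reconcileStep(Desc desc) {
        // Placeholder for e.g. serviceOperations.reconcile(...) or secretOperations.reconcile(...).
        return Future.succeededFuture();
    }

    Future<Void> createOrUpdate() {
        // The description travels along the compose() chain instead of being
        // stored in fields on the (stateless) assembly operator.
        return getKafkaClusterDescription()
                .compose(desc -> reconcileStep(desc).map(v -> desc))
                .compose(desc -> reconcileStep(desc).map(v -> desc))
                .compose(desc -> Future.<Void>succeededFuture());
    }
}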

@ppatierno (Member Author)

@scholzj @tombentley I have added the secrets deletion on cluster deletion, plus the certificates handling when a reconciliation starts (no need to re-generate certs), on scale-up (generate the missing certs and update secrets), and on scale-down (leave just the needed certs and update secrets). A new review is really welcome. I'll fix conflicts with master later.


int replicasInSecret = 0;
Optional<Secret> internalSecret = secrets.stream().filter(s -> s.getMetadata().getName().equals(name + "-brokers-internal")).findFirst();
@tombentley tombentley Jun 14, 2018 (Member)

Long line (breaks after filter() and findFirst())


if (internalSecret.isPresent() ^ clientsSecret.isPresent()) {
log.error("Inconsistency!!!");
// TODO: what to do ? raising exception ?
Member

Just regenerate the missing one?

Member Author

Yeah I think that it's a good way to fix it. Instead, the other case where sizes are different should never happen; I'll raise an exception for that one.

Member Author

I decided to generate the certificates separately, factoring out a method for that, and to fix all the scenarios with missing certs by just generating them.

// TODO: what to do ? raising exception ?
} else {

if (internalSecret.isPresent() && clientsSecret.isPresent()) {
Member

this can be an else if of the enclosing if

} else {

if (internalSecret.isPresent() && clientsSecret.isPresent()) {
if ((internalSecret.get().getData().size() - 1) != (clientsSecret.get().getData().size() - 2)) {
Member

What's going on with the - 1 and - 2? It probably deserves a comment.

internalCerts.put(
KafkaCluster.kafkaPodName(cluster, i),
new Cert(
decoder.decode(internalSecret.get().getData().get(KafkaCluster.kafkaPodName(cluster, i) + ".key")),
Member

I think you should refactor the decoder.decode(internalSecret.get().getData().get(KafkaCluster.kafkaPodName(cluster, i) + ".key")) stuff into separate methods (one for .key, another for .crt). You seem to be repeating this code a lot.
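A minimal sketch of the suggested refactoring; the helpers take the secret's data map directly so the example stays self-contained (names are hypothetical):

[source,java]
import java.util.Base64;
import java.util.Map;

class SecretDataDecoder {

    private static final Base64.Decoder DECODER = Base64.getDecoder();

    // One helper per suffix instead of repeating the decode-and-lookup expression.
    static byte[] decodedKey(Map<String, String> secretData, String podName) {
        return DECODER.decode(secretData.get(podName + ".key"));
    }

    static byte[] decodedCert(Map<String, String> secretData, String podName) {
        return DECODER.decode(secretData.get(podName + ".crt"));
    }
}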

@@ -302,15 +357,12 @@ public void generateCertificates(List<Secret> secrets) {
new Cert(Files.readAllBytes(brokerKeyFile.toPath()), Files.readAllBytes(brokerCertFile.toPath())));

certManager.generateCsr(brokerKeyFile, brokerCsrFile, sbj);
certManager.generateCert(brokerCsrFile, clientsCAkeyFile, clientsCAcertFile, brokerCertFile, 365);
Member

365 should at least be a constant. Should it be configurable?

private final Future<Void> getKafkaCluster(ConfigMap assemblyCm, List<Secret> assemblySecrets) {
Future<Void> fut = Future.future();
/**
* Brings the description of a Kafka cluster entity
Member

Probably worth a sentence or two about how this is used in the compose() chains, because just reading those compose chains it's not completely obvious what's going on.

@scholzj scholzj left a comment (Member)

I added a bunch of comments, most of them should be fairly minor.

But there is one big issue I have with this. We might have split the tasks in a wrong way. But without adding TLS support to Kafka Connect / Connect S2I and Topic Controller, this will actually break them. So I think we need to add that within this PR.

if (isEncryptionEnabled()) {
Map<String, String> data = new HashMap<>();
data.put("clients-ca.crt", Base64.getEncoder().encodeToString(clientsCA.cert()));
return createSecret(name + "-clients-ca-cert", data);
Member

This is a resource which will be used by the users. Should we think of a shorter (and thus more user friendly name)?

Member Author

May you suggest a better name please? It was the best I had in mind :-)

Member

Why not just name it -cert? Right now it is the only cert needed for talking with Kafka for users.

vertx.createSharedWorkerExecutor("kubernetes-ops-pool").executeBlocking(
future -> {

if (secretOperations.get(namespace, "internal-ca") == null) {
Member

If we want to enable / disable SSL ... how do we get this information to this place?

Member Author

As agreed on Strimzi/Kafka channel, we decided to create this cert in any case even if SSL was disabled because we don't have the enable/disable information here.

Member

Ahh, right ... my bad ... I forgot it already.

@tombentley tombentley Jun 19, 2018 (Member)

If it was me, I would pass the cm into this method, and have the handler be a Handler<AsyncResult<List<Secret>>>. In the case the feature wasn't enabled (which it could glean from the CM) it would be a noop and would just return an empty list.
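A rough sketch of the signature being suggested; the config key and the way the TLS flag is read are made up for the example:

[source,java]
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Secret;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;

import java.util.Collections;
import java.util.List;

class InternalCaSketch {

    void internalCaSecret(ConfigMap cm, Handler<AsyncResult<List<Secret>>> handler) {
        // Hypothetical key; the real ConfigMap layout is defined elsewhere in the PR.
        boolean tlsEnabled = Boolean.parseBoolean(cm.getData().getOrDefault("encryption-enabled", "true"));
        if (!tlsEnabled) {
            // Feature off: no-op, just hand back an empty list of secrets.
            handler.handle(Future.succeededFuture(Collections.<Secret>emptyList()));
            return;
        }
        // ... otherwise generate or look up the CA secret (elided in this sketch)
        // and complete the handler with it.
        handler.handle(Future.succeededFuture(Collections.singletonList(new Secret())));
    }
}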

Member Author

It's a solution, but then we are giving the logic to get information from the cm to secretOperations (so the SecretOperator class), which in my understanding should not have this kind of knowledge but should just handle Secret resources and nothing more. Or maybe I was wrong to think that.

@ppatierno ppatierno Jun 19, 2018 (Member Author)

If we want to avoid generating the cert, the AbstractAssemblyOperator has the ConfigMap in reconcileAssembly, so it should be able to get the TLS enabled status from there as well; but even in this case it sounded to me like breaking some rules to have the assembly operator looking into something it shouldn't be interested in.

Member

Ah, yes you're right. Then I guess we should use a compose at the call site then.

Member Author

@tombentley wdym ?

future -> {

if (secretOperations.get(namespace, "internal-ca") == null) {
log.info("Generating internal CA certificate ...");
Member

Although I personally tend to use ... quite often, it looks a bit strange in logs. Especially the second one starting with ....

}
log.info("... end generating certificate");
} else {
log.info("The internal CA certificate already exists");
Member

Assuming this runs on every reconciliation (since the certificates will exist most of the time) ... is the INFO level really needed? I think we should put it to lower log level.

Member Author

Agreed. I left this level by mistake. I needed it for testing.

Labels labels = Labels.forCluster(assemblyName);
List<Secret> secrets = secretOperations.list(namespace, labels);
secrets.add(secretOperations.get(namespace, "internal-ca"));
//Map<String, Secret> map = secrets.stream().collect(Collectors.toMap(s -> s.getMetadata().getName(), s -> s));
Member

Can this be deleted?

@@ -136,6 +251,10 @@ public void createOrUpdate(Reconciliation reconciliation, ConfigMap assemblyCm,
result.add(serviceOperations.reconcile(namespace, KafkaCluster.kafkaClusterName(name), null));
result.add(serviceOperations.reconcile(namespace, KafkaCluster.headlessName(name), null));
result.add(kafkaSetOperations.reconcile(namespace, KafkaCluster.kafkaClusterName(name), null));
result.add(secretOperations.reconcile(namespace, KafkaCluster.kafkaClusterName(name) + "-clients-ca", null));
Member

Same as above ... the secret suffixes should be defined centrally.

this.serviceOperations = serviceOperations;
this.deploymentOperations = deploymentOperations;
}

@Override
protected void createOrUpdate(Reconciliation reconciliation, ConfigMap assemblyCm, Handler<AsyncResult<Void>> handler) {
protected void createOrUpdate(Reconciliation reconciliation, ConfigMap assemblyCm, List<Secret> assemblySecrets, Handler<AsyncResult<Void>> handler) {
Member

This seems a bit unfortunate design that this signature is shared across all AssemblyOperators. But probably nothing to change within this PR.

TBH, from the previous discussions I understood that this will be the place where the Description class will be used to make it possible to wrap the different requirements for each *Cluster class.

# Generate the SSL healthcheck config file
./kafka_healthcheck_ssl_config.sh | tee /tmp/healthcheck.properties

export KAFKA_SECURITY="listener.name.replication.ssl.keystore.location=/var/lib/kafka/replication.keystore.jks
Member

/var/lib/kafka is the PV (or emptyDir). We definitely don't want to store the certificates / keys here. This should probably be somewhere in /tmp. This applies to all the certs.

./kafka_healthcheck_ssl_config.sh | tee /tmp/healthcheck.properties

export KAFKA_SECURITY="listener.name.replication.ssl.keystore.location=/var/lib/kafka/replication.keystore.jks
listener.name.replication.ssl.keystore.password=${ENC_PASSWORD}
Member

The password is the same. Can't we just use it once on the upper configuration level? (ssl.keystore.password)

Member Author

Yeah, you are right, it always falls back to this one if the specific listener one is not defined.

keytool -keystore /var/lib/kafka/replication.truststore.jks -storepass $ENC_PASSWORD -noprompt -alias internal-ca -import -file /var/lib/kafka/internal-certs/internal-ca.crt
openssl pkcs12 -export -in /var/lib/kafka/internal-certs/$HOSTNAME.crt -inkey /var/lib/kafka/internal-certs/$HOSTNAME.key -chain -CAfile /var/lib/kafka/internal-certs/internal-ca.crt -name $HOSTNAME -password pass:$ENC_PASSWORD -out /var/lib/kafka/$HOSTNAME-internal.p12
keytool -importkeystore -deststorepass $ENC_PASSWORD -destkeystore /var/lib/kafka/replication.keystore.jks -srcstorepass $ENC_PASSWORD -srckeystore /var/lib/kafka/$HOSTNAME-internal.p12 -srcstoretype PKCS12
echo "... end importing certificates"
Member

Again, I don't think this style of log messages looks so great. Should we change it?

@scholzj (Member) commented Jun 19, 2018

Just realised during the current conference call - you are missing RBAC rules update for secrets, or?

@ppatierno ppatierno force-pushed the tls-support branch 3 times, most recently from 131f1cc to 65d7ab1 Compare June 19, 2018 15:42
@ppatierno (Member Author)

@scholzj right. I have just fixed that.

@ppatierno (Member Author)

@scholzj @tombentley this PR lacks the documentation only.

@scholzj scholzj left a comment (Member)

It's starting to take good shape. Just a few minor comments. Looking forward to the docs.

@@ -113,7 +113,7 @@
</module>
<module name="NPathComplexity">
<!-- default is 200 -->
<property name="max" value="500"/>
<property name="max" value="900"/>
Member

Not really a question for this PR but more for the community. Why the hell do we have these rules if every second PR increases them to pass? Should we either follow them or remove them?

Member Author

That is a good question and maybe we should discuss about it with the entire team. Sometimes I don't see any reason to spend more time to be compliant with the current rules (so I just increase them).

Member

I'm willing to bet this was increased because of AbstractModel. I know that because I've also had to deal with AbstractModel getting increasingly cumbersome, and having to increase these parameters. So in that sense checkstyle has fulfilled its purpose of highlighting to us that there's a problem with AbstractModel. I'm hoping we'll be able to refactor AbstractModel soon and revert these parameters.

Really it's incumbent on whoever increases these to open (or ensure there is already open) a PR to address the complexity in a future sprint.

@@ -58,6 +62,8 @@

protected static final Logger log = LogManager.getLogger(AbstractModel.class.getName());

protected static final int DEFAULT_CERTS_EXPIRATION_DAYS = 365;
Member

We currently do not support any changes to this. So it should not be called DEFAULT_CERTS_EXPIRATION_DAYS but just CERTS_EXPIRATION_DAYS? When I see DEFAULT_ I normally expect that the actual used value would be somewhere else.

Member Author

Yeah agree on that.

@@ -40,22 +47,30 @@
private static final Logger log = LogManager.getLogger(AbstractAssemblyOperator.class.getName());

protected static final int LOCK_TIMEOUT = 60000;
protected static final int DEFAULT_CERTS_EXPIRATION_DAYS = 365;
Member

We should not define this here - we should reuse the value from AbstractModel, or? To have the value defined in single place only.

Member Author

I don't have any specific use case in mind, but maybe it could be useful to have the "general" internal CA certificate with a different expiration date from the other "cluster specific" certificates?

@@ -46,9 +57,16 @@
protected static final int REPLICATION_PORT = 9091;
protected static final String REPLICATION_PORT_NAME = "replication";

protected static final int CLIENT_ENC_PORT = 9093;
protected static final String CLIENT_ENC_PORT_NAME = "clientsenc";
Member

It took me a while to figure out what it means. Is it just me? Wouldn't something like clientstls be better? Or maybe just name them plaintext and ssl to follow Kafka terminology.

Member Author

Yeah, ENC stands for "ENCrypted". I thought a lot about whether SEC for "SECurity" would be better, or TLS as you are suggesting. I have no strong opinion on that, so TLS works fine. I'll change ENC to TLS everywhere it's used.

Fixed some minor changes after feedback

Dealing with "boring" checkstyle rules
Run getting Kafka cluster description in a worker thread
Added generation of CA certificate for clients communication
Added generation of the needed secrets for hosting certs/keys related to the Kafka cluster
Added volumes and mount points for exposing certs/keys to each Kafka broker
Added/Changed few overloads on ssl library + tests
Fixed changes about Collections emptylist
Added some Javadoc to generate secret methods
Added certificates and secrets handling on reconciliation, scale up/down checking whether certificates exist or not
Added some Javadoc
Defined a constant for certificates expiration days

Addressed review feedback about secret suffixes, certs location and logging style

Reverted back to fromConfigMap method

New ResourceTester ctor for Secrets list

Dealing with "boring" findbugs warnings
Updated the REPLICATION listener to be encrypted by default
Removed enable/disable encryption
Added utility methods for generating Secret collection for the Kafka cluster
Adapted KafkaAssemblyOperator tests in order to use Kafka cluster Secrets

Updated and fixed Kafka and Connect assembly operator tests using certificates Secrets

Added build Secrets to the MockKube

Added assertions for Secrets
@ppatierno ppatierno force-pushed the tls-support branch 2 times, most recently from a65e87a to 2e35d88 Compare June 27, 2018 10:26
Fixed clients encrypted listener parameters for keystore and truststore

Fixed path for the findbugs-exclude.xml (for Travis CI)
@ppatierno ppatierno changed the title WIP : adding TLS support Adding TLS support on the deployed Kafka cluster Jun 27, 2018
@ppatierno (Member Author)

@tombentley @scholzj I wrote the documentation in a new Security chapter. Please check if we want to dig into the feature more than that or we have to add something that I forgot.
PS : the PR is not WIP anymore.

@ppatierno ppatierno added this to the 0.5.0 milestone Jun 27, 2018
@tombentley tombentley left a comment (Member)

I'll try to review the docs later.


<Bug pattern="EI_EXPOSE_REP,EI_EXPOSE_REP2"/>
</Match>
<Match>
<Package name="io.strimzi.certs"/>
Member

I think (but could be wrong) that this is excluding those bugs (from all packages) and all bugs in that package (i.e. the union), which isn't what you meant. I think you need both the Bug and the Package inside a single Match to get their intersection.

Member Author

Good point. It should work as you said. I moved Package into the first Match and it works.

*/
package io.strimzi.certs;

public class Cert {
Member

I have a feeling I've asked this already, but if this class has both a key and a cert is Cert really the right name for it? Should it be CertAndKey?

Member Author

The only doubt is whether it could have something more than just a key and a cert in the future. Maybe in that case even Cert would be wrong ... something with "Bundle" in the name?

Member

We have to give it the best name for the code as it is today, I think. It would be easy enough to change the class name at the time we added the extra thing. Or maybe if there was a need for an extra thing in the future that would be a different class (maybe a subclass).

Member Author

ok

if (sbj.subjectAltNames() != null && sbj.subjectAltNames().size() > 0) {

// subject alt names need to be in an openssl configuration file
File file = new File(getClass().getClassLoader().getResource("openssl.conf").getFile());
Member

I assume this works because you'll have tested it, but really getFile() on a URL does something different than what you're assuming here -- it returns the URL's path as a File. I don't see how that would work for a resource in a jar on the filesystem, because File doesn't understand the !/ (or is it /!?) separator in the resource path used to delimit the end of the jar file on the filesystem, the remaining path being the path within the zip archive. It might not work on all platforms though. Really you should be getting the resource as a stream, rather than assuming you can access it as a File.

Member Author

It works at least in the unit tests, because we don't have a real use case for now, but I changed it to load the data as a resource stream anyway. I then write it into a File used by the methods exposed in the cert manager class.

Member

OK, I guess maven runs the tests with the .class and resource files on the filesystem and the -classpath pointing to the directory containing them, so the resource URL would be a file:// URL and so it works. I suspect if you could jar up the test .class and resource files and run the tests from that jar it would fail.
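A minimal sketch of the resource-as-stream approach, copying the bundled template to a temporary file so that tools expecting a real path (like the openssl CLI) can still use it; this also works when the resource sits inside a jar (names are illustrative, not the PR's exact code):

[source,java]
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class OpensslConfResource {

    static Path extractOpensslConf() throws IOException {
        try (InputStream in = OpensslConfResource.class.getClassLoader()
                .getResourceAsStream("openssl.conf")) {
            if (in == null) {
                throw new IOException("openssl.conf not found on the classpath");
            }
            Path tmp = Files.createTempFile("openssl-", ".conf");
            Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
            return tmp;
        }
    }
}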

PrintWriter out = null;
try {
out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sna, true), "UTF8")));
out.append(sb.toString());
Member

Why are you building the sb only to write it to out? Why not just skip sb and write it to out directly? Also PrintWriter will swallow exceptions and you need to invoke checkError() to determine if it did so. Personally I'd just stick with calling write() on the BufferedWriter, since you're having to catch the IOException anyway.
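A sketch of the suggested alternative, appending with a BufferedWriter inside try-with-resources so IOExceptions propagate instead of being swallowed (method and parameter names are illustrative):

[source,java]
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

class SubjectAltNameWriter {

    // Append the subject-alternative-name section to the openssl config file.
    static void appendSection(File sna, String section) throws IOException {
        try (BufferedWriter out = Files.newBufferedWriter(sna.toPath(), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            out.write(section);
        }
    }
}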

// CA private key + self-signed certificate for clients communications
Optional<Secret> clientsCAsecret = secrets.stream().filter(s -> s.getMetadata().getName().equals(KafkaCluster.clientsCASecretName(cluster))).findFirst();
if (!clientsCAsecret.isPresent()) {
log.info("Clients CA to generate");
Member

debug

// scale up -> it will copy all certificates
// scale down -> it will copy just the requested number of replicas
for (int i = 0; i < Math.min(replicasInSecret, replicas); i++) {
log.info("{} already exists", KafkaCluster.kafkaPodName(cluster, i));
Member

This seems like a weird and not-very-useful log message.

Member Author

Well, I think that maybe just the log level is wrong and debug should be ok. Together with the log in the next for loop, it shows which certs are going to be "reused" and which certs are going to be created.

secretOperations.reconcile(namespace, INTERNAL_CA_NAME, secret)
.compose(future::complete, future);

} catch (IOException e) {
Member

catch Throwable, I would have thought.

kafka.generateBrokersClientsSecret(), kafka.generateBrokersInternalSecret());

future.complete(desc);
} catch (Exception e) {
Member

catch Throwable

if (res.succeeded()) {
fut.complete((KafkaClusterDescription) res.result());
} else {
fut.fail("");
Member

fut.fail(res.cause())

@tombentley (Member)

tombentley commented Jun 28, 2018 via email

@tombentley (Member)

tombentley commented Jun 28, 2018 via email

@ppatierno (Member Author)

@tombentley @scholzj this is the final PR. I have handled all your feedback.


The Apache Kafka project supports data encryption by means of the SSL/TLS protocol.
This makes it possible to encrypt data transferred between brokers (interbroker communication) and between clients and brokers.
Leveraging the SSL/TLS support, it is also possible to have mutual authentication, where the Kafka broker authenticates the client by verifying its certificate, and the client does likewise with the server's certificate.
Member

Do we actually support this in this PR?

Member Author

No, it's out of the scope for this PR. Do you think we should remove this statement because it sounds that we are going to support client auth right now?

@scholzj scholzj Jul 2, 2018 (Member)

That is how I would interpret it ... that we support TLS client auth. So I would probably remove it.

The public key has to be signed by a certificate authority (CA) in order to have a related X.509 certificate for providing server authentication and encrypting the communication channel with the client (which could be another broker as well).
More precisely, each broker has two different certificates (and related private keys), one for interbroker communication and one for communication with clients.
These broker certificates are signed using different CAs: The "internal-ca" signs the certificates used for interbroker communication, and the "clients-ca" signs the certificates used for client communication.
The CAs themselves use self-signed certificates.
Member

A CA is always self-signed. So I'm not sure we need to mention this here.

Member Author

Well in a PKI you can have different CA certificates at different levels. The first one is self-signed but the other ones down the chain could be signed by the previous one (so not be self-signed), or ?

Member

Yeah, but if you say CA, I expect the root certificate ... and that is always self-signed if it's a root, right? Anyway ... I don't mind this, it just seemed strange.

More precisely, each broker has two different certificates (and related private keys), one for interbroker communication and one for communication with clients.
These broker certificates are signed using different CAs: The "internal-ca" signs the certificates used for interbroker communication, and the "clients-ca" signs the certificates used for client communication.
The CAs themselves use self-signed certificates.
The "internal-ca" is used across multiple clusters, that is, there is a single "internal-ca" for each instance of a cluster operator.
Member

Really? I thought the original installation had it per cluster. Having it per CO is IMHO a risk. We should have at least an issue for fixing it.

Also, how does the internal CA work when it is in a different namespace than the Kafka cluster?

Member Author

I thought we agreed on having it this way, otherwise there was no reason for having the AbstractAssemblyOperator create the internal-ca (which was something we agreed on as well in the previous reviews); it's done this way because it is not related to a specific cluster. Making it cluster specific means changing the current logic and having KafkaCluster handle this creation like all the other certificates.
I think that at this point an issue is better than changing this PR.

Member

Well, it cannot be part of KafkaCluster, because the first one using it will be ZookeeperCluster. So IMHO it needs to be created in KafkaAssemblyOperator or even in AbstractAssemblyOperator, but per cluster. I don't think it should be CO wide. That would mean that the clusters are not secured against each other (if I have access to cluster A I can use the keys to authenticate to cluster B).

I also didn't see anywhere any handling of the namespace issue - that would be much easier with a per-cluster internal CA.

Member Author

Good point. I'm going to open an issue about that as soon as we merge this one. I'll work on it, but maybe it's better for now to merge the CRD stuff and the Zk/Kafka log levels first instead of adding code which generates more conflicts. It'll avoid adding pains on ... pains :-)

* `<cluster-name>-kafka-brokers-internal`: contains all the brokers private and public keys (certificates signed with "internal-ca") used for interbroker communication.
* `<cluster-name>-kafka-brokers-clients`: contains all the brokers private and public keys (certificates signed with specific cluster "clients-ca") used for communicating with clients.

All the keys are 2048 bits in size and are valid for 365 days from initial generation.
Member

We should probably mention that certificate rotation is on the backlog.

[source,shell]
keytool -keystore client.truststore.jks -alias CARoot -import -file clients-ca.crt

Finally, the minimal content for the properties file used by the Kafka client is the following (the password can be omitted if not needed).
Member

Are people actually using properties files to configure the clients?

Member Author

Do you mean providing an example on passing this parameters to the console consumer on command line?

Member

I would probably just leave out the file ... and say something like "To configure the Kafka Consumer or Producer APIs, the following properties should be specified."

[source]
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/client.truststore.jks
ssl.truststore.password=test1234
Member

We should probably use some placeholder instead of test1234.
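For illustration, the same settings used from the Java Consumer API; the bootstrap address and password are placeholders, and port 9093 is the TLS client port defined in this PR (CLIENT_ENC_PORT):

[source,java]
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

class TlsConsumerExample {

    static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<cluster-name>-kafka:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // TLS: trust the "clients-ca" certificate previously imported into the truststore.
        props.put("security.protocol", "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/var/private/ssl/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "<truststore-password>");

        return new KafkaConsumer<>(props);
    }
}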


mkdir -p /tmp/kafka

# Import certificates into keystore and truststore
Member

What about moving these 3 sections into a separate file?


assertEquals(String.format("CN=%s, O=%s", sbj.commonName(), sbj.organizationName()), p.getName());

// subject alternative names are not transferred automatically from a CSR to the final certificate
Member

I think I asked this already somewhere, but I'm not sure I got an answer. What does this mean for us? Does it mean that we need to disable hostname verification?

Member Author

For the current iteration, yes. I guess it's the same problem the EnMasse team has today, right? I opened an issue to investigate how to get SAN support working when using openssl.

Member

Ok, if the clients need to disable hostname verification, it probably should be mentioned in the docs (ideally with a record in the properties example). There was a KIP which enabled it by default, which I guess ended up in Kafka 2.0 - so users will need to disable it. We should also have an issue for it - because this is quite an important problem.
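Concretely, disabling hostname verification on the client amounts to clearing the endpoint identification algorithm; a minimal sketch building on the consumer properties above (the helper is hypothetical):

[source,java]
import java.util.Properties;

import org.apache.kafka.common.config.SslConfigs;

class DisableHostnameVerification {

    static void apply(Properties props) {
        // An empty value turns off endpoint (hostname) verification; needed while the
        // broker certificates carry no subject alternative names.
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    }
}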

}

} catch (IOException e) {
e.printStackTrace();
Member

Yes, that's a good idea. If nothing else it will be much more informative in the logs.

@scholzj scholzj left a comment (Member)

LGTM

Removed TODO about copy_extensions

Added raising exception if internal CA certificate Secret is missing
Changed log level to DEBUG for the openssl command output

Doc improvments? (#523)

* Doc improvments?

* Update security.adoc

* Update security.adoc

Splitted in different scripts certs import into keystore/truststore
Addresses feedback about security doc

Last doc fix
@ppatierno (Member Author)

So I'm going to merge. I have already squashed the "minor" commits into the main ones, the steps representing "milestones" in the PR.

@ppatierno ppatierno merged commit 06ac3b0 into master Jul 2, 2018