Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable ssl client connections to brokers (#334) #383

Merged
merged 10 commits into from
Jun 29, 2016
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ Follow the [instructions](http://kafka.apache.org/documentation.html#quickstart)

# API
## Client
### Client(connectionString, clientId, [zkOptions], [noAckBatchOptions])
### Client(connectionString, clientId, [zkOptions], [noAckBatchOptions], [sslOptions])
* `connectionString`: Zookeeper connection string, default `localhost:2181/`
* `clientId`: This is a user-supplied identifier for the client application, default `kafka-node-client`
* `zkOptions`: **Object**, Zookeeper options, see [node-zookeeper-client](https://github.com/alexguan/node-zookeeper-client#client-createclientconnectionstring-options)
* `noAckBatchOptions`: **Object**, when requireAcks is disabled on Producer side we can define the batch properties, 'noAckBatchSize' in bytes and 'noAckBatchAge' in milliseconds. The default value is `{ noAckBatchSize: null, noAckBatchAge: null }` and it acts as if there was no batch
* `sslOptions`: **Object**, options to be passed to the tls broker sockets, ex. { rejectUnauthorized: false }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a blurb about how this is only supported for kafka 0.9 and above?


### close(cb)
Closes the connection to Zookeeper and the brokers so that the node process can exit gracefully.
Expand Down
9 changes: 9 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,19 @@ services:
image: wurstmeister/kafka:0.9.0.1
ports:
- "9092:9092"
- "9093:9093"
depends_on:
- zookeeper
environment:
KAFKA_ADVERTISED_HOST_NAME: ${KAFKA_ADVERTISED_HOST_NAME}
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: "PLAINTEXT://:9092,SSL://:9093"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://192.168.64.2:9092,SSL://192.168.64.2:9093"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you will need to change the hardcoded IP to ${KAFKA_ADVERTISED_HOST_NAME}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will fix.

KAFKA_SSL_KEYSTORE_LOCATION: "/var/private/ssl/certs/server.keystore.jks"
KAFKA_SSL_KEYSTORE_PASSWORD: "password"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can generate keys w/o a password and skip these entries.

KAFKA_SSL_KEY_PASSWORD: "password"
KAFKA_SSL_TRUSTSTORE_LOCATION: "/var/private/ssl/certs/server.truststore.jks"
KAFKA_SSL_TRUSTSTORE_PASSWORD: "password"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- ./docker/certs:/var/private/ssl/certs
17 changes: 17 additions & 0 deletions docker/certs/ca-cert
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICsDCCAhmgAwIBAgIJAIyGnMEdl6tCMA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMTYwNjIxMTQxMDA5WhcNNDMxMTA2MTQxMDA5WjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQCebmAuz/u/vaGmm+rd+bSu4xDqWOD0nTlHgVcPps1mGNy8I+Je5ft4r8UXN6B4
65rmJgXztg/HuBKU8mRO1VjwH7huA7NwsO+ve6i1eYQYBq3MtfxRj9gH4tE76147
ET5FoFc8xn/2K09Lc/W5zhJHLeUDlYKP/SUiw5dFqKURMQIDAQABo4GnMIGkMB0G
A1UdDgQWBBTOi9ZSqUCykQrG0fwkA91MsC8HDTB1BgNVHSMEbjBsgBTOi9ZSqUCy
kQrG0fwkA91MsC8HDaFJpEcwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgTClNvbWUt
U3RhdGUxITAfBgNVBAoTGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZIIJAIyGnMEd
l6tCMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQEFBQADgYEATpA/7+o740mEIVYY
xnZ3QR0TpW4SEudRBKoCH+obaSJH5LVxy59VohV8JeA0yQTRwn2SF1WQ5ZC5EuVO
daAfUwaYWwkGXrzS30KDuj5ospv6kUaIoSnHZvulFPQbqamO5mTsvoCZxKM79Ttz
EoM/vfy/Dehln4zt7ti1YdcU8ZM=
-----END CERTIFICATE-----
1 change: 1 addition & 0 deletions docker/certs/ca-cert.srl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
839412E1B55B2B15
18 changes: 18 additions & 0 deletions docker/certs/ca-key
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-----BEGIN RSA PRIVATE KEY-----
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it feels wrong to commit up certs and a private key, but they are only for development use.

Proc-Type: 4,ENCRYPTED
DEK-Info: DES-EDE3-CBC,6641B585EC61FD26

x6SWVdRDkrpsjLFwacn5vq1dOQwkwFvuGlJ9X1lf/lMYa//JUZtDRMi2Wea+WYhj
V31UEECgZHZxbzp74No5e4Bh05hN5id00Js0MRHMYDB3S0Hs7y75SQoAlqBqsGaC
VQX42xrNSzLW8jSvGxa+oX4YV54zdRdhmaNzEoWQ+qXMMdGezR1dRp6SGpwBnMv3
UXlSYP+v6EFeGiUxhqatM9Sm6oXAcg71bJrc8VmHz6IPOeXm9+kRKZze/lQYmH0e
m1V4H46+kVBWGiStNiQa5IgJyDd2vAVO/p+/v3bbm9+U7I7LAW09NSpTVgTCb0Vv
RXEyBiBT1P48Aa9zNzk+DCOsSugdOjDj6HqJHs0mqC8D9wNuRmIEZm3D6SkZGf7C
XXJF0fDlt3pDXwOyJEC96SEFoYOef4h7Z0IEfAcRSOkhgrywZg9IzrAHbCbsX5Ij
/8NgGAaKDhef0p0niAXT+KmNV4AO8KHQKBVTga1jxcnCQEhBD4J8RqS7B6WQ+GqB
IE/fgV41JRyGOO6Hw/Y/0hjKbmYJTC9sjoUJ0a9QdVK7gZxGpd4Svo2XyVy2PfdR
DDVF4UJYKLaEWxXvUK1p7Dx+n2dqmkCoxJzomk0hXUfk/DtfeFD5++6STtAQJVgb
QUPWC3Enfmf79Fv8tbHPM7KClsd6Gc3pPLHC7UPIc3/Pgip5jrEFu8Jgm4kB2dJy
RQO/MB6LtU3nexdWaI6VDD3eTmrSNyRK/lCWLy8QHKK95+/xNmnZKfeXSm0D6ErR
Tm6nomuO0YleeWImoLzvA7WKXjwpqaUTzHnpZEsktYRAzUOIiG4ZlA==
-----END RSA PRIVATE KEY-----
14 changes: 14 additions & 0 deletions docker/certs/cert-file
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-----BEGIN NEW CERTIFICATE REQUEST-----
MIICoDCCAl4CAQAwbDEQMA4GA1UEBhMHVW5rbm93bjEQMA4GA1UECBMHVW5rbm93bjEQMA4GA1UE
BxMHVW5rbm93bjEQMA4GA1UEChMHVW5rbm93bjEQMA4GA1UECxMHVW5rbm93bjEQMA4GA1UEAxMH
VW5rbm93bjCCAbcwggEsBgcqhkjOOAQBMIIBHwKBgQD9f1OBHXUSKVLfSpwu7OTn9hG3UjzvRADD
Hj+AtlEmaUVdQCJR+1k9jVj6v8X1ujD2y5tVbNeBO4AdNG/yZmC3a5lQpaSfn+gEexAiwk+7qdf+
t8Yb+DtX58aophUPBPuD9tPFHsMCNVQTWhaRMvZ1864rYdcq7/IiAxmd0UgBxwIVAJdgUI8VIwvM
spK5gqLrhAvwWBz1AoGBAPfhoIXWmz3ey7yrXDa4V7l5lK+7+jrqgvlXTAs9B4JnUVlXjrrUWU/m
cQcQgYC0SRZxI+hMKBYTt88JMozIpuE8FnqLVHyNKOCjrh4rs6Z1kW6jfwv6ITVi8ftiegEkO8yk
8b6oUZCJqIPf4VrlnwaSi2ZegHtVJWQBTDv+z0kqA4GEAAKBgDtgBe2nMFIxCeMGzzyiqyYxDWbp
BgQgsowJG3O1gq7grLDgokoo5nFd4YwoSGTFapnGKjU46y4jj7jkhMDn2oj9ufmxBoikUN0y39q0
IdFiB/6y4eIXe1p61FVtKaFXn+RZWARRlFW3lTSOaLqnplRuqMGtw75M9y1c1k+I0oheoDAwLgYJ
KoZIhvcNAQkOMSEwHzAdBgNVHQ4EFgQUSH9fbKplQZ0gqwB0h3Kk0vC442wwCwYHKoZIzjgEAwUA
Ay8AMCwCFGHc9tgI+oUoFuuTnyQHp4EH14UFAhRhUfVRQdo4jBA/h9I5wVsxdfZSPw==
-----END NEW CERTIFICATE REQUEST-----
20 changes: 20 additions & 0 deletions docker/certs/cert-signed
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDQTCCAqoCCQCDlBLhtVsrFTANBgkqhkiG9w0BAQUFADBFMQswCQYDVQQGEwJB
VTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0
cyBQdHkgTHRkMB4XDTE2MDYyMTE0MTIyN1oXDTQzMTEwNjE0MTIyN1owbDEQMA4G
A1UEBhMHVW5rbm93bjEQMA4GA1UECBMHVW5rbm93bjEQMA4GA1UEBxMHVW5rbm93
bjEQMA4GA1UEChMHVW5rbm93bjEQMA4GA1UECxMHVW5rbm93bjEQMA4GA1UEAxMH
VW5rbm93bjCCAbcwggEsBgcqhkjOOAQBMIIBHwKBgQD9f1OBHXUSKVLfSpwu7OTn
9hG3UjzvRADDHj+AtlEmaUVdQCJR+1k9jVj6v8X1ujD2y5tVbNeBO4AdNG/yZmC3
a5lQpaSfn+gEexAiwk+7qdf+t8Yb+DtX58aophUPBPuD9tPFHsMCNVQTWhaRMvZ1
864rYdcq7/IiAxmd0UgBxwIVAJdgUI8VIwvMspK5gqLrhAvwWBz1AoGBAPfhoIXW
mz3ey7yrXDa4V7l5lK+7+jrqgvlXTAs9B4JnUVlXjrrUWU/mcQcQgYC0SRZxI+hM
KBYTt88JMozIpuE8FnqLVHyNKOCjrh4rs6Z1kW6jfwv6ITVi8ftiegEkO8yk8b6o
UZCJqIPf4VrlnwaSi2ZegHtVJWQBTDv+z0kqA4GEAAKBgDtgBe2nMFIxCeMGzzyi
qyYxDWbpBgQgsowJG3O1gq7grLDgokoo5nFd4YwoSGTFapnGKjU46y4jj7jkhMDn
2oj9ufmxBoikUN0y39q0IdFiB/6y4eIXe1p61FVtKaFXn+RZWARRlFW3lTSOaLqn
plRuqMGtw75M9y1c1k+I0oheMA0GCSqGSIb3DQEBBQUAA4GBAGAK0H3OrcCe5K7c
yz9B/FBk40kjXhy0u2acVxaWh3+1avanzabo2NgbKv3q+Bkg6kNNzFAk5f2gbll2
zT1lHZcaiQcpgHw7z1zuQ2OTrSUcpXYZXEqoJufgiBJkRZLOoRXQFodN7I8MLKHU
m9cl0wJGujKuydSiZJ20h/ecpwuS
-----END CERTIFICATE-----
Binary file added docker/certs/client.truststore.jks
Binary file not shown.
Binary file added docker/certs/server.keystore.jks
Binary file not shown.
Binary file added docker/certs/server.truststore.jks
Binary file not shown.
59 changes: 50 additions & 9 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var net = require('net'),
tls = require('tls'),
util = require('util'),
_ = require('lodash'),
async = require('async'),
Expand All @@ -15,6 +16,7 @@ var net = require('net'),
Message = protocol.Message,
zk = require('./zookeeper'),
Zookeeper = zk.Zookeeper,
url = require('url'),
debug = require('debug')('kafka-node:Client');
/**
* Communicates with kafka brokers
Expand All @@ -37,11 +39,14 @@ var net = require('net'),
*
* @constructor
*/
var Client = function (connectionString, clientId, zkOptions, noAckBatchOptions) {
var Client = function (connectionString, clientId, zkOptions, noAckBatchOptions, sslOptions) {
if (this instanceof Client === false) {
return new Client(connectionString, clientId);
}

this.sslOptions = sslOptions;
this.ssl = !!sslOptions;

this.connectionString = connectionString || 'localhost:2181/';
this.clientId = clientId || 'kafka-node-client';
this.zkOptions = zkOptions;
Expand All @@ -66,13 +71,32 @@ Client.prototype.connect = function () {
zk.once('init', function (brokers) {
self.ready = true;
self.brokerMetadata = brokers;
self.brokerProfiles = {};
Object
.keys(brokers)
.some(function (key, index) {
var broker = brokers[key];
self.setupBroker(broker.host, broker.port, false, self.brokers);
.forEach(function (key, index) {
var brokerProfile = brokers[key];
var addr = brokerProfile.host + ':' + brokerProfile.port;
self.brokerProfiles[addr] = brokerProfile;
if (self.ssl) {
var sslAddress = brokerProfile.endpoints.find(function(endpoint) {
return url.parse(endpoint).protocol === 'ssl:';
});
if(!sslAddress) {
var err = new Error('No SSL kafka endpoint found');
self.emit('error', err);
return;
}
var sslUrl = url.parse(sslAddress);

brokerProfile.sslHost = sslUrl.hostName;
brokerProfile.sslPort = +sslUrl.port;
}

// Only connect one broker
return !index;
if (index === 0) {
self.setupBroker(brokerProfile.host, brokerProfile.port, false, self.brokers);
}
});
self.emit('ready');
});
Expand Down Expand Up @@ -477,15 +501,16 @@ Client.prototype.payloadsByLeader = function (payloads) {

Client.prototype.leaderByPartition = function (topic, partition) {
var topicMetadata = this.topicMetadata;
return topicMetadata[topic] && topicMetadata[topic][partition] && topicMetadata[topic][partition]['leader'];
return topicMetadata[topic] && topicMetadata[topic][partition] && topicMetadata[topic][partition].leader;
};

Client.prototype.brokerForLeader = function (leader, longpolling) {
var addr;
var brokers = this.getBrokers(longpolling);
// If leader is not give, choose the first broker as leader
if (typeof leader === 'undefined') {
if (!_.isEmpty(brokers)) {
var addr = Object.keys(brokers)[0];
addr = Object.keys(brokers)[0];
return brokers[addr];
} else if (!_.isEmpty(this.brokerMetadata)) {
leader = Object.keys(this.brokerMetadata)[0];
Expand All @@ -495,7 +520,8 @@ Client.prototype.brokerForLeader = function (leader, longpolling) {
}
}
var metadata = this.brokerMetadata[leader];
return brokers[metadata.host + ':' + metadata.port] || this.setupBroker(metadata.host, metadata.port, longpolling, brokers);
addr = metadata.host + ':' + metadata.port;
return brokers[addr] || this.setupBroker(metadata.host, metadata.port, longpolling, brokers);
};

Client.prototype.getBrokers = function(longpolling) {
Expand All @@ -509,8 +535,23 @@ Client.prototype.setupBroker = function (host, port, longpolling, brokers) {
};

Client.prototype.createBroker = function connect(host, port, longpolling) {

var addr = host + ':' + port;

//prefer host and port from broker discovery
//this gets populated by the initial discovery of brokers from kafka
var brokerProfile = this.brokerProfiles[addr];

host = brokerProfile[this.ssl ? 'sslHost' : 'host'] || host;
port = brokerProfile[this.ssl ? 'sslPort' : 'port'] || port;

var self = this;
var socket = net.createConnection(port, host);
var socket;
if (self.ssl) {
socket = tls.connect(port, host, self.sslOptions);
} else {
socket = net.createConnection(port, host);
}
socket.addr = host + ':' + port;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use addr here so we don't have to concat host and port again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addr may have changed because of :545/:546 it is important that we recalculate it here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, I see that now. 👍

socket.host = host;
socket.port = port;
Expand Down
Loading