Skip to content

Commit

Permalink
feat: implement our own semaphore
Browse files Browse the repository at this point in the history
Replacing 'await-semaphore' since we need the `test()` function
and there does not appear to be a widely used/maintained semaphore
implementation that provides it and doesn't also depend on the
`async/await` keywords at runtime.
  • Loading branch information
lance committed Jun 20, 2017
1 parent b3d7b6f commit 4d0c057
Show file tree
Hide file tree
Showing 7 changed files with 4,870 additions and 21 deletions.
25 changes: 16 additions & 9 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const EventEmitter = require('events');
const Status = require('./status');
const HystrixStats = require('./hystrix-stats');
const Semaphore = require('await-semaphore').Semaphore;
const Semaphore = require('./semaphore');

const STATE = Symbol('state');
const OPEN = Symbol('open');
Expand Down Expand Up @@ -79,6 +79,7 @@ class CircuitBreaker extends EventEmitter {
this.on('cacheMiss', increment('cacheMisses'));
this.on('open', _ => this[STATUS].open());
this.on('close', _ => this[STATUS].close());
this.on('semaphore-locked', increment('semaphoreRejections'));

/**
* Emitted after `options.resetTimeout` has elapsed, allowing for
Expand Down Expand Up @@ -277,9 +278,9 @@ class CircuitBreaker extends EventEmitter {

let timeout;
let timeoutError = false;
return this.semaphore.acquire().then((release) => {
return new Promise((resolve, reject) => {
const latencyStartTime = Date.now();
return new Promise((resolve, reject) => {
const latencyStartTime = Date.now();
if (this.semaphore.test()) {
timeout = setTimeout(
() => {
timeoutError = true;
Expand All @@ -290,7 +291,7 @@ class CircuitBreaker extends EventEmitter {
* @event CircuitBreaker#timeout
*/
const latency = Date.now() - latencyStartTime;
release();
this.semaphore.release();
this.emit('timeout', error, latency);
resolve(handleError(error, this, timeout, args, latency, resolve, reject));
}, this.options.timeout);
Expand All @@ -309,24 +310,30 @@ class CircuitBreaker extends EventEmitter {
* @event CircuitBreaker#success
*/
this.emit('success', result, (Date.now() - latencyStartTime));
release();
this.semaphore.release();
resolve(result);
if (this.options.cache) {
CACHE.set(this, promise);
}
}
})
.catch((error) => {
release();
this.semaphore.release();
const latencyEndTime = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latencyEndTime, resolve, reject);
});
} catch (error) {
release();
this.semaphore.release();
const latency = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latency, resolve, reject);
}
});
} else {
const latency = Date.now() - latencyStartTime;
const err = new Error('Semaphore locked');
err.code = 'ESEMLOCKED';
this.emit('semaphore-locked', err, latency);
handleError(err, this, timeout, args, latency, resolve, reject);
}
});
}

Expand Down
54 changes: 54 additions & 0 deletions lib/semaphore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

module.exports = exports = semaphore;

function semaphore (count) {
const resolvers = [];
let counter = count;

let sem = {
take,
release,
test
};

Object.defineProperty(sem, 'count', {
get: _ => counter,
enumerable: true
});

return sem;

function take (timeout) {
if (counter > 0) {
--counter;
return Promise.resolve(release);
}
return new Promise((resolve, reject) => {
resolvers.push(_ => {
--counter;
resolve(release);
});
if (timeout) {
setTimeout(_ => {
resolvers.shift();
const err = new Error(`Timed out after ${timeout}ms`);
err.code = 'ETIMEDOUT';
reject(err);
}, timeout);
}
});
}

function release () {
counter++;
if (resolvers.length > 0) {
resolvers.shift()();
}
}

function test () {
if (counter < 1) return false;
return take() && true;
}
}
1 change: 1 addition & 0 deletions lib/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ const bucket = _ => ({
timeouts: 0,
cacheHits: 0,
cacheMisses: 0,
semaphoreRejections: 0,
percentiles: {},
latencyTimes: []
});
Expand Down
Loading

0 comments on commit 4d0c057

Please sign in to comment.