Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: semaphore added #72

Merged
merged 13 commits into from
Jun 20, 2017
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ Here are the events you can listen for.
* `close` - emitted when the breaker state changes to `closed`
* `halfOpen` - emitted when the breaker state changes to `halfOpen`
* `fallback` - emitted when the breaker has a fallback function and executes it
* `semaphore-locked` - emitted when the breaker is at capacity and cannot execute the request

Handling events gives a greater level of control over your application behavior.

Expand Down
93 changes: 56 additions & 37 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const EventEmitter = require('events');
const Status = require('./status');
const HystrixStats = require('./hystrix-stats');
const Semaphore = require('./semaphore');

const STATE = Symbol('state');
const OPEN = Symbol('open');
Expand Down Expand Up @@ -42,6 +43,9 @@ const CACHE = new WeakMap();
* @param options.rollingPercentilesEnabled {boolean} This property indicates whether execution latencies
* should be tracked and calculated as percentiles.
* If they are disabled, all summary statistics (mean, percentiles) are returned as -1.
* @param options.capacity the number of concurrent requests allowed. If the number of
* currently executing function calls is equal to options.capacity, further calls to `fire()`
* are rejected until at least one of the current requests completes.
*/
class CircuitBreaker extends EventEmitter {
constructor (action, options) {
Expand All @@ -50,6 +54,8 @@ class CircuitBreaker extends EventEmitter {
this.options.rollingCountTimeout = options.rollingCountTimeout || 10000;
this.options.rollingCountBuckets = options.rollingCountBuckets || 10;
this.options.rollingPercentilesEnabled = options.rollingPercentilesEnabled !== false;
this.options.capacity = typeof options.capacity === 'number' ? options.capacity : 10;
this.semaphore = new Semaphore(this.options.capacity);

this[STATUS] = new Status(this.options);
this[STATE] = CLOSED;
Expand All @@ -76,6 +82,7 @@ class CircuitBreaker extends EventEmitter {
this.on('cacheMiss', increment('cacheMisses'));
this.on('open', _ => this[STATUS].open());
this.on('close', _ => this[STATUS].close());
this.on('semaphore-locked', increment('semaphoreRejections'));

/**
* Emitted after `options.resetTimeout` has elapsed, allowing for
Expand Down Expand Up @@ -276,47 +283,59 @@ class CircuitBreaker extends EventEmitter {
let timeoutError = false;
return new Promise((resolve, reject) => {
const latencyStartTime = Date.now();
timeout = setTimeout(
() => {
timeoutError = true;
const error = new Error(`Timed out after ${this.options.timeout}ms`);
error.code = 'ETIMEDOUT';
/**
* Emitted when the circuit breaker action takes longer than `options.timeout`
* @event CircuitBreaker#timeout
*/
const latency = Date.now() - latencyStartTime;
this.emit('timeout', error, latency);
resolve(handleError(error, this, timeout, args, latency, resolve, reject));
}, this.options.timeout);

try {
const result = this.action.apply(this.action, args);
const promise = (typeof result.then === 'function')
? result
: Promise.resolve(result);

promise.then((result) => {
if (!timeoutError) {
clearTimeout(timeout);
if (this.semaphore.test()) {
timeout = setTimeout(
() => {
timeoutError = true;
const error = new Error(`Timed out after ${this.options.timeout}ms`);
error.code = 'ETIMEDOUT';
/**
* Emitted when the circuit breaker action succeeds
* @event CircuitBreaker#success
* Emitted when the circuit breaker action takes longer than `options.timeout`
* @event CircuitBreaker#timeout
*/
this.emit('success', result, (Date.now() - latencyStartTime));
resolve(result);
if (this.options.cache) {
CACHE.set(this, promise);
const latency = Date.now() - latencyStartTime;
this.semaphore.release();
this.emit('timeout', error, latency);
resolve(handleError(error, this, timeout, args, latency, resolve, reject));
}, this.options.timeout);

try {
const result = this.action.apply(this.action, args);
const promise = (typeof result.then === 'function')
? result
: Promise.resolve(result);

promise.then((result) => {
if (!timeoutError) {
clearTimeout(timeout);
/**
* Emitted when the circuit breaker action succeeds
* @event CircuitBreaker#success
*/
this.emit('success', result, (Date.now() - latencyStartTime));
this.semaphore.release();
resolve(result);
if (this.options.cache) {
CACHE.set(this, promise);
}
}
}
})
.catch((error) => {
const latencyEndTime = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latencyEndTime, resolve, reject);
});
} catch (error) {
})
.catch((error) => {
this.semaphore.release();
const latencyEndTime = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latencyEndTime, resolve, reject);
});
} catch (error) {
this.semaphore.release();
const latency = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latency, resolve, reject);
}
} else {
const latency = Date.now() - latencyStartTime;
handleError(error, this, timeout, args, latency, resolve, reject);
const err = new Error('Semaphore locked');
err.code = 'ESEMLOCKED';
this.emit('semaphore-locked', err, latency);
handleError(err, this, timeout, args, latency, resolve, reject);
}
});
}
Expand Down
6 changes: 3 additions & 3 deletions lib/hystrix-formatter.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function hystrixFormatter (stats) {
json.rollingCountFallbackRejection = 0;
json.rollingCountFallbackSuccess = 0;
json.rollingCountResponsesFromCache = stats.cacheHits;
json.rollingCountSemaphoreRejected = stats.rejects;
json.rollingCountSemaphoreRejected = stats.semaphoreRejections;
json.rollingCountShortCircuited = stats.rejects;
json.rollingCountSuccess = stats.successes;
json.rollingCountThreadPoolRejected = 0;
Expand Down Expand Up @@ -67,8 +67,8 @@ function hystrixFormatter (stats) {
json.propertyValue_executionTimeoutInMilliseconds = stats.options.timeout;
json.propertyValue_executionIsolationThreadInterruptOnTimeout = true;
json.propertyValue_executionIsolationThreadPoolKeyOverride = null;
json.propertyValue_executionIsolationSemaphoreMaxConcurrentRequests = 10;
json.propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests = 10;
json.propertyValue_executionIsolationSemaphoreMaxConcurrentRequests = stats.options.capacity;
json.propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests = stats.options.capacity;
json.propertyValue_metricsRollingStatisticalWindowInMilliseconds = 10000;
json.propertyValue_requestCacheEnabled = stats.options.cache || false;
json.propertyValue_requestLogEnabled = true;
Expand Down
54 changes: 54 additions & 0 deletions lib/semaphore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
'use strict';

module.exports = exports = semaphore;

function semaphore (count) {
const resolvers = [];
let counter = count;

let sem = {
take,
release,
test
};

Object.defineProperty(sem, 'count', {
get: _ => counter,
enumerable: true
});

return sem;

function take (timeout) {
if (counter > 0) {
--counter;
return Promise.resolve(release);
}
return new Promise((resolve, reject) => {
resolvers.push(_ => {
--counter;
resolve(release);
});
if (timeout) {
setTimeout(_ => {
resolvers.shift();
const err = new Error(`Timed out after ${timeout}ms`);
err.code = 'ETIMEDOUT';
reject(err);
}, timeout);
}
});
}

function release () {
counter++;
if (resolvers.length > 0) {
resolvers.shift()();
}
}

function test () {
if (counter < 1) return false;
return take() && true;
}
}
1 change: 1 addition & 0 deletions lib/status.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ const bucket = _ => ({
timeouts: 0,
cacheHits: 0,
cacheMisses: 0,
semaphoreRejections: 0,
percentiles: {},
latencyTimes: []
});
Expand Down
Loading