From 23500266a2f659d0a4e57cb30e0f1f88a9d13581 Mon Sep 17 00:00:00 2001 From: Raphael Benitte Date: Thu, 7 Apr 2016 09:28:08 +0900 Subject: [PATCH] feat(bus): add ability to define 'push' apis --- src/Bus.js | 112 ++++++++---- src/CoreApi.js | 11 +- src/browser/mixins/ApiConsumerMixin.js | 20 ++- test/backend/Bus.test.js | 229 ++++++++++++++++++++----- 4 files changed, 285 insertions(+), 87 deletions(-) diff --git a/src/Bus.js b/src/Bus.js index eaa827ce..4e39a80e 100644 --- a/src/Bus.js +++ b/src/Bus.js @@ -2,6 +2,10 @@ import _ from 'lodash'; import chalk from 'chalk'; +const API_MODE_POLL = 'poll'; +const API_MODE_PUSH = 'push'; + + /** * @param {Mozaik} mozaik * @returns {*} @@ -15,34 +19,62 @@ const Bus = mozaik => { const subscriptions = {}; /** - * Register a new API + * Push message to matching clients. + * + * @param {String} subscriptionId + * @param {Object} data + */ + const send = (subscriptionId, data) => { + if (!_.has(subscriptions, subscriptionId)) { + mozaik.logger.warn(chalk.red(`No subscription found mathing '${subscriptionId}'`)); + + return; + } + + subscriptions[subscriptionId].clients.forEach((clientId) => { + clients[clientId].send(JSON.stringify(data)); + }); + }; + + /** + * Register a new API, * which is basically an object composed of various methods. * - * @param {String} id - * @param {Object} api + * @param {String} id unique API identifier + * @param {Object} api api function + * @param {String} mode api mode, can be one of 'poll' or 'push' */ - const registerApi = (id, api) => { + const registerApi = (id, api, mode = API_MODE_POLL) => { + if (mode !== API_MODE_POLL && mode !== API_MODE_PUSH) { + const errMsg = `API mode '${mode}' is not a valid mode, must be one of 'poll' or 'push'`; + mozaik.logger.error(chalk.red(errMsg)); + + throw new Error(errMsg); + } + if (_.has(apis, id)) { - const errMsg = `API "${ id }" already registered`; + const errMsg = `API '${id}' already registered`; mozaik.logger.error(chalk.red(errMsg)); + throw new Error(errMsg); } - apis[id] = api(mozaik); + apis[id] = { methods: api(mozaik), mode }; - mozaik.logger.info(chalk.yellow(`registered API "${ id }"`)); + mozaik.logger.info(chalk.yellow(`registered API '${id}' (mode: ${mode})`)); }; /** * Register a new client. * - * @param {Object} client - * @param {String} id + * @param {WebSocket} client + * @param {String} id */ const addClient = (client, id) => { if (_.has(clients, id)) { - const errMsg = `Client with id "${ id }" already exists`; + const errMsg = `Client with id '${id}' already exists`; mozaik.logger.error(chalk.red(errMsg)); + throw new Error(errMsg); } @@ -54,7 +86,7 @@ const Bus = mozaik => { /** * Remove a client. * - * @param id + * @param {String} id */ const removeClient = (id) => { _.forOwn(subscriptions, (subscription, subscriptionId) => { @@ -63,7 +95,7 @@ const Bus = mozaik => { // if there's no more subscribers, clear the interval // to avoid consuming APIs for nothing. if (subscription.clients.length === 0 && subscription.timer) { - mozaik.logger.info(`removing interval for ${subscriptionId}`); + mozaik.logger.info(`removing interval for '${subscriptionId}'`); clearInterval(subscription.timer); delete subscription.timer; @@ -76,26 +108,21 @@ const Bus = mozaik => { }; /** - * * @param {String} id * @param {Function} callFn * @param {Object} params */ const processApiCall = (id, callFn, params) => { - mozaik.logger.info(`Calling "${id}"`); + mozaik.logger.info(`Calling '${id}'`); callFn(params) .then(data => { - const message = { - id, - body: data - }; + const message = { id, body: data }; + // cache message subscriptions[id].cached = message; - subscriptions[id].clients.forEach((clientId) => { - clients[clientId].send(JSON.stringify(message)); - }); + send(id, message); }) .catch(err => { mozaik.logger.error(chalk.red(`[${id.split('.')[0]}] ${id} - status code: ${err.status || err.statusCode}`)); @@ -111,7 +138,7 @@ const Bus = mozaik => { */ const clientSubscription = (clientId, request) => { if (!_.has(clients, clientId)) { - mozaik.logger.error(`Unable to find a client with id "${ clientId }"`); + mozaik.logger.error(`Unable to find a client with id '${clientId}'`); return; } @@ -120,25 +147,34 @@ const Bus = mozaik => { const parts = requestId.split('.'); let errMsg; if (parts.length < 2) { - errMsg = `Invalid request id "${ requestId }", should be something like 'api_id.method'`; + errMsg = `Invalid request id '${requestId}', should be something like 'api_id.method'`; mozaik.logger.error(chalk.red(errMsg)); + throw new Error(errMsg); } if (!_.has(apis, parts[0])) { - errMsg = `Unable to find API matching id "${ parts[0] }"`; + errMsg = `Unable to find API matching id '${parts[0]}'`; mozaik.logger.error(chalk.red(errMsg)); + throw new Error(errMsg); } const api = apis[parts[0]]; - if (!_.has(api, parts[1])) { - errMsg = `Unable to find API method matching "${ parts[1] }"`; + if (!_.has(api.methods, parts[1])) { + errMsg = `Unable to find API method matching '${parts[1]}'`; mozaik.logger.error(chalk.red(errMsg)); + throw new Error(errMsg); } - const callFn = api[parts[1]]; + const callFn = api.methods[parts[1]]; + if (!_.isFunction(callFn)) { + errMsg = `API method '${parts[0]}.${parts[1]}' MUST be a function`; + mozaik.logger.error(chalk.red(errMsg)); + + throw new Error(errMsg); + } if (!subscriptions[requestId]) { subscriptions[requestId] = { @@ -146,22 +182,32 @@ const Bus = mozaik => { currentResponse: null }; - mozaik.logger.info(`Added subscription "${ requestId }"`); + mozaik.logger.info(`Added subscription '${requestId}'`); - // make an immediate call to avoid waiting for the first interval. - processApiCall(requestId, callFn, request.params); + if (api.mode === API_MODE_POLL) { + // make an immediate call to avoid waiting for the first interval. + processApiCall(requestId, callFn, request.params); + } else if (api.mode === API_MODE_PUSH) { + mozaik.logger.info(`Creating producer for '${requestId}'`); + callFn(data => { + send(requestId, { + id: requestId, + body: data + }); + }, request.params); + } } // if there is no interval running, create one - if (!subscriptions[requestId].timer) { - mozaik.logger.info(`Setting timer for "${ requestId }"`); + if (!subscriptions[requestId].timer && api.mode === API_MODE_POLL) { + mozaik.logger.info(`Setting timer for '${requestId}'`); subscriptions[requestId].timer = setInterval(() => { processApiCall(requestId, callFn, request.params); }, apisPollInterval); } // avoid adding a client for the same API call twice - if (!_.includes(subscriptions[requestId].clients, clientId)) { + if (subscriptions[requestId].clients.indexOf(clientId) === -1) { subscriptions[requestId].clients.push(clientId); // if there's an available cached response, send it immediately diff --git a/src/CoreApi.js b/src/CoreApi.js index f3ce9e4e..1adb0502 100644 --- a/src/CoreApi.js +++ b/src/CoreApi.js @@ -1,4 +1,3 @@ -import request from 'superagent-bluebird-promise'; import Promise from 'bluebird'; @@ -8,12 +7,10 @@ import Promise from 'bluebird'; const CoreApi = mozaik => { const methods = { inspector() { - return new Promise((resolve, reject) => { - resolve({ - apis: mozaik.bus.listApis(), - clientCount: mozaik.bus.clientCount(), - uptime: process.uptime() - }); + return Promise.resolve({ + apis: mozaik.bus.listApis(), + clientCount: mozaik.bus.clientCount(), + uptime: process.uptime() }); }, }; diff --git a/src/browser/mixins/ApiConsumerMixin.js b/src/browser/mixins/ApiConsumerMixin.js index 08b85a9c..c2ee7933 100644 --- a/src/browser/mixins/ApiConsumerMixin.js +++ b/src/browser/mixins/ApiConsumerMixin.js @@ -4,17 +4,33 @@ import ApiActions from './../actions/ApiActions'; const ApiConsumerMixin = { componentWillMount() { + const displayName = this.constructor.displayName || 'Unknown'; + + if (!this.getApiRequest) { + console.warn(`Seems you're trying to use 'ApiConsumerMixin' without implementing 'getApiRequest()', see '${displayName}' component`); + return; + } + this.apiRequest = this.getApiRequest(); - this.listenTo(ApiStore, this._onApiData); + if (!this.apiRequest.id) { + console.error(`'getApiRequest()' MUST return an object with an 'id' property, see '${displayName}' component`); + return; + } + + this.listenTo(ApiStore, this.onAllApiData); }, - _onApiData(data) { + onAllApiData(data) { if (data.id === this.apiRequest.id) { this.onApiData(data.body); } }, componentDidMount() { + if (!this.apiRequest || !this.apiRequest.id) { + return; + } + ApiActions.get(this.apiRequest.id, this.apiRequest.params || {}); } }; diff --git a/test/backend/Bus.test.js b/test/backend/Bus.test.js index e55be94b..699d42c9 100644 --- a/test/backend/Bus.test.js +++ b/test/backend/Bus.test.js @@ -2,10 +2,17 @@ import expect from 'expect'; import sinon from 'sinon'; import mockery from 'mockery'; +import Promise from 'bluebird'; const sandbox = sinon.sandbox.create(); let mockedMozaik; let Bus, bus; +let clock; +let apiStub; +let thenStub, catchStub; +let apiParams; +let apiSpy; +let clientSpy; describe('Mozaïk | Bus', () => { @@ -19,6 +26,8 @@ describe('Mozaïk | Bus', () => { beforeEach(() => { mockery.registerMock('chalk', require('./chalk-mock')); + clock = sinon.useFakeTimers(); + mockedMozaik = { logger: { info: sinon.spy(), @@ -35,6 +44,7 @@ describe('Mozaïk | Bus', () => { afterEach(() => { sandbox.verifyAndRestore(); + clock.restore(); mockery.deregisterAll(); }); @@ -49,21 +59,39 @@ describe('Mozaïk | Bus', () => { expect(bus.listApis()).toEqual(['test_api']); expect(mockedMozaik.logger.info.calledOnce).toEqual(true); - expect(mockedMozaik.logger.info.getCall(0).args[0]).toEqual('registered API "test_api"'); + expect(mockedMozaik.logger.info.getCall(0).args[0]).toEqual(`registered API 'test_api' (mode: poll)`); }); it('should throw if the API was already registered', () => { - bus.registerApi('test', () => { }); + bus.registerApi('test_api', () => { }); expect(mockedMozaik.logger.info.calledOnce).toEqual(true); - expect(mockedMozaik.logger.info.getCall(0).args[0]).toEqual('registered API "test"'); + expect(mockedMozaik.logger.info.getCall(0).args[0]).toEqual(`registered API 'test_api' (mode: poll)`); expect(() => { - bus.registerApi('test', () => { }); - }).toThrow('API "test" already registered'); + bus.registerApi('test_api', () => { }); + }).toThrow(`API 'test_api' already registered`); expect(mockedMozaik.logger.error.calledOnce).toEqual(true); - expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual('API "test" already registered'); + expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual(`API 'test_api' already registered`); + }); + + it(`should allow to set API mode to 'push'`, () => { + bus.registerApi('test_api', () => { }, 'push'); + + expect(bus.listApis()).toEqual(['test_api']); + expect(mockedMozaik.logger.info.calledOnce).toEqual(true); + expect(mockedMozaik.logger.info.getCall(0).args[0]).toEqual(`registered API 'test_api' (mode: push)`); + }); + + it('should throw if we pass an invalid API mode', () => { + expect(() => { + bus.registerApi('test_api', () => { }, 'invalid'); + }).toThrow(`API mode 'invalid' is not a valid mode, must be one of 'poll' or 'push'`); + + expect(mockedMozaik.logger.error.calledOnce).toEqual(true); + expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual(`API mode 'invalid' is not a valid mode, must be one of 'poll' or 'push'`); + }); }); @@ -83,10 +111,10 @@ describe('Mozaïk | Bus', () => { expect(() => { bus.addClient({}, 'test_client'); - }).toThrow('Client with id "test_client" already exists'); + }).toThrow(`Client with id 'test_client' already exists`); expect(mockedMozaik.logger.error.calledOnce).toEqual(true); - expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual('Client with id "test_client" already exists'); + expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual(`Client with id 'test_client' already exists`); }); }); @@ -104,24 +132,41 @@ describe('Mozaïk | Bus', () => { expect(mockedMozaik.logger.info.getCall(1).args[0]).toEqual('Client #test_client disconnected'); }); - it('should cleanup subscription and remove interval', () => { + it('should cleanup subscription and remove timer if no clients left', () => { + bus.addClient({ send() {} }, 'test_client'); + expect(bus.listClients()['test_client']).toExist(); + + bus.registerApi('test_api', () => ({ test() { + return Promise.resolve(true); + }})); + expect(bus.listApis()).toEqual(['test_api']); + + bus.clientSubscription('test_client', { id: 'test_api.test' }); + const subscriptions = bus.listSubscriptions(); + expect(subscriptions['test_api.test']).toExist(); + expect(subscriptions['test_api.test'].timer).toExist(); + expect(subscriptions['test_api.test'].clients).toEqual(['test_client']); + + bus.removeClient('test_client'); + expect(subscriptions['test_api.test'].timer).toNotExist(); + expect(subscriptions['test_api.test'].clients).toEqual([]); }); }); describe('clientSubscription()', () => { - let apiSpy; - it('should log an error if there is no existing client having given id', () => { apiSpy = { fetch: sinon.spy() }; bus.registerApi('test_api', () => apiSpy); bus.clientSubscription('test_client'); + const expectedError = `Unable to find a client with id 'test_client'`; + expect(apiSpy.fetch.called).toEqual(false); expect(mockedMozaik.logger.error.calledOnce).toEqual(true); - expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual('Unable to find a client with id "test_client"'); + expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual(expectedError); }); it('should throw and log an error if the request id is invalid', () => { @@ -130,13 +175,15 @@ describe('Mozaïk | Bus', () => { bus.addClient({}, 'test_client'); + const expectedError = `Invalid request id 'test_api', should be something like 'api_id.method'`; + expect(() => { bus.clientSubscription('test_client', { id: 'test_api' }); - }).toThrow(`Invalid request id "test_api", should be something like 'api_id.method'`); + }).toThrow(expectedError); expect(apiSpy.fetch.called).toEqual(false, 'Api method should not be called'); expect(mockedMozaik.logger.error.calledOnce).toEqual(true); - expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual('Invalid request id "test_api", should be something like \'api_id.method\''); + expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual(expectedError); }); it('should throw and log an error if there is no existing api for given request id', () => { @@ -145,13 +192,15 @@ describe('Mozaïk | Bus', () => { bus.addClient({}, 'test_client'); + const expectedError = `Unable to find API matching id 'invalid_api'`; + expect(() => { bus.clientSubscription('test_client', { id: 'invalid_api.invalid_method' }); - }).toThrow('Unable to find API matching id "invalid_api"'); + }).toThrow(expectedError); expect(apiSpy.fetch.called).toEqual(false, 'Api method should not be called'); expect(mockedMozaik.logger.error.calledOnce).toEqual(true); - expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual('Unable to find API matching id "invalid_api"'); + expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual(expectedError); }); it('should throw and log an error if the api method does not exists', () => { @@ -160,13 +209,109 @@ describe('Mozaïk | Bus', () => { bus.addClient({}, 'test_client'); + const expectedError = `Unable to find API method matching 'invalid_method'`; + expect(() => { bus.clientSubscription('test_client', { id: 'test_api.invalid_method' }); - }).toThrow('Unable to find API method matching "invalid_method"'); + }).toThrow(expectedError); expect(apiSpy.fetch.called).toEqual(false, 'Api method should not be called'); expect(mockedMozaik.logger.error.calledOnce).toEqual(true); - expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual('Unable to find API method matching "invalid_method"'); + expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual(expectedError); + }); + + it('should throw and log an error if the api method is not a function', () => { + bus.registerApi('test_api', () => ({ method: false })); + + bus.addClient({}, 'test_client'); + + const expectedError = `API method 'test_api.method' MUST be a function`; + + expect(() => { + bus.clientSubscription('test_client', { id: 'test_api.method' }); + }).toThrow(expectedError); + + expect(mockedMozaik.logger.error.calledOnce).toEqual(true); + expect(mockedMozaik.logger.error.getCall(0).args[0]).toEqual(expectedError); + }); + + it(`should immediately call the api if there's no matching subscription`, () => { + const apiData = { test: true }; + + thenStub = sinon.stub(); + catchStub = sinon.stub(); + apiStub = sinon.stub().returns({ then: thenStub }); + thenStub.yields(apiData).returns({ 'catch': catchStub }); + bus.registerApi('test_api', () => ({ fetch: apiStub })); + + clientSpy = { send: sinon.spy() }; + bus.addClient(clientSpy, 'test_client'); + + bus.clientSubscription('test_client', { id: 'test_api.fetch' }); + + expect(apiStub.calledOnce).toEqual(true, 'API method should have been called'); + expect(clientSpy.send.calledOnce).toEqual(true, 'API data should have been sent to the client'); + expect(clientSpy.send.getCall(0).args[0]).toEqual(JSON.stringify({ + id: 'test_api.fetch', + body: apiData + })); + }); + + it(`should create a timer if there's no matching subscription`, () => { + const apiData = { test: true }; + + thenStub = sinon.stub(); + catchStub = sinon.stub(); + apiStub = sinon.stub().returns({ then: thenStub }); + thenStub.yields(apiData).returns({ 'catch': catchStub }); + bus.registerApi('test_api', () => ({ fetch: apiStub })); + + clientSpy = { send: sinon.spy() }; + bus.addClient(clientSpy, 'test_client'); + + bus.clientSubscription('test_client', { id: 'test_api.fetch' }); + + clock.tick(15000); + + expect(apiStub.callCount).toEqual(2, 'API method should have been called'); + expect(clientSpy.send.callCount).toEqual(2, 'API data should have been sent to the client'); + expect(clientSpy.send.alwaysCalledWith(JSON.stringify({ + id: 'test_api.fetch', + body: apiData + }))).toEqual(true); + + const subscriptions = bus.listSubscriptions(); + expect(subscriptions['test_api.fetch']).toExist(); + expect(subscriptions['test_api.fetch'].timer).toExist(); + }); + + it(`should create a producer if there's no matching subscription and API mode is 'push'`, () => { + apiSpy = sinon.spy(); + bus.registerApi('test_api', () => ({ push: apiSpy }), 'push'); + + clientSpy = { send: sinon.spy() }; + bus.addClient(clientSpy, 'test_client'); + + bus.clientSubscription('test_client', { id: 'test_api.push' }); + + expect(apiSpy.calledOnce).toEqual(true, 'API method should have been called'); + + const subscriptions = bus.listSubscriptions(); + expect(subscriptions['test_api.push']).toExist(); + expect(subscriptions['test_api.push'].timer).toNotExist(); + }); + + it('should not add the same client id twice to the subscription client list', () => { + bus.registerApi('test_api', () => ({ push: () => {} }), 'push'); + bus.addClient({ send: () => {} }, 'test_client'); + bus.clientSubscription('test_client', { id: 'test_api.push' }); + + const subscriptions = bus.listSubscriptions(); + expect(subscriptions['test_api.push']).toExist(); + expect(subscriptions['test_api.push'].clients).toEqual(['test_client']); + + bus.clientSubscription('test_client', { id: 'test_api.push' }); + expect(subscriptions['test_api.push'].clients).toEqual(['test_client']); }); }); @@ -185,38 +330,36 @@ describe('Mozaïk | Bus', () => { describe('processApiCall()', () => { - let api_stub; - let then_stub, catch_stub; let api_params; it('should log api call', () => { - api_stub = sinon.stub(); - then_stub = sinon.stub(); - catch_stub = sinon.stub(); + apiStub = sinon.stub(); + thenStub = sinon.stub(); + catchStub = sinon.stub(); - then_stub.returns({ 'catch': catch_stub }); - api_stub.returns({ then: then_stub }); + thenStub.returns({ 'catch': catchStub }); + apiStub.returns({ then: thenStub }); - bus.processApiCall('test_api.test_method', api_stub); + bus.processApiCall('test_api.test_method', apiStub); expect(mockedMozaik.logger.info.calledOnce).toEqual(true); - expect(mockedMozaik.logger.info.getCall(0).args[0]).toEqual('Calling "test_api.test_method"'); + expect(mockedMozaik.logger.info.getCall(0).args[0]).toEqual(`Calling 'test_api.test_method'`); }); it('should call api', () => { - api_stub = sinon.stub(); - then_stub = sinon.stub(); - catch_stub = sinon.stub(); + apiStub = sinon.stub(); + thenStub = sinon.stub(); + catchStub = sinon.stub(); - then_stub.returns({ 'catch': catch_stub }); - api_stub.returns({ then: then_stub }); + thenStub.returns({ 'catch': catchStub }); + apiStub.returns({ then: thenStub }); api_params = { api_param: 'api_param' }; - bus.processApiCall('test_api.test_method', api_stub, api_params); + bus.processApiCall('test_api.test_method', apiStub, api_params); - expect(api_stub.calledOnce).toEqual(true, 'should have called the given api method'); - expect(api_stub.getCall(0).args[0]).toEqual(api_params); + expect(apiStub.calledOnce).toEqual(true, 'should have called the given api method'); + expect(apiStub.getCall(0).args[0]).toEqual(api_params); }); it('should cache result', () => { @@ -224,14 +367,14 @@ describe('Mozaïk | Bus', () => { clients: [] }; - api_stub = sinon.stub(); - then_stub = sinon.stub(); - catch_stub = sinon.stub(); + apiStub = sinon.stub(); + thenStub = sinon.stub(); + catchStub = sinon.stub(); - then_stub.yields('sample_data').returns({ 'catch': catch_stub }); - api_stub.returns({ then: then_stub }); + thenStub.yields('sample_data').returns({ 'catch': catchStub }); + apiStub.returns({ then: thenStub }); - bus.processApiCall('test_api.test_method', api_stub); + bus.processApiCall('test_api.test_method', apiStub); expect(bus.listSubscriptions()['test_api.test_method'].cached).toExist; expect(bus.listSubscriptions()['test_api.test_method'].cached).toEqual({ @@ -239,9 +382,5 @@ describe('Mozaïk | Bus', () => { body: 'sample_data' }); }); - - it('should log error when the api call result in an error', () => { - - }); }); }); \ No newline at end of file