From 163aa7e84f685ecb0b4fffafc2cdc4ef4696900e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?z=C5=8Dng=20y=C7=94?= Date: Mon, 19 Sep 2016 22:56:39 +0800 Subject: [PATCH] fix: AppWorkerClient subscribe same data failed issue (#110) --- lib/core/app_worker_client.js | 15 +- test/fixtures/apps/agent-client-app/agent.js | 19 + test/fixtures/apps/agent-client-app/app.js | 10 + .../apps/agent-client-app/app/router.js | 37 ++ .../apps/agent-client-app/package.json | 3 + test/lib/core/app_worker_client.test.js | 366 ++++++++++-------- 6 files changed, 285 insertions(+), 165 deletions(-) create mode 100644 test/fixtures/apps/agent-client-app/agent.js create mode 100644 test/fixtures/apps/agent-client-app/app.js create mode 100644 test/fixtures/apps/agent-client-app/app/router.js create mode 100644 test/fixtures/apps/agent-client-app/package.json diff --git a/lib/core/app_worker_client.js b/lib/core/app_worker_client.js index 2ec4ef823b..bc33a52cb7 100644 --- a/lib/core/app_worker_client.js +++ b/lib/core/app_worker_client.js @@ -6,6 +6,7 @@ const Base = require('sdk-base'); const MAX_VALUE = Math.pow(2, 31) - 10; const PROCESS_ID = String(process.pid); +const hasOwnProperty = Object.prototype.hasOwnProperty; const defaultOptions = { responseTimeout: '5s', @@ -29,7 +30,7 @@ class AppWorkerClient extends Base { super(); - // hsf 服务依赖肯定会比较多的,设置为 100 个 + // 服务依赖肯定会比较多的,设置为 100 个 this.setMaxListeners(100); this.options = {}; @@ -230,14 +231,18 @@ class AppWorkerClient extends Base { _subscribe(info, listener) { const key = this._formatKey(info); this.on(key, listener); - if (!this._subscriptions.has(key)) { + const subInfo = this._subscriptions.get(key); + if (!subInfo) { const subData = { key, info, pid: this.pid, }; - this._subscriptions.set(key, subData); + this._subscriptions.set(key, { subData }); this._sendToAgent(this.commands.sub, subData); + } else if (hasOwnProperty.call(subInfo, 'value')) { + // trigger listener immediately + listener(subInfo.value); } return this; } @@ -299,7 +304,7 @@ class AppWorkerClient extends Base { // 重新订阅 for (const key of this._subscriptions.keys()) { const info = this._subscriptions.get(key); - this._sendToAgent(this.commands.sub, info); + this._sendToAgent(this.commands.sub, info.subData); } this.logger.info('[egg:worker] [%s] reSubscribe done for "agent restart"', this.name); @@ -345,6 +350,8 @@ class AppWorkerClient extends Base { const key = data.key; if (this._subscriptions.has(key)) { this.logger.info('[egg:worker] [%s] key[%s] value changed, new value: %j', this.name, key, data.value); + const info = this._subscriptions.get(key); + info.value = data.value; this.emit(key, data.value); } } diff --git a/test/fixtures/apps/agent-client-app/agent.js b/test/fixtures/apps/agent-client-app/agent.js new file mode 100644 index 0000000000..b938c39879 --- /dev/null +++ b/test/fixtures/apps/agent-client-app/agent.js @@ -0,0 +1,19 @@ +'use strict'; + +module.exports = function initAgent(agent) { + const client = { + 'mock-data': 'mock-data', + 'not-exist-data': null, + ready(cb) { + setImmediate(cb); + }, + }; + + agent.startAgent({ + name: 'sub-client', + client, + subscribe(info, listener) { + listener(client[info]); + }, + }); +}; diff --git a/test/fixtures/apps/agent-client-app/app.js b/test/fixtures/apps/agent-client-app/app.js new file mode 100644 index 0000000000..28651a69cf --- /dev/null +++ b/test/fixtures/apps/agent-client-app/app.js @@ -0,0 +1,10 @@ +'use strict'; + +module.exports = function initApp(app) { + app.subClient = app.createAppWorkerClient('sub-client', { + subscribe(info, listener) { + this._subscribe(info, listener); + return this; + }, + }); +}; diff --git a/test/fixtures/apps/agent-client-app/app/router.js b/test/fixtures/apps/agent-client-app/app/router.js new file mode 100644 index 0000000000..dea9dee441 --- /dev/null +++ b/test/fixtures/apps/agent-client-app/app/router.js @@ -0,0 +1,37 @@ +'use strict'; + +module.exports = function(app) { + const done = app.readyCallback('app_subscribe_data'); + app.subClient.subscribe('mock-data', val => { + app.mockData = val; + done(); + }); + + const done1 = app.readyCallback('app_subscribe_not_exist_data'); + app.subClient.subscribe('not-exist-data', val => { + app.notExistData = val; + done1(); + }); + + app.get('/', function*() { + const val = yield cb => app.subClient.subscribe('mock-data', val => { + cb(null, val); + }); + + this.body = { + 'mock-data': val, + 'app-mock-data': app.mockData, + }; + }); + + app.get('/not-exist', function*() { + const val = yield cb => app.subClient.subscribe('not-exist-data', val => { + cb(null, val); + }); + + this.body = { + 'not-exist-data': null, + 'app-not-exist-data': null, + }; + }); +}; diff --git a/test/fixtures/apps/agent-client-app/package.json b/test/fixtures/apps/agent-client-app/package.json new file mode 100644 index 0000000000..8955a84e9b --- /dev/null +++ b/test/fixtures/apps/agent-client-app/package.json @@ -0,0 +1,3 @@ +{ + "name": "agent-client-app" +} diff --git a/test/lib/core/app_worker_client.test.js b/test/lib/core/app_worker_client.test.js index 8cf820eabe..92fbd90e1d 100644 --- a/test/lib/core/app_worker_client.test.js +++ b/test/lib/core/app_worker_client.test.js @@ -3,205 +3,249 @@ const should = require('should'); const mm = require('egg-mock'); const pedding = require('pedding'); +const request = require('supertest'); const utils = require('../../utils'); describe('test/lib/core/app_worker_client.test.js', () => { - let app; - let client; + describe('apps/demo', () => { + let app; + let client; - before(() => { - app = utils.app('apps/demo'); - return app.ready(); - }); - before(done => { - const impl = { - subscribe(reg, listener) { - this._subscribe(reg, listener); - return this; - }, - - unSubscribe(reg, listener) { - return this._unSubscribe(reg, listener); - }, - - * getData(id) { - return yield this._invoke('getData', [ id ]); - }, - - sendOneway(data) { - this._invokeOneway('sendOneway', [ data ]); - }, - }; - - client = app.createAppWorkerClient('mock', impl, { - responseTimeout: 3000, - force: true, - }); - - client._on('xxx', () => {}); - client._once('yyy', () => {}); - client._removeListener('xxx', () => {}); - client._removeAllListeners('xxx'); - client._removeAllListeners('yyy'); - - client.ready(done); - }); + before(() => { + app = utils.app('apps/demo'); + return app.ready(); + }); + before(done => { + const impl = { + subscribe(reg, listener) { + this._subscribe(reg, listener); + return this; + }, + + unSubscribe(reg, listener) { + return this._unSubscribe(reg, listener); + }, + + * getData(id) { + return yield this._invoke('getData', [ id ]); + }, + + sendOneway(data) { + this._invokeOneway('sendOneway', [ data ]); + }, + }; + + client = app.createAppWorkerClient('mock', impl, { + responseTimeout: 3000, + force: true, + }); - afterEach(mm.restore); + client._on('xxx', () => {}); + client._once('yyy', () => {}); + client._removeListener('xxx', () => {}); + client._removeAllListeners('xxx'); + client._removeAllListeners('yyy'); - it('should work', () => { - mm(client, '_opaque', Math.pow(2, 31) - 10); + client.ready(done); + }); - client.publicEvents.should.eql([ 'agent_restart', 'error' ]); - client._getNextOpaque().should.equal(0); - }); + afterEach(mm.restore); - it('should subscribe data well', done => { - done = pedding(done, 2); - const info = { - dataId: 'mockId', - groupId: 'mockGroup', - }; + it('should work', () => { + mm(client, '_opaque', Math.pow(2, 31) - 10); - client.subscribe(info, function(value) { - value.should.equal('mock data'); - done(); + client.publicEvents.should.eql([ 'agent_restart', 'error' ]); + client._getNextOpaque().should.equal(0); }); - client.messenger.emit('mock_subscribe_changed', { - key: JSON.stringify(info), - info, - value: 'mock data', - }); + it('should subscribe data well', done => { + done = pedding(done, 2); + const info = { + dataId: 'mockId', + groupId: 'mockGroup', + }; - client.unSubscribe(info, () => {}); - client.unSubscribe(info); + client.subscribe(info, function(value) { + value.should.equal('mock data'); + done(); + }); - client._subscriptions.has(JSON.stringify(info)).should.equal(false); + client.messenger.emit('mock_subscribe_changed', { + key: JSON.stringify(info), + info, + value: 'mock data', + }); - client.messenger.emit('mock_subscribe_changed', { - key: JSON.stringify(info), - info, - value: 'mock data2', - }); + client.unSubscribe(info, () => {}); + client.unSubscribe(info); - setTimeout(() => done(), 500); - }); + client._subscriptions.has(JSON.stringify(info)).should.equal(false); - it('should subscribe string well', done => { - const info = 'mock-info'; + client.messenger.emit('mock_subscribe_changed', { + key: JSON.stringify(info), + info, + value: 'mock data2', + }); - client.subscribe(info, function(value) { - value.should.equal('mock data'); - done(); + setTimeout(() => done(), 500); }); - client.messenger.emit('mock_subscribe_changed', { - key: JSON.stringify(info), - info, - value: 'mock data', - }); - }); + it('should subscribe string well', done => { + const info = 'mock-info'; - it('should invoke API well', function* () { - mm(client, '_opaque', 1); + client.subscribe(info, function(value) { + value.should.equal('mock data'); + done(); + }); - setTimeout(() => { - client.messenger.emit('mock_invoke_response', { - opaque: 1, - success: true, - data: 'mock data', + client.messenger.emit('mock_subscribe_changed', { + key: JSON.stringify(info), + info, + value: 'mock data', }); - }, 100); + }); - const result = yield client.getData('123'); - result.should.equal('mock data'); - }); + it('should invoke API well', function* () { + mm(client, '_opaque', 1); - it('should invoke API well with wrong opaque', function* () { - let warned = false; - mm(client, '_opaque', 10); - const logger = client.logger; - mm(logger, 'warn', (msg, name, data) => { - if (data === 'mock data') { - warned = true; - } + setTimeout(() => { + client.messenger.emit('mock_invoke_response', { + opaque: 1, + success: true, + data: 'mock data', + }); + }, 100); + + const result = yield client.getData('123'); + result.should.equal('mock data'); }); - setTimeout(() => { - client.messenger.emit('mock_invoke_response', { - opaque: 1, - success: true, - data: 'mock data', + it('should invoke API well with wrong opaque', function* () { + let warned = false; + mm(client, '_opaque', 10); + const logger = client.logger; + mm(logger, 'warn', (msg, name, data) => { + if (data === 'mock data') { + warned = true; + } }); - }, 100); - - try { - yield client.getData('123'); - } catch (err) { - // do noting - } - should(warned).equal(true); - }); - it('should invoke oneway ok', done => { - client._invoke('sendOneway', [ '123' ], { - oneway: true, - }).then(done); - }); + setTimeout(() => { + client.messenger.emit('mock_invoke_response', { + opaque: 1, + success: true, + data: 'mock data', + }); + }, 100); + + try { + yield client.getData('123'); + } catch (err) { + // do noting + } + should(warned).equal(true); + }); - it('should call sendOneway ok', done => { - mm(client, '_opaque', 1); - mm(client, '_sendToAgent', (cmd, data) => { - cmd.should.equal('mock_invoke_request'); - const pid = client.pid; - data.should.eql({ - opaque: 1, - method: 'sendOneway', - args: [ '123' ], - pid, + it('should invoke oneway ok', done => { + client._invoke('sendOneway', [ '123' ], { oneway: true, + }).then(done); + }); + + it('should call sendOneway ok', done => { + mm(client, '_opaque', 1); + mm(client, '_sendToAgent', (cmd, data) => { + cmd.should.equal('mock_invoke_request'); + const pid = client.pid; + data.should.eql({ + opaque: 1, + method: 'sendOneway', + args: [ '123' ], + pid, + oneway: true, + }); + done(); }); - done(); + client.sendOneway('123'); }); - client.sendOneway('123'); - }); - it('should invoke API with error', function* () { - mm(client, '_opaque', 1); + it('should invoke API with error', function* () { + mm(client, '_opaque', 1); + + setTimeout(() => { + client.messenger.emit('mock_invoke_response', { + opaque: 1, + success: false, + errorMessage: 'mock error', + }); + }, 100); + + try { + yield client.getData('123'); + throw new Error('should not run here'); + } catch (err) { + err.message.should.equal('mock error'); + } + }); - setTimeout(() => { - client.messenger.emit('mock_invoke_response', { - opaque: 1, - success: false, - errorMessage: 'mock error', - }); - }, 100); - - try { - yield client.getData('123'); - throw new Error('should not run here'); - } catch (err) { - err.message.should.equal('mock error'); - } - }); + it('should throw timeout error', function* () { + mm(client, '_opaque', 1); + try { + yield client.getData('123'); - it('should throw timeout error', function* () { - mm(client, '_opaque', 1); - try { - yield client.getData('123'); + throw new Error('should not run here'); + } catch (err) { + err.name.should.equal('AgentWorkerRequestTimeoutError'); + err.message.should.equal('Agent worker no response in 3000ms, AppWorkerClient:mock invoke getData with req#1'); + } + }); - throw new Error('should not run here'); - } catch (err) { - err.name.should.equal('AgentWorkerRequestTimeoutError'); - err.message.should.equal('Agent worker no response in 3000ms, AppWorkerClient:mock invoke getData with req#1'); - } + it('should emit agent_restart when agent worker restart', done => { + client.on('agent_restart', done); + client.messenger.emit('agent-start'); + }); }); - it('should emit agent_restart when agent worker restart', done => { - client.on('agent_restart', done); - client.messenger.emit('agent-start'); - }); + describe('apps/agent-client-app', () => { + let app; + before(function* () { + app = utils.cluster('apps/agent-client-app'); + yield app.ready(); + }); + + beforeEach(mm.restore); + + after(() => { + app.close(); + }); + + it('should subscribe second time ok', done => { + request(app.callback()) + .get('/') + .expect(200, (err, res) => { + should.not.exist(err); + res.body.should.eql({ + 'mock-data': 'mock-data', + 'app-mock-data': 'mock-data', + }); + done(); + }); + }); + it('should subscribe not exist id second time ok', done => { + request(app.callback()) + .get('/not-exist') + .expect(200, (err, res) => { + should.not.exist(err); + res.body.should.have.property('not-exist-data'); + res.body.should.have.property('app-not-exist-data'); + res.body.should.eql({ + 'not-exist-data': null, + 'app-not-exist-data': null, + }); + done(); + }); + }); + }); });