Skip to content

Commit

Permalink
feat: Upgrade Redis 3 to 4 for LiveQuery (parse-community#8333)
Browse files Browse the repository at this point in the history
  • Loading branch information
dblythy committed Feb 15, 2023
1 parent 3dd7f15 commit be248c0
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 40 deletions.
2 changes: 1 addition & 1 deletion spec/ParseGraphQLServer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ describe('ParseGraphQLServer', () => {
const expressApp = express();
httpServer = http.createServer(expressApp);
expressApp.use('/parse', parseServer.app);
parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {
parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {
port: 1338,
});
parseGraphQLServer.applyGraphQL(expressApp);
Expand Down
56 changes: 56 additions & 0 deletions spec/ParseLiveQueryRedis.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') {
describe('ParseLiveQuery redis', () => {
afterEach(async () => {
const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient();
client.close();
});
it('can connect', async () => {
await reconfigureServer({
startLiveQueryServer: true,
liveQuery: {
classNames: ['TestObject'],
redisURL: 'redis://localhost:6379',
},
liveQueryServerOptions: {
redisURL: 'redis://localhost:6379',
},
});
const subscription = await new Parse.Query('TestObject').subscribe();
const [object] = await Promise.all([
new Parse.Object('TestObject').save(),
new Promise(resolve =>
subscription.on('create', () => {
resolve();
})
),
]);
await Promise.all([
new Promise(resolve =>
subscription.on('delete', () => {
resolve();
})
),
object.destroy(),
]);
});

it('can call connect twice', async () => {
const server = await reconfigureServer({
startLiveQueryServer: true,
liveQuery: {
classNames: ['TestObject'],
redisURL: 'redis://localhost:6379',
},
liveQueryServerOptions: {
redisURL: 'redis://localhost:6379',
},
});
expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue();
await server.config.liveQueryController.connect();
expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue();
expect(server.liveQueryServer.subscriber.isOpen).toBe(true);
await server.liveQueryServer.connect();
expect(server.liveQueryServer.subscriber.isOpen).toBe(true);
});
});
}
18 changes: 9 additions & 9 deletions spec/ParseLiveQueryServer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,29 +94,29 @@ describe('ParseLiveQueryServer', function () {
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
});

it('can be initialized from ParseServer', function () {
it('can be initialized from ParseServer', async () => {
const httpServer = {};
const parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {});
const parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {});

expect(parseLiveQueryServer.clientId).toBeUndefined();
expect(parseLiveQueryServer.clients.size).toBe(0);
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
});

it('can be initialized from ParseServer without httpServer', function (done) {
const parseLiveQueryServer = ParseServer.createLiveQueryServer(undefined, {
it('can be initialized from ParseServer without httpServer', async () => {
const parseLiveQueryServer = await ParseServer.createLiveQueryServer(undefined, {
port: 22345,
});

expect(parseLiveQueryServer.clientId).toBeUndefined();
expect(parseLiveQueryServer.clients.size).toBe(0);
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
parseLiveQueryServer.server.close(done);
await new Promise(resolve => parseLiveQueryServer.server.close(resolve));
});

describe_only_db('mongo')('initialization', () => {
it('can be initialized through ParseServer without liveQueryServerOptions', function (done) {
const parseServer = ParseServer.start({
it('can be initialized through ParseServer without liveQueryServerOptions', async function (done) {
const parseServer = await ParseServer.start({
appId: 'hello',
masterKey: 'world',
port: 22345,
Expand All @@ -137,8 +137,8 @@ describe('ParseLiveQueryServer', function () {
});
});

it('can be initialized through ParseServer with liveQueryServerOptions', function (done) {
const parseServer = ParseServer.start({
it('can be initialized through ParseServer with liveQueryServerOptions', async function (done) {
const parseServer = await ParseServer.start({
appId: 'hello',
masterKey: 'world',
port: 22346,
Expand Down
6 changes: 4 additions & 2 deletions spec/RedisPubSub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ describe('RedisPubSub', function () {
});

const redis = require('redis');
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', {
expect(redis.createClient).toHaveBeenCalledWith({
url: 'redisAddress',
socket_keepalive: true,
no_ready_check: true,
});
Expand All @@ -28,7 +29,8 @@ describe('RedisPubSub', function () {
});

const redis = require('redis');
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', {
expect(redis.createClient).toHaveBeenCalledWith({
url: 'redisAddress',
socket_keepalive: true,
no_ready_check: true,
});
Expand Down
24 changes: 13 additions & 11 deletions spec/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,19 @@ const reconfigureServer = (changedConfiguration = {}) => {
port,
});
cache.clear();
parseServer = ParseServer.start(newConfiguration);
parseServer.expressApp.use('/1', err => {
console.error(err);
fail('should not call next');
});
server = parseServer.server;
server.on('connection', connection => {
const key = `${connection.remoteAddress}:${connection.remotePort}`;
openConnections[key] = connection;
connection.on('close', () => {
delete openConnections[key];
ParseServer.start(newConfiguration).then(_parseServer => {
parseServer = _parseServer;
parseServer.expressApp.use('/1', err => {
console.error(err);
fail('should not call next');
});
server = parseServer.server;
server.on('connection', connection => {
const key = `${connection.remoteAddress}:${connection.remotePort}`;
openConnections[key] = connection;
connection.on('close', () => {
delete openConnections[key];
});
});
});
} catch (error) {
Expand Down
4 changes: 2 additions & 2 deletions src/Adapters/PubSub/RedisPubSub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { createClient } from 'redis';

function createPublisher({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return createClient(redisURL, redisOptions);
return createClient({ url: redisURL, ...redisOptions });
}

function createSubscriber({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return createClient(redisURL, redisOptions);
return createClient({ url: redisURL, ...redisOptions });
}

const RedisPubSub = {
Expand Down
4 changes: 4 additions & 0 deletions src/Controllers/LiveQueryController.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ export class LiveQueryController {
this.liveQueryPublisher = new ParseCloudCodePublisher(config);
}

connect() {
return this.liveQueryPublisher.connect();
}

onAfterSave(
className: string,
currentObject: any,
Expand Down
9 changes: 9 additions & 0 deletions src/LiveQuery/ParseCloudCodePublisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ class ParseCloudCodePublisher {
this.parsePublisher = ParsePubSub.createPublisher(config);
}

async connect() {
if (typeof this.parsePublisher.connect === 'function') {
if (this.parsePublisher.isOpen) {
return;
}
return Promise.resolve(this.parsePublisher.connect());
}
}

onCloudCodeAfterSave(request: any): void {
this._onCloudCodeMessage(Parse.applicationId + 'afterSave', request);
}
Expand Down
33 changes: 24 additions & 9 deletions src/LiveQuery/ParseLiveQueryServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,25 @@ class ParseLiveQueryServer {
parseWebsocket => this._onConnect(parseWebsocket),
config
);

// Initialize subscriber
this.subscriber = ParsePubSub.createSubscriber(config);
this.subscriber.subscribe(Parse.applicationId + 'afterSave');
this.subscriber.subscribe(Parse.applicationId + 'afterDelete');
this.subscriber.subscribe(Parse.applicationId + 'clearCache');
// Register message handler for subscriber. When publisher get messages, it will publish message
// to the subscribers and the handler will be called.
this.subscriber.on('message', (channel, messageStr) => {
if (!this.subscriber.connect) {
this.connect();
}
}

async connect() {
if (this.subscriber.isOpen) {
return;
}
if (typeof this.subscriber.connect === 'function') {
await Promise.resolve(this.subscriber.connect());
} else {
this.subscriber.isOpen = true;
}
this._createSubscribers();
}
_createSubscribers() {
const messageRecieved = (channel, messageStr) => {
logger.verbose('Subscribe message %j', messageStr);
let message;
try {
Expand All @@ -102,7 +112,12 @@ class ParseLiveQueryServer {
} else {
logger.error('Get message %s from unknown channel %j', message, channel);
}
});
};
this.subscriber.on('message', (channel, messageStr) => messageRecieved(channel, messageStr));
for (const field of ['afterSave', 'afterDelete', 'clearCache']) {
const channel = `${Parse.applicationId}${field}`;
this.subscriber.subscribe(channel, messageStr => messageRecieved(channel, messageStr));
}
}

// Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes.
Expand Down
20 changes: 14 additions & 6 deletions src/ParseServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ class ParseServer {

const allControllers = controllers.getControllers(options);

const { loggerController, databaseController, hooksController } = allControllers;
const {
loggerController,
databaseController,
hooksController,
liveQueryController,
} = allControllers;
this.config = Config.put(Object.assign({}, options, allControllers));

logging.setLogger(loggerController);
Expand All @@ -98,6 +103,7 @@ class ParseServer {
) {
startupPromises.push(options.cacheAdapter.connect());
}
startupPromises.push(liveQueryController.connect());
await Promise.all(startupPromises);
if (serverStartComplete) {
serverStartComplete();
Expand Down Expand Up @@ -263,7 +269,7 @@ class ParseServer {
* @param {Function} callback called when the server has started
* @returns {ParseServer} the parse server instance
*/
start(options: ParseServerOptions, callback: ?() => void) {
async start(options: ParseServerOptions, callback: ?() => void) {
const app = express();
if (options.middleware) {
let middleware;
Expand Down Expand Up @@ -307,7 +313,7 @@ class ParseServer {
this.server = server;

if (options.startLiveQueryServer || options.liveQueryServerOptions) {
this.liveQueryServer = ParseServer.createLiveQueryServer(
this.liveQueryServer = await ParseServer.createLiveQueryServer(
server,
options.liveQueryServerOptions,
options
Expand Down Expand Up @@ -341,9 +347,9 @@ class ParseServer {
* @param {Server} httpServer an optional http server to pass
* @param {LiveQueryServerOptions} config options for the liveQueryServer
* @param {ParseServerOptions} options options for the ParseServer
* @returns {ParseLiveQueryServer} the live query server instance
* @returns {Promise<ParseLiveQueryServer>} the live query server instance
*/
static createLiveQueryServer(
static async createLiveQueryServer(
httpServer,
config: LiveQueryServerOptions,
options: ParseServerOptions
Expand All @@ -353,7 +359,9 @@ class ParseServer {
httpServer = require('http').createServer(app);
httpServer.listen(config.port);
}
return new ParseLiveQueryServer(httpServer, config, options);
const server = new ParseLiveQueryServer(httpServer, config, options);
await server.connect();
return server;
}

static verifyServerUrl(callback) {
Expand Down

0 comments on commit be248c0

Please sign in to comment.