Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Upgrade Redis 3 to 4 for LiveQuery #8333

Merged
merged 13 commits into from
Nov 26, 2022
2 changes: 1 addition & 1 deletion spec/ParseGraphQLServer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ describe('ParseGraphQLServer', () => {
const expressApp = express();
httpServer = http.createServer(expressApp);
expressApp.use('/parse', parseServer.app);
parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {
parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {
port: 1338,
});
parseGraphQLServer.applyGraphQL(expressApp);
Expand Down
56 changes: 56 additions & 0 deletions spec/ParseLiveQueryRedis.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') {
describe('ParseLiveQuery redis', () => {
afterEach(async () => {
const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient();
client.close();
});
it('can connect', async () => {
await reconfigureServer({
startLiveQueryServer: true,
liveQuery: {
classNames: ['TestObject'],
redisURL: 'redis://localhost:6379',
},
liveQueryServerOptions: {
redisURL: 'redis://localhost:6379',
},
});
const subscription = await new Parse.Query('TestObject').subscribe();
const [, object] = await Promise.all([
dblythy marked this conversation as resolved.
Show resolved Hide resolved
new Promise(resolve =>
subscription.on('create', () => {
resolve();
})
),
new Parse.Object('TestObject').save(),
]);
await Promise.all([
new Promise(resolve =>
subscription.on('delete', () => {
resolve();
})
),
object.destroy(),
]);
});

it('can call connect twice', async () => {
const server = await reconfigureServer({
startLiveQueryServer: true,
liveQuery: {
classNames: ['TestObject'],
redisURL: 'redis://localhost:6379',
},
liveQueryServerOptions: {
redisURL: 'redis://localhost:6379',
},
});
expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue();
await server.config.liveQueryController.connect();
expect(server.config.liveQueryController.liveQueryPublisher.parsePublisher.isOpen).toBeTrue();
expect(server.liveQueryServer.subscriber.isOpen).toBe(true);
await server.liveQueryServer.connect();
expect(server.liveQueryServer.subscriber.isOpen).toBe(true);
});
});
}
18 changes: 9 additions & 9 deletions spec/ParseLiveQueryServer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,29 +94,29 @@ describe('ParseLiveQueryServer', function () {
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
});

it('can be initialized from ParseServer', function () {
it('can be initialized from ParseServer', async () => {
const httpServer = {};
const parseLiveQueryServer = ParseServer.createLiveQueryServer(httpServer, {});
const parseLiveQueryServer = await ParseServer.createLiveQueryServer(httpServer, {});

expect(parseLiveQueryServer.clientId).toBeUndefined();
expect(parseLiveQueryServer.clients.size).toBe(0);
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
});

it('can be initialized from ParseServer without httpServer', function (done) {
const parseLiveQueryServer = ParseServer.createLiveQueryServer(undefined, {
it('can be initialized from ParseServer without httpServer', async () => {
const parseLiveQueryServer = await ParseServer.createLiveQueryServer(undefined, {
port: 22345,
});

expect(parseLiveQueryServer.clientId).toBeUndefined();
expect(parseLiveQueryServer.clients.size).toBe(0);
expect(parseLiveQueryServer.subscriptions.size).toBe(0);
parseLiveQueryServer.server.close(done);
await new Promise(resolve => parseLiveQueryServer.server.close(resolve));
});

describe_only_db('mongo')('initialization', () => {
it('can be initialized through ParseServer without liveQueryServerOptions', function (done) {
const parseServer = ParseServer.start({
it('can be initialized through ParseServer without liveQueryServerOptions', async function (done) {
const parseServer = await ParseServer.start({
appId: 'hello',
masterKey: 'world',
port: 22345,
Expand All @@ -137,8 +137,8 @@ describe('ParseLiveQueryServer', function () {
});
});

it('can be initialized through ParseServer with liveQueryServerOptions', function (done) {
const parseServer = ParseServer.start({
it('can be initialized through ParseServer with liveQueryServerOptions', async function (done) {
const parseServer = await ParseServer.start({
appId: 'hello',
masterKey: 'world',
port: 22346,
Expand Down
6 changes: 4 additions & 2 deletions spec/RedisPubSub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ describe('RedisPubSub', function () {
});

const redis = require('redis');
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', {
expect(redis.createClient).toHaveBeenCalledWith({
url: 'redisAddress',
socket_keepalive: true,
no_ready_check: true,
});
Expand All @@ -28,7 +29,8 @@ describe('RedisPubSub', function () {
});

const redis = require('redis');
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', {
expect(redis.createClient).toHaveBeenCalledWith({
url: 'redisAddress',
socket_keepalive: true,
no_ready_check: true,
});
Expand Down
24 changes: 13 additions & 11 deletions spec/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,19 @@ const reconfigureServer = (changedConfiguration = {}) => {
port,
});
cache.clear();
parseServer = ParseServer.start(newConfiguration);
parseServer.expressApp.use('/1', err => {
console.error(err);
fail('should not call next');
});
server = parseServer.server;
server.on('connection', connection => {
const key = `${connection.remoteAddress}:${connection.remotePort}`;
openConnections[key] = connection;
connection.on('close', () => {
delete openConnections[key];
ParseServer.start(newConfiguration).then(_parseServer => {
parseServer = _parseServer;
parseServer.expressApp.use('/1', err => {
console.error(err);
fail('should not call next');
});
server = parseServer.server;
server.on('connection', connection => {
const key = `${connection.remoteAddress}:${connection.remotePort}`;
openConnections[key] = connection;
connection.on('close', () => {
delete openConnections[key];
});
});
});
} catch (error) {
Expand Down
4 changes: 2 additions & 2 deletions src/Adapters/PubSub/RedisPubSub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { createClient } from 'redis';

function createPublisher({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return createClient(redisURL, redisOptions);
return createClient({ url: redisURL, ...redisOptions });
}

function createSubscriber({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return createClient(redisURL, redisOptions);
return createClient({ url: redisURL, ...redisOptions });
}

const RedisPubSub = {
Expand Down
4 changes: 4 additions & 0 deletions src/Controllers/LiveQueryController.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ export class LiveQueryController {
this.liveQueryPublisher = new ParseCloudCodePublisher(config);
}

connect() {
return this.liveQueryPublisher.connect();
}

onAfterSave(
className: string,
currentObject: any,
Expand Down
9 changes: 9 additions & 0 deletions src/LiveQuery/ParseCloudCodePublisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ class ParseCloudCodePublisher {
this.parsePublisher = ParsePubSub.createPublisher(config);
}

async connect() {
if (typeof this.parsePublisher.connect === 'function') {
if (this.parsePublisher.isOpen) {
return;
}
return Promise.resolve(this.parsePublisher.connect());
}
}

onCloudCodeAfterSave(request: any): void {
this._onCloudCodeMessage(Parse.applicationId + 'afterSave', request);
}
Expand Down
33 changes: 24 additions & 9 deletions src/LiveQuery/ParseLiveQueryServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,25 @@ class ParseLiveQueryServer {
parseWebsocket => this._onConnect(parseWebsocket),
config
);

// Initialize subscriber
this.subscriber = ParsePubSub.createSubscriber(config);
this.subscriber.subscribe(Parse.applicationId + 'afterSave');
this.subscriber.subscribe(Parse.applicationId + 'afterDelete');
this.subscriber.subscribe(Parse.applicationId + 'clearCache');
// Register message handler for subscriber. When publisher get messages, it will publish message
// to the subscribers and the handler will be called.
this.subscriber.on('message', (channel, messageStr) => {
if (!this.subscriber.connect) {
this.connect();
}
}

async connect() {
if (this.subscriber.isOpen) {
return;
}
if (typeof this.subscriber.connect === 'function') {
await Promise.resolve(this.subscriber.connect());
} else {
this.subscriber.isOpen = true;
}
this._createSubscribers();
}
_createSubscribers() {
const messageRecieved = (channel, messageStr) => {
logger.verbose('Subscribe message %j', messageStr);
let message;
try {
Expand All @@ -102,7 +112,12 @@ class ParseLiveQueryServer {
} else {
logger.error('Get message %s from unknown channel %j', message, channel);
}
});
};
this.subscriber.on('message', (channel, messageStr) => messageRecieved(channel, messageStr));
for (const field of ['afterSave', 'afterDelete', 'clearCache']) {
const channel = `${Parse.applicationId}${field}`;
this.subscriber.subscribe(channel, messageStr => messageRecieved(channel, messageStr));
}
}

// Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes.
Expand Down
20 changes: 14 additions & 6 deletions src/ParseServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ class ParseServer {

const allControllers = controllers.getControllers(options);

const { loggerController, databaseController, hooksController } = allControllers;
const {
loggerController,
databaseController,
hooksController,
liveQueryController,
} = allControllers;
this.config = Config.put(Object.assign({}, options, allControllers));

logging.setLogger(loggerController);
Expand All @@ -98,6 +103,7 @@ class ParseServer {
) {
startupPromises.push(options.cacheAdapter.connect());
}
startupPromises.push(liveQueryController.connect());
await Promise.all(startupPromises);
if (serverStartComplete) {
serverStartComplete();
Expand Down Expand Up @@ -263,7 +269,7 @@ class ParseServer {
* @param {Function} callback called when the server has started
* @returns {ParseServer} the parse server instance
*/
start(options: ParseServerOptions, callback: ?() => void) {
async start(options: ParseServerOptions, callback: ?() => void) {
const app = express();
if (options.middleware) {
let middleware;
Expand Down Expand Up @@ -307,7 +313,7 @@ class ParseServer {
this.server = server;

if (options.startLiveQueryServer || options.liveQueryServerOptions) {
this.liveQueryServer = ParseServer.createLiveQueryServer(
this.liveQueryServer = await ParseServer.createLiveQueryServer(
server,
options.liveQueryServerOptions,
options
Expand Down Expand Up @@ -338,9 +344,9 @@ class ParseServer {
* @param {Server} httpServer an optional http server to pass
* @param {LiveQueryServerOptions} config options for the liveQueryServer
* @param {ParseServerOptions} options options for the ParseServer
* @returns {ParseLiveQueryServer} the live query server instance
* @returns {Promise<ParseLiveQueryServer>} the live query server instance
*/
static createLiveQueryServer(
static async createLiveQueryServer(
httpServer,
config: LiveQueryServerOptions,
options: ParseServerOptions
Expand All @@ -350,7 +356,9 @@ class ParseServer {
httpServer = require('http').createServer(app);
httpServer.listen(config.port);
}
return new ParseLiveQueryServer(httpServer, config, options);
const server = new ParseLiveQueryServer(httpServer, config, options);
await server.connect();
return server;
}

static verifyServerUrl(callback) {
Expand Down