Skip to content

Commit

Permalink
Merge 2c2fafb into 9af9115
Browse files Browse the repository at this point in the history
  • Loading branch information
dblythy authored Nov 10, 2022
2 parents 9af9115 + 2c2fafb commit eb8d9e6
Show file tree
Hide file tree
Showing 7 changed files with 444 additions and 399 deletions.
648 changes: 346 additions & 302 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"pg-monitor": "1.5.0",
"pg-promise": "10.12.1",
"pluralize": "8.0.0",
"redis": "3.1.2",
"redis": "4.0.6",
"semver": "7.3.8",
"subscriptions-transport-ws": "0.11.0",
"tv4": "1.3.0",
Expand Down
29 changes: 29 additions & 0 deletions spec/DefinedSchemas.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -677,4 +677,33 @@ describe('DefinedSchemas', () => {
expect(testSchema.classLevelPermissions.create).toEqual({ requiresAuthentication: true });
expect(logger.error).toHaveBeenCalledTimes(0);
});

it('should not affect cacheAdapter', async () => {
const server = await reconfigureServer();
const logger = require('../lib/logger').logger;
spyOn(logger, 'error').and.callThrough();
const migrationOptions = {
definitions: [
{
className: 'Test',
fields: { aField: { type: 'String' } },
indexes: { aField: { aField: 1 } },
classLevelPermissions: {
create: { requiresAuthentication: true },
},
},
],
};

const cacheAdapter = {
get: () => Promise.resolve(null),
put: () => {},
del: () => {},
clear: () => {},
connect: jasmine.createSpy('clear'),
};
server.config.cacheAdapter = cacheAdapter;
await new DefinedSchemas(migrationOptions, server.config).execute();
expect(cacheAdapter.connect).not.toHaveBeenCalled();
});
});
29 changes: 15 additions & 14 deletions spec/RedisCacheAdapter.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,22 @@ describe_only(() => {

beforeEach(async () => {
cache = new RedisCacheAdapter(null, 100);
await cache.connect();
await cache.clear();
});

it('should get/set/clear', done => {
it('should get/set/clear', async () => {
const cacheNaN = new RedisCacheAdapter({
ttl: NaN,
});

cacheNaN
.put(KEY, VALUE)
.then(() => cacheNaN.get(KEY))
.then(value => expect(value).toEqual(VALUE))
.then(() => cacheNaN.clear())
.then(() => cacheNaN.get(KEY))
.then(value => expect(value).toEqual(null))
.then(() => cacheNaN.clear())
.then(done);
await cacheNaN.connect();
await cacheNaN.put(KEY, VALUE);
let value = await cacheNaN.get(KEY);
expect(value).toEqual(VALUE);
await cacheNaN.clear();
value = await cacheNaN.get(KEY);
expect(value).toEqual(null);
await cacheNaN.clear();
});

it('should expire after ttl', done => {
Expand Down Expand Up @@ -100,7 +99,7 @@ describe_only(() => {
it('handleShutdown, close connection', async () => {
await cache.handleShutdown();
setTimeout(() => {
expect(cache.client.connected).toBe(false);
expect(cache.client.isOpen).toBe(false);
}, 0);
});
});
Expand All @@ -122,8 +121,9 @@ describe_only(() => {
return Object.keys(cache.queue.queue).length;
}

it('it should clear completed operations from queue', done => {
it('it should clear completed operations from queue', async done => {
const cache = new RedisCacheAdapter({ ttl: NaN });
await cache.connect();

// execute a bunch of operations in sequence
let promise = Promise.resolve();
Expand All @@ -144,8 +144,9 @@ describe_only(() => {
promise.then(() => expect(getQueueCount(cache)).toEqual(0)).then(done);
});

it('it should count per key chained operations correctly', done => {
it('it should count per key chained operations correctly', async done => {
const cache = new RedisCacheAdapter({ ttl: NaN });
await cache.connect();

let key1Promise = Promise.resolve();
let key2Promise = Promise.resolve();
Expand Down
118 changes: 40 additions & 78 deletions src/Adapters/Cache/RedisCacheAdapter.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import redis from 'redis';
import { createClient } from 'redis';
import logger from '../../logger';
import { KeyPromiseQueue } from '../../KeyPromiseQueue';

Expand All @@ -15,114 +15,76 @@ const isValidTTL = ttl => typeof ttl === 'number' && ttl > 0;
export class RedisCacheAdapter {
constructor(redisCtx, ttl = DEFAULT_REDIS_TTL) {
this.ttl = isValidTTL(ttl) ? ttl : DEFAULT_REDIS_TTL;
this.client = redis.createClient(redisCtx);
this.client = createClient(redisCtx);
this.queue = new KeyPromiseQueue();
}

handleShutdown() {
async connect() {
if (this.client.isOpen) {
return;
}
return this.client.connect();
}

async handleShutdown() {
if (!this.client) {
return Promise.resolve();
return;
}
try {
await this.client.quit();
} catch (err) {
logger.error('RedisCacheAdapter error on shutdown', { error: err });
}
return new Promise(resolve => {
this.client.quit(err => {
if (err) {
logger.error('RedisCacheAdapter error on shutdown', { error: err });
}
resolve();
});
});
}

get(key) {
async get(key) {
debug('get', { key });
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.get(key, function (err, res) {
debug('-> get', { key, res });
if (!res) {
return resolve(null);
}
resolve(JSON.parse(res));
});
})
);
try {
await this.queue.enqueue(key);
const res = await this.client.get(key);
if (!res) {
return null;
}
return JSON.parse(res);
} catch (err) {
logger.error('RedisCacheAdapter error on get', { error: err });
}
}

put(key, value, ttl = this.ttl) {
async put(key, value, ttl = this.ttl) {
value = JSON.stringify(value);
debug('put', { key, value, ttl });

await this.queue.enqueue(key);
if (ttl === 0) {
// ttl of zero is a logical no-op, but redis cannot set expire time of zero
return this.queue.enqueue(key, () => Promise.resolve());
return;
}

if (ttl === Infinity) {
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.set(key, value, function () {
resolve();
});
})
);
return this.client.set(key, value);
}

if (!isValidTTL(ttl)) {
ttl = this.ttl;
}

return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.psetex(key, ttl, value, function () {
resolve();
});
})
);
return this.client.set(key, value, { PX: ttl });
}

del(key) {
async del(key) {
debug('del', { key });
return this.queue.enqueue(
key,
() =>
new Promise(resolve => {
this.client.del(key, function () {
resolve();
});
})
);
await this.queue.enqueue(key);
return this.client.del(key);
}

clear() {
async clear() {
debug('clear');
return this.queue.enqueue(
FLUSH_DB_KEY,
() =>
new Promise(resolve => {
this.client.flushdb(function () {
resolve();
});
})
);
await this.queue.enqueue(FLUSH_DB_KEY);
return this.client.sendCommand(['FLUSHDB']);
}

// Used for testing
async getAllKeys() {
return new Promise((resolve, reject) => {
this.client.keys('*', (err, keys) => {
if (err) {
reject(err);
} else {
resolve(keys);
}
});
});
getAllKeys() {
return this.client.keys('*');
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/Adapters/PubSub/RedisPubSub.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import redis from 'redis';
import { createClient } from 'redis';

function createPublisher({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return redis.createClient(redisURL, redisOptions);
return createClient(redisURL, redisOptions);
}

function createSubscriber({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return redis.createClient(redisURL, redisOptions);
return createClient(redisURL, redisOptions);
}

const RedisPubSub = {
Expand Down
11 changes: 10 additions & 1 deletion src/ParseServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,18 @@ class ParseServer {
.performInitialization()
.then(() => hooksController.load())
.then(async () => {
const startupPromises = [];
if (schema) {
await new DefinedSchemas(schema, this.config).execute();
startupPromises.push(new DefinedSchemas(schema, this.config).execute());
}
if (
options.cacheAdapter &&
options.cacheAdapter.connect &&
typeof options.cacheAdapter.connect === 'function'
) {
startupPromises.push(options.cacheAdapter.connect());
}
await Promise.all(startupPromises);
if (serverStartComplete) {
serverStartComplete();
}
Expand Down

0 comments on commit eb8d9e6

Please sign in to comment.