Skip to content

feat: Upgrade Redis 3 to 4 for LiveQuery #8333

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 26, 2022
37 changes: 37 additions & 0 deletions spec/ParseLiveQueryRedis.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
if (process.env.PARSE_SERVER_TEST_CACHE === 'redis') {
describe('ParseLiveQuery redis', () => {
afterEach(async () => {
const client = await Parse.CoreManager.getLiveQueryController().getDefaultLiveQueryClient();
client.close();
});
it('can connect', async () => {
await reconfigureServer({
startLiveQueryServer: true,
liveQuery: {
classNames: ['TestObject'],
redisURL: 'redis://localhost:6379',
},
liveQueryServerOptions: {
redisURL: 'redis://localhost:6379',
},
});
const subscription = await new Parse.Query('TestObject').subscribe();
const [, object] = await Promise.all([
new Promise(resolve =>
subscription.on('create', () => {
resolve();
})
),
new Parse.Object('TestObject').save(),
]);
await Promise.all([
new Promise(resolve =>
subscription.on('delete', () => {
resolve();
})
),
object.destroy(),
]);
});
});
}
6 changes: 4 additions & 2 deletions spec/RedisPubSub.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ describe('RedisPubSub', function () {
});

const redis = require('redis');
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', {
expect(redis.createClient).toHaveBeenCalledWith({
url: 'redisAddress',
socket_keepalive: true,
no_ready_check: true,
});
Expand All @@ -28,7 +29,8 @@ describe('RedisPubSub', function () {
});

const redis = require('redis');
expect(redis.createClient).toHaveBeenCalledWith('redisAddress', {
expect(redis.createClient).toHaveBeenCalledWith({
url: 'redisAddress',
socket_keepalive: true,
no_ready_check: true,
});
Expand Down
4 changes: 2 additions & 2 deletions src/Adapters/PubSub/RedisPubSub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { createClient } from 'redis';

function createPublisher({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return createClient(redisURL, redisOptions);
return createClient({ url: redisURL, ...redisOptions });
}

function createSubscriber({ redisURL, redisOptions = {} }): any {
redisOptions.no_ready_check = true;
return createClient(redisURL, redisOptions);
return createClient({ url: redisURL, ...redisOptions });
}

const RedisPubSub = {
Expand Down
7 changes: 6 additions & 1 deletion src/LiveQuery/ParseCloudCodePublisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ class ParseCloudCodePublisher {
// config object of the publisher, right now it only contains the redisURL,
// but we may extend it later.
constructor(config: any = {}) {
this.parsePublisher = ParsePubSub.createPublisher(config);
(async () => {
this.parsePublisher = ParsePubSub.createPublisher(config);
if (typeof this.parsePublisher.connect === 'function') {
await Promise.resolve(this.parsePublisher.connect());
}
})();
}

onCloudCodeAfterSave(request: any): void {
Expand Down
57 changes: 31 additions & 26 deletions src/LiveQuery/ParseLiveQueryServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,34 +75,39 @@ class ParseLiveQueryServer {
);

// Initialize subscriber
this.subscriber = ParsePubSub.createSubscriber(config);
this.subscriber.subscribe(Parse.applicationId + 'afterSave');
this.subscriber.subscribe(Parse.applicationId + 'afterDelete');
this.subscriber.subscribe(Parse.applicationId + 'clearCache');
// Register message handler for subscriber. When publisher get messages, it will publish message
// to the subscribers and the handler will be called.
this.subscriber.on('message', (channel, messageStr) => {
logger.verbose('Subscribe message %j', messageStr);
let message;
try {
message = JSON.parse(messageStr);
} catch (e) {
logger.error('unable to parse message', messageStr, e);
return;
}
if (channel === Parse.applicationId + 'clearCache') {
this._clearCachedRoles(message.userId);
return;
(async () => {
this.subscriber = ParsePubSub.createSubscriber(config);
if (typeof this.subscriber.connect === 'function') {
await Promise.resolve(this.subscriber.connect());
}
this._inflateParseObject(message);
if (channel === Parse.applicationId + 'afterSave') {
this._onAfterSave(message);
} else if (channel === Parse.applicationId + 'afterDelete') {
this._onAfterDelete(message);
} else {
logger.error('Get message %s from unknown channel %j', message, channel);
const messageRecieved = (channel, messageStr) => {
logger.verbose('Subscribe message %j', messageStr);
let message;
try {
message = JSON.parse(messageStr);
} catch (e) {
logger.error('unable to parse message', messageStr, e);
return;
}
if (channel === Parse.applicationId + 'clearCache') {
this._clearCachedRoles(message.userId);
return;
}
this._inflateParseObject(message);
if (channel === Parse.applicationId + 'afterSave') {
this._onAfterSave(message);
} else if (channel === Parse.applicationId + 'afterDelete') {
this._onAfterDelete(message);
} else {
logger.error('Get message %s from unknown channel %j', message, channel);
}
};
this.subscriber.on('message', (channel, messageStr) => messageRecieved(channel, messageStr));
for (const field of ['afterSave', 'afterDelete', 'clearCache']) {
const channel = `${Parse.applicationId}${field}`;
this.subscriber.subscribe(channel, messageStr => messageRecieved(channel, messageStr));
}
});
})();
}

// Message is the JSON object from publisher. Message.currentParseObject is the ParseObject JSON after changes.
Expand Down