Skip to content

Commit

Permalink
🪲 FIX(NOTION-115): Re-subscribe when internet connection is lost and …
Browse files Browse the repository at this point in the history
…regained #33
  • Loading branch information
alexcastillo committed Feb 6, 2020
1 parent 4a87041 commit 43a8462
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 65 deletions.
40 changes: 20 additions & 20 deletions examples/node/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,27 @@ const notion2 = new Notion({
password: process.env.NEUROSITY_PASSWORD
});

const signalQuality = notion
.signalQuality()
.subscribe(signalQuality => {
console.log("signalQuality", signalQuality);
});

const signalQuality2 = notion
.signalQuality()
.subscribe(signalQuality => {
console.log("signalQuality", signalQuality);
});

const kinesis = notion2.kinesis("push").subscribe(kinesis => {
console.log("kinesis", kinesis);
});
const sub1 = notion.signalQuality().subscribe();
const sub2 = notion.calm().subscribe();
const sub3 = notion.focus().subscribe();
const sub4 = notion2.focus().subscribe();

console.log("subscribed to signalQuality");
console.log("subscribed to kinesis");
setTimeout(() => {
sub2.unsubscribe();
}, 2000);

setTimeout(() => {
signalQuality.unsubscribe();
console.log("unsubscribed from signalQuality");
}, 4000);
notion.goOffline();
setTimeout(() => {
sub1.unsubscribe();
}, 100);
}, 6000);

setTimeout(() => {
notion.goOnline();

setTimeout(() => {
sub3.unsubscribe();
}, 6000);
}, 10000);
})();
23 changes: 21 additions & 2 deletions src/Notion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { map } from "rxjs/operators";
import { ApiClient, credentialWithLink, createUser } from "./api/index";
import { getLabels, validate } from "./utils/subscription";
import { NotionOptions } from "./types/options";
import { Subscription } from "./types/subscription";
import { Training } from "./types/training";
import { SkillInstance } from "./types/skill";
import { Credentials } from "./types/credentials";
Expand All @@ -13,6 +12,10 @@ import { SignalQuality } from "./types/signalQuality";
import { Kinesis } from "./types/kinesis";
import { Calm } from "./types/calm";
import { Focus } from "./types/focus";
import {
PendingSubscription,
Subscription
} from "./types/subscriptions";
import {
BrainwavesLabel,
Epoch,
Expand Down Expand Up @@ -146,7 +149,7 @@ export class Notion {
* Not user facing
*/
protected getMetric = (
subscription: Subscription
subscription: PendingSubscription
): Observable<any> => {
const { metric, labels, atomic } = subscription;

Expand Down Expand Up @@ -551,6 +554,22 @@ export class Notion {
};
}

/**
* @internal
* Proof of Concept for disconnecting db
*/
public goOffline(): void {
this.api.goOffline();
}

/**
* @internal
* Proof of Concept for resuming db connection
*/
public goOnline(): void {
this.api.goOnline();
}

/**
* @internal
* Proof of Concept for Skills - Not user facing yet
Expand Down
58 changes: 38 additions & 20 deletions src/api/firebase/deviceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,16 @@ export interface IDevice {
/**
* @hidden
*/
export const createDeviceStore = (app, deviceId, SERVER_TIMESTAMP) => {
export const createDeviceStore = (
app,
deviceId,
SERVER_TIMESTAMP,
subscriptionManager
) => {
const deviceRef = app.database().ref(`devices/${deviceId}`);
const clientId = deviceRef.child("subscriptions").push().key;
const clientRef = deviceRef.child(`clients/${clientId}`);

// Add client connections to db and remove them when offline
app
.database()
.ref(".info/connected")
.on("value", snapshot => {
if (!snapshot.val()) {
return;
}

clientRef
.onDisconnect()
.remove()
.then(() => {
clientRef.set(SERVER_TIMESTAMP);
});
});

const child = namespace => {
return deviceRef.child(namespace);
};
Expand All @@ -43,7 +31,7 @@ export const createDeviceStore = (app, deviceId, SERVER_TIMESTAMP) => {
};

const update = (namespace, payload) => {
deviceRef.child(namespace).update(payload);
return deviceRef.child(namespace).update(payload);
};

const on = (eventType: any = "value", namespace, callback) => {
Expand Down Expand Up @@ -96,6 +84,36 @@ export const createDeviceStore = (app, deviceId, SERVER_TIMESTAMP) => {
return match || null;
};

// Add client connections and subscriptions to db and remove them when offline
app
.database()
.ref(".info/connected")
.on("value", snapshot => {
if (!snapshot.val()) {
return;
}

clientRef
.onDisconnect()
.remove()
.then(() => {
clientRef.set(SERVER_TIMESTAMP);

// NOTION-115: Re-subscribe when internet connection is lost and regained
update("subscriptions", subscriptionManager.get()).then(
() => {
subscriptionManager.toList().forEach(subscription => {
const childPath = `subscriptions/${subscription.id}`;
deviceRef
.child(childPath)
.onDisconnect()
.remove();
});
}
);
});
});

return {
set,
once,
Expand Down Expand Up @@ -173,7 +191,7 @@ export const createDeviceStore = (app, deviceId, SERVER_TIMESTAMP) => {

return subscriptionCreated;
},
unsubscribFromMetric: (subscription, listener: Function) => {
unsubscribeFromMetric: (subscription, listener: Function) => {
off("value", listener);
remove(`subscriptions/${subscription.id}`);
}
Expand Down
29 changes: 22 additions & 7 deletions src/api/firebase/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { User } from "@firebase/auth-types";

import { config } from "./config";
import { createDeviceStore } from "./deviceStore";
import { NotionOptions } from "../../types/options";
import { NotionOptions, NotionDependencies } from "../../types/options";
import { Credentials } from "../../types/credentials";

const SERVER_TIMESTAMP = firebase.database.ServerValue.TIMESTAMP;
Expand Down Expand Up @@ -36,17 +36,24 @@ export class FirebaseClient {
protected deviceStore;
public user: User | null;

constructor(options: NotionOptions) {
this.init(options);
constructor(
options: NotionOptions,
dependencies: NotionDependencies
) {
this.init(options, dependencies);
}

private init(options: NotionOptions) {
private init(
options: NotionOptions,
dependencies: NotionDependencies
) {
this.app = this.getApp(options.deviceId);
this.standalone = this.app.name === options.deviceId;
this.deviceStore = createDeviceStore(
this.app,
options.deviceId,
SERVER_TIMESTAMP
SERVER_TIMESTAMP,
dependencies.subscriptionManager
);

this.app.auth().onAuthStateChanged((user: User | null) => {
Expand Down Expand Up @@ -105,6 +112,14 @@ export class FirebaseClient {
return this.app.auth().signOut();
}

goOffline() {
this.app.database().goOffline();
}

goOnline() {
this.app.database().goOnline();
}

private getApp(deviceId: string) {
const notionAppName = `${deviceId}-${Date.now()}`;
const moduleApps = firebase.apps;
Expand Down Expand Up @@ -212,8 +227,8 @@ export class FirebaseClient {
* @param metric
* @param subscriptionId
*/
public unsubscribFromMetric(subscription, listener: Function): void {
this.deviceStore.unsubscribFromMetric(subscription, listener);
public unsubscribeFromMetric(subscription, listener: Function): void {
this.deviceStore.unsubscribeFromMetric(subscription, listener);
}

public get timestamp(): any {
Expand Down
20 changes: 18 additions & 2 deletions src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { metrics } from "@neurosity/ipk";
import { FirebaseClient } from "./firebase/index";
import { WebsocketClient } from "./websocket";
import { Timesync } from "../timesync";
import { SubscriptionManager } from "../subscriptions/SubscriptionManager";
import { Client } from "../types/client";
import { Actions } from "../types/actions";
import { Metrics } from "../types/metrics";
Expand All @@ -24,10 +25,14 @@ export class ApiClient implements Client {
protected firebase: FirebaseClient;
protected onDeviceSocket: WebsocketClient;
protected timesync: Timesync;
protected subscriptionManager: SubscriptionManager;

constructor(options: NotionOptions) {
this.options = options;
this.firebase = new FirebaseClient(options);
this.subscriptionManager = new SubscriptionManager();
this.firebase = new FirebaseClient(options, {
subscriptionManager: this.subscriptionManager
});

this.firebase.onAuthStateChanged().subscribe(user => {
this.user = user;
Expand Down Expand Up @@ -116,10 +121,13 @@ export class ApiClient implements Client {
...subscription,
serverType
});

this.subscriptionManager.add(subscriptionCreated);
return subscriptionCreated;
},
unsubscribe: (subscription, listener): void => {
this.firebase.unsubscribFromMetric(subscription, listener);
this.subscriptionManager.remove(subscription);
this.firebase.unsubscribeFromMetric(subscription, listener);
}
};
}
Expand All @@ -143,4 +151,12 @@ export class ApiClient implements Client {
public changeSettings(settings: ChangeSettings): Promise<void> {
return this.firebase.changeSettings(settings);
}

public goOffline() {
this.firebase.goOffline();
}

public goOnline() {
this.firebase.goOnline();
}
}
25 changes: 25 additions & 0 deletions src/subscriptions/SubscriptionManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Subscription, Subscriptions } from "../types/subscriptions";

export class SubscriptionManager {
private _subscriptions: Subscriptions = {};

public get(): Subscriptions {
return this._subscriptions;
}

public toList(): Subscription[] {
return Object.values(this._subscriptions);
}

public add(subscription: Subscription): void {
this._subscriptions[subscription.id] = subscription;
}

public remove(subscription: Subscription): void {
if (!(subscription.id in this._subscriptions)) {
return;
}

Reflect.deleteProperty(this._subscriptions, subscription.id);
}
}
6 changes: 3 additions & 3 deletions src/types/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Subscription } from "./subscription";
import { PendingSubscription, Subscription } from "./subscriptions";

/**
* @hidden
Expand All @@ -11,10 +11,10 @@ type SubscriptionListener = Function;
export interface Metrics {
next(metricName: string, metricValue: { [label: string]: any }): void;
on(
subscription: Subscription,
subscription: PendingSubscription,
callback: Function
): SubscriptionListener;
subscribe(subscription: Subscription): Subscription;
subscribe(subscription: PendingSubscription): Subscription;
unsubscribe(
subscription: Subscription,
listener: SubscriptionListener
Expand Down
8 changes: 8 additions & 0 deletions src/types/options.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Skill } from "./skill";
import { SubscriptionManager } from "../subscriptions/SubscriptionManager";

export interface NotionOptions {
deviceId: string;
Expand All @@ -12,3 +13,10 @@ export interface NotionOptions {
*/
onDeviceSocketUrl?: string;
}

/**
* @hidden
*/
export interface NotionDependencies {
subscriptionManager: SubscriptionManager;
}
11 changes: 0 additions & 11 deletions src/types/subscription.ts

This file was deleted.

Loading

0 comments on commit 43a8462

Please sign in to comment.