Skip to content

Commit

Permalink
Merge pull request #29 from tago-io/feat/queue-plugin
Browse files Browse the repository at this point in the history
Queue plugin
  • Loading branch information
matheuslbenachio authored Sep 30, 2022
2 parents 8f872ac + a3b2d41 commit 0f8f14c
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 46 deletions.
2 changes: 1 addition & 1 deletion packages/tcore-api/src/Plugins/Module/Module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class Module {
const values = moduleSettings?.values || {};

const conf = this.setup.configs || [];
const defs = conf?.filter((x) => "defaultValue" in x && x.defaultValue);
const defs = conf?.filter((x) => "defaultValue" in x && x.defaultValue !== "");
const flat = flattenConfigFields(defs);
const defsObject = {};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ describe("addDeviceDataByDevice", () => {
const arg1 = mock.mock.calls[0][0];
const arg2 = mock.mock.calls[0][1];
expect(arg1).toEqual(activeDevice.id);
expect(arg2.updated_at).toBeInstanceOf(Date);
expect(arg2.last_input).toBeInstanceOf(Date);
});

Expand Down
138 changes: 102 additions & 36 deletions packages/tcore-api/src/Services/DeviceData/DeviceData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { editDevice, getDeviceInfo } from "../Device";
import { addStatistic } from "../Statistic";
import { runPayloadParser } from "../PayloadParserCodeExecution";
import { emitToLiveInspector, getLiveInspectorID } from "../LiveInspector";
import { triggerHooks } from "../Plugins";
import { getMainQueueModule, triggerHooks } from "../Plugins";

const LIMIT_DATA_ON_MUTABLE = 50_000;
const MAXIMUM_MONTH_RANGE = 1.1;
Expand Down Expand Up @@ -168,54 +168,41 @@ function isImmutableTimeOutOfRange(time: Date, period: IDeviceChunkPeriod, reten
}

/**
* Adds data into a device by an actual device object.
* @param {IDevice} device Device object who sent the data.
* @param {any} data Data to be inserted.
* Validates the amount of mutable data points.
* Throws an error if something is wrong.
*/
export const addDeviceDataByDevice = async (device: IDevice, data: any, options?: IDeviceAddDataOptions) => {
if (!device || !device.active) {
throw new Error("Device not found or inactive");
}
if (!options) {
options = {};
}
if (!options.liveInspectorID) {
options.liveInspectorID = getLiveInspectorID(device);
}

if (Array.isArray(data)) {
options.rawPayload = data;
} else if (typeof data === "object") {
options.rawPayload = [data];
} else {
options.rawPayload = data;
}

async function validateMutableDataAmount(device: IDevice) {
if (device.type === "mutable") {
const amount = await getDeviceDataAmount(device.id);
if (amount >= LIMIT_DATA_ON_MUTABLE) {
throw new Error(`The device has reached the limit of ${LIMIT_DATA_ON_MUTABLE} data registers`);
}
}
}

let items: any = data;
items = await applyPayloadEncoder(device, items, options);
items = await runPayloadParser(device, items, options);
items = await z.array(zDeviceDataCreate).parseAsync([items].flat());

/**
* Validates the time range for immutable data points.
* Throws an error if something is wrong.
*/
async function validateImmutableTimeRange(device: IDevice, items: any) {
for (const item of items) {
if (device.type === "immutable" && device.chunk_period) {
const outOfRage = isImmutableTimeOutOfRange(item.time, device.chunk_period, device.chunk_retention || 0);

if (!outOfRage.isOk) {
const title = `Time must be between ${outOfRage.startDate} and ${outOfRage.endDate}`;
await emitToLiveInspector(device, { title, content: item }, options.liveInspectorID);
throw new Error(title);
}
}
}
}

await emitToLiveInspector(device, { title: "Raw Payload", content: data }, options.liveInspectorID);
/**
* Apply the Zod formatting capability on the data items, and also makes sure that
* their format respects the latest tago format.
*/
async function applyZodDeviceData(items: any) {
items = await z.array(zDeviceDataCreate).parseAsync([items].flat());

const group = generateResourceID().split("").reverse().join("");
const now = new Date();
Expand All @@ -228,34 +215,113 @@ export const addDeviceDataByDevice = async (device: IDevice, data: any, options?
delete item.serie;
}

return items;
}

/**
* Formats the timestamp for the chunks of the data.
*/
async function formatChunkTimestamp(device: IDevice, data: any) {
// map the items to insert into database
const dbInsertItems = items.map((x: IDeviceDataCreate) => {
return data.map((x: IDeviceDataCreate) => {
const chunkTimestamp = getChunkTimestamp(x.time as Date, device);
return {
...x,
chunk_timestamp_start: chunkTimestamp?.startDate,
chunk_timestamp_end: chunkTimestamp?.endDate,
};
});
}

/**
* Adds data into a device by an actual device object.
* @param {IDevice} device Device object who sent the data.
* @param {any} data Data to be inserted.
*/
export const addDeviceDataByDevice = async (device: IDevice, data: any, options?: IDeviceAddDataOptions) => {
if (!device || !device.active) {
throw new Error("Device not found or inactive");
}
if (!options) {
options = {};
}
if (!options.liveInspectorID) {
options.liveInspectorID = getLiveInspectorID(device);
}

if (Array.isArray(data)) {
options.rawPayload = data;
} else if (typeof data === "object") {
options.rawPayload = [data];
} else {
options.rawPayload = data;
}

await validateMutableDataAmount(device);

let items: any = data;
items = await applyPayloadEncoder(device, items, options);
items = await runPayloadParser(device, items, options);
items = await z.array(zDeviceDataCreate).parseAsync([items].flat());

await validateImmutableTimeRange(device, items);

await emitToLiveInspector(device, { title: "Raw Payload", content: data }, options.liveInspectorID);

items = await applyZodDeviceData(items);
items = await formatChunkTimestamp(device, items);

return await addDataToQueue(device, items);
};

/**
* Adds the data point directly into the database.
*/
async function addDataToDatabase(device: IDevice, data: any) {
let items: any = await z.array(zDeviceDataCreate).parseAsync([data].flat());
await validateMutableDataAmount(device);
await validateImmutableTimeRange(device, items);
items = await applyZodDeviceData(items);
items = await formatChunkTimestamp(device, items);

await invokeDatabaseFunction("addDeviceData", device.id, device.type, dbInsertItems);
await invokeDatabaseFunction("addDeviceData", device.id, device.type, items);

triggerActions(device.id, items).catch(() => null);
await addStatistic({ input: items.length });
await editDevice(device.id, { updated_at: now, last_input: now });
await editDevice(device.id, { last_input: new Date() });

triggerHooks("onAfterInsertDeviceData", device.id, data);
triggerHooks("onAfterInsertDeviceData", device.id, items);

return `${items.length} items added`;
};
}

/**
* Adds the data point to the queue.
*/
async function addDataToQueue(device: IDevice, data: any) {
const queue = await getMainQueueModule();

if (!queue) {
return await addDataToDatabase(device, data);
}

await queue.invoke("onAddDeviceData", device.id, data);

return `${data.length} items added to queue`;
}

/**
* Adds data into a device by device ID.
* @param {TGenericID} deviceID ID of the device who sent the data.
* @param {any} data Data to be inserted.
*/
export async function addDeviceData(deviceID: TGenericID, data: any) {
export async function addDeviceData(deviceID: TGenericID, data: any, options?: { forceDBInsert: boolean }) {
const device = await getDeviceInfo(deviceID);

if (options?.forceDBInsert) {
return await addDataToDatabase(device, data);
}

return await addDeviceDataByDevice(device, data);
}

Expand Down
21 changes: 21 additions & 0 deletions packages/tcore-api/src/Services/Plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,27 @@ export async function getMainDatabaseModule(): Promise<Module | null> {
return null;
}

/**
* Finds and returns the main queue plugin.
*/
export async function getMainQueueModule(): Promise<Module | null> {
const settings = await getMainSettings();
const [pluginID, moduleID] = String(settings.queue_plugin).split(":");

if (pluginID && moduleID) {
const plugin = plugins.get(pluginID);
if (plugin) {
const module = plugin.modules.get(moduleID);

if (module?.state === "started") {
return module;
}
}
}

return null;
}

/**
* TODO
*/
Expand Down
2 changes: 2 additions & 0 deletions packages/tcore-api/src/Services/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export async function getMainSettings(): Promise<ISettings> {

const filesystem_plugin = process.env.TCORE_FILESYSTEM_PLUGIN || data.filesystem_plugin || "";
const database_plugin = process.env.TCORE_DATABASE_PLUGIN || data.database_plugin || "";
const queue_plugin = process.env.TCORE_QUEUE_PLUGIN || data.queue_plugin || "";
const settings_folder = process.env.TCORE_SETTINGS_FOLDER || folder;
const plugin_folder = process.env.TCORE_PLUGINS_FOLDER || data.plugin_folder || (await getPluginsFolder());
const port = process.env.TCORE_PORT || data.port || "8888";
Expand All @@ -84,6 +85,7 @@ export async function getMainSettings(): Promise<ISettings> {
const settings: ISettings = {
filesystem_plugin,
database_plugin,
queue_plugin,
plugin_folder,
port,
settings_folder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ function PluginEdit() {

const flattened = flattenConfigFields(module.configs || []);
for (const field of flattened) {
if ("defaultValue" in field && field.defaultValue) {
newValues[module.id][field.field] = field.defaultValue;
if ("defaultValue" in field) {
if (field.defaultValue === 0) {
newValues[module.id][field.field] = "0";
} else {
newValues[module.id][field.field] = field.defaultValue;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ interface IGeneralInformationTabProps {
function GeneralInformationTab(props: IGeneralInformationTabProps) {
const [resetting, setResetting] = useState(false);
const { data: databaseList } = useApiRequest<any[]>("/module?type=database");
const { data: queueList } = useApiRequest<any[]>("/module?type=queue");
const { data: filesystemList } = useApiRequest<any[]>("/module?type=filesystem");
const { data, metadata, errors } = props;

Expand All @@ -58,6 +59,17 @@ function GeneralInformationTab(props: IGeneralInformationTabProps) {
return options;
};

/**
* Gets a list of database options based on the backend list.
*/
const getQueueOptions = () => {
const options: ISelectOption[] = [{ label: "None", value: "" }];
for (const item of queueList || []) {
options.push({ label: item.setupName, value: `${item.pluginID}:${item.setupID}` });
}
return options;
};

/**
* Gets a list of filesystem options based on the backend list.
*/
Expand Down Expand Up @@ -158,6 +170,21 @@ function GeneralInformationTab(props: IGeneralInformationTabProps) {
/>
</FormGroup>

<FormGroup
tooltip={`The plugin that will be used as the main queue for ${getSystemName()}.`}
icon={EIcon.list}
label="Queue plugin"
>
<Select
onChange={(e) => props.onChange("queue_plugin", e.target.value)}
placeholder="Select the default queue plugin"
value={data.queue_plugin}
error={errors?.queue_plugin}
options={getQueueOptions()}
disabled={!queueList?.length}
/>
</FormGroup>

<FormGroup
tooltip={`The plugin that will be used as the main filesystem for ${getSystemName()}.`}
icon={EIcon.folder}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ function InstallLocalPluginButton() {
*/
const deactivateModalInstall = useCallback(() => {
setModalInstall(false);
window.location.reload();
}, []);

return (
Expand Down
13 changes: 9 additions & 4 deletions packages/tcore-sdk/src/Lib/Core/Core.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,15 @@ test("addDeviceData", async () => {
value: 10,
});
expect(response).toEqual(undefined);
expect(fn).toHaveBeenCalledWith("addDeviceData", "61261ef1f87480ff318b7bcb", {
variable: "temperature",
value: 10,
});
expect(fn).toHaveBeenCalledWith(
"addDeviceData",
"61261ef1f87480ff318b7bcb",
{
variable: "temperature",
value: 10,
},
undefined
);
});

test("getDeviceData works without query", async () => {
Expand Down
4 changes: 2 additions & 2 deletions packages/tcore-sdk/src/Lib/Core/Core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ class Core extends APIBridge {
/**
* Adds a data item into a device.
*/
public async addDeviceData(deviceID: TGenericID, data: any): Promise<void> {
await this.invokeApiMethod("addDeviceData", deviceID, data);
public async addDeviceData(deviceID: TGenericID, data: any, options?: { forceDBInsert: boolean }): Promise<void> {
await this.invokeApiMethod("addDeviceData", deviceID, data, options);
}

/**
Expand Down
21 changes: 21 additions & 0 deletions packages/tcore-sdk/src/Lib/QueueModule/QueueModule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { IDatabaseDeviceDataCreate, IModuleSetupWithoutType, TGenericID, TDeviceType } from "../../Types";
import TCoreModule from "../TCoreModule/TCoreModule";

/**
* This module allows the creation of a new queue plugin.
*/
class QueueModule extends TCoreModule {
constructor(protected setup: IModuleSetupWithoutType) {
super(setup, "queue");
}

/**
* Trigger this function when data added to device.
* The data is already parsed and encoded.
*/
public async onAddDeviceData(deviceID: TGenericID, data: IDatabaseDeviceDataCreate[]): Promise<void> {
throw new Error("Method not implemented");
}
}

export default QueueModule;
Loading

0 comments on commit 0f8f14c

Please sign in to comment.