Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue plugin #29

Merged
merged 6 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/tcore-api/src/Plugins/Module/Module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class Module {
const values = moduleSettings?.values || {};

const conf = this.setup.configs || [];
const defs = conf?.filter((x) => "defaultValue" in x && x.defaultValue);
const defs = conf?.filter((x) => "defaultValue" in x && x.defaultValue !== "");
const flat = flattenConfigFields(defs);
const defsObject = {};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ describe("addDeviceDataByDevice", () => {
const arg1 = mock.mock.calls[0][0];
const arg2 = mock.mock.calls[0][1];
expect(arg1).toEqual(activeDevice.id);
expect(arg2.updated_at).toBeInstanceOf(Date);
expect(arg2.last_input).toBeInstanceOf(Date);
});

Expand Down
138 changes: 102 additions & 36 deletions packages/tcore-api/src/Services/DeviceData/DeviceData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { editDevice, getDeviceInfo } from "../Device";
import { addStatistic } from "../Statistic";
import { runPayloadParser } from "../PayloadParserCodeExecution";
import { emitToLiveInspector, getLiveInspectorID } from "../LiveInspector";
import { triggerHooks } from "../Plugins";
import { getMainQueueModule, triggerHooks } from "../Plugins";

const LIMIT_DATA_ON_MUTABLE = 50_000;
const MAXIMUM_MONTH_RANGE = 1.1;
Expand Down Expand Up @@ -168,54 +168,41 @@ function isImmutableTimeOutOfRange(time: Date, period: IDeviceChunkPeriod, reten
}

/**
* Adds data into a device by an actual device object.
* @param {IDevice} device Device object who sent the data.
* @param {any} data Data to be inserted.
* Validates the amount of mutable data points.
* Throws an error if something is wrong.
*/
export const addDeviceDataByDevice = async (device: IDevice, data: any, options?: IDeviceAddDataOptions) => {
if (!device || !device.active) {
throw new Error("Device not found or inactive");
}
if (!options) {
options = {};
}
if (!options.liveInspectorID) {
options.liveInspectorID = getLiveInspectorID(device);
}

if (Array.isArray(data)) {
options.rawPayload = data;
} else if (typeof data === "object") {
options.rawPayload = [data];
} else {
options.rawPayload = data;
}

async function validateMutableDataAmount(device: IDevice) {
if (device.type === "mutable") {
const amount = await getDeviceDataAmount(device.id);
if (amount >= LIMIT_DATA_ON_MUTABLE) {
throw new Error(`The device has reached the limit of ${LIMIT_DATA_ON_MUTABLE} data registers`);
}
}
}

let items: any = data;
items = await applyPayloadEncoder(device, items, options);
items = await runPayloadParser(device, items, options);
items = await z.array(zDeviceDataCreate).parseAsync([items].flat());

/**
* Validates the time range for immutable data points.
* Throws an error if something is wrong.
*/
async function validateImmutableTimeRange(device: IDevice, items: any) {
for (const item of items) {
if (device.type === "immutable" && device.chunk_period) {
const outOfRage = isImmutableTimeOutOfRange(item.time, device.chunk_period, device.chunk_retention || 0);

if (!outOfRage.isOk) {
const title = `Time must be between ${outOfRage.startDate} and ${outOfRage.endDate}`;
await emitToLiveInspector(device, { title, content: item }, options.liveInspectorID);
throw new Error(title);
}
}
}
}

await emitToLiveInspector(device, { title: "Raw Payload", content: data }, options.liveInspectorID);
/**
* Apply the Zod formatting capability on the data items, and also makes sure that
* their format respects the latest tago format.
*/
async function applyZodDeviceData(items: any) {
items = await z.array(zDeviceDataCreate).parseAsync([items].flat());

const group = generateResourceID().split("").reverse().join("");
const now = new Date();
Expand All @@ -228,34 +215,113 @@ export const addDeviceDataByDevice = async (device: IDevice, data: any, options?
delete item.serie;
}

return items;
}

/**
* Formats the timestamp for the chunks of the data.
*/
async function formatChunkTimestamp(device: IDevice, data: any) {
// map the items to insert into database
const dbInsertItems = items.map((x: IDeviceDataCreate) => {
return data.map((x: IDeviceDataCreate) => {
const chunkTimestamp = getChunkTimestamp(x.time as Date, device);
return {
...x,
chunk_timestamp_start: chunkTimestamp?.startDate,
chunk_timestamp_end: chunkTimestamp?.endDate,
};
});
}

/**
* Adds data into a device by an actual device object.
* @param {IDevice} device Device object who sent the data.
* @param {any} data Data to be inserted.
*/
export const addDeviceDataByDevice = async (device: IDevice, data: any, options?: IDeviceAddDataOptions) => {
if (!device || !device.active) {
throw new Error("Device not found or inactive");
}
if (!options) {
options = {};
}
if (!options.liveInspectorID) {
options.liveInspectorID = getLiveInspectorID(device);
}

if (Array.isArray(data)) {
options.rawPayload = data;
} else if (typeof data === "object") {
options.rawPayload = [data];
} else {
options.rawPayload = data;
}

await validateMutableDataAmount(device);

let items: any = data;
items = await applyPayloadEncoder(device, items, options);
items = await runPayloadParser(device, items, options);
items = await z.array(zDeviceDataCreate).parseAsync([items].flat());

await validateImmutableTimeRange(device, items);

await emitToLiveInspector(device, { title: "Raw Payload", content: data }, options.liveInspectorID);

items = await applyZodDeviceData(items);
items = await formatChunkTimestamp(device, items);

return await addDataToQueue(device, items);
};

/**
* Adds the data point directly into the database.
*/
async function addDataToDatabase(device: IDevice, data: any) {
let items: any = await z.array(zDeviceDataCreate).parseAsync([data].flat());
await validateMutableDataAmount(device);
await validateImmutableTimeRange(device, items);
items = await applyZodDeviceData(items);
items = await formatChunkTimestamp(device, items);

await invokeDatabaseFunction("addDeviceData", device.id, device.type, dbInsertItems);
await invokeDatabaseFunction("addDeviceData", device.id, device.type, items);

triggerActions(device.id, items).catch(() => null);
await addStatistic({ input: items.length });
await editDevice(device.id, { updated_at: now, last_input: now });
await editDevice(device.id, { last_input: new Date() });

triggerHooks("onAfterInsertDeviceData", device.id, data);
triggerHooks("onAfterInsertDeviceData", device.id, items);

return `${items.length} items added`;
};
}

/**
* Adds the data point to the queue.
*/
async function addDataToQueue(device: IDevice, data: any) {
const queue = await getMainQueueModule();

if (!queue) {
return await addDataToDatabase(device, data);
}

await queue.invoke("onAddDeviceData", device.id, data);

return `${data.length} items added to queue`;
}

/**
* Adds data into a device by device ID.
* @param {TGenericID} deviceID ID of the device who sent the data.
* @param {any} data Data to be inserted.
*/
export async function addDeviceData(deviceID: TGenericID, data: any) {
export async function addDeviceData(deviceID: TGenericID, data: any, options?: { forceDBInsert: boolean }) {
const device = await getDeviceInfo(deviceID);

if (options?.forceDBInsert) {
return await addDataToDatabase(device, data);
}

return await addDeviceDataByDevice(device, data);
}

Expand Down
21 changes: 21 additions & 0 deletions packages/tcore-api/src/Services/Plugins.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,27 @@ export async function getMainDatabaseModule(): Promise<Module | null> {
return null;
}

/**
* Finds and returns the main queue plugin.
*/
export async function getMainQueueModule(): Promise<Module | null> {
const settings = await getMainSettings();
const [pluginID, moduleID] = String(settings.queue_plugin).split(":");

if (pluginID && moduleID) {
const plugin = plugins.get(pluginID);
if (plugin) {
const module = plugin.modules.get(moduleID);

if (module?.state === "started") {
return module;
}
}
}

return null;
}

/**
* TODO
*/
Expand Down
2 changes: 2 additions & 0 deletions packages/tcore-api/src/Services/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export async function getMainSettings(): Promise<ISettings> {

const filesystem_plugin = process.env.TCORE_FILESYSTEM_PLUGIN || data.filesystem_plugin || "";
const database_plugin = process.env.TCORE_DATABASE_PLUGIN || data.database_plugin || "";
const queue_plugin = process.env.TCORE_QUEUE_PLUGIN || data.queue_plugin || "";
const settings_folder = process.env.TCORE_SETTINGS_FOLDER || folder;
const plugin_folder = process.env.TCORE_PLUGINS_FOLDER || data.plugin_folder || (await getPluginsFolder());
const port = process.env.TCORE_PORT || data.port || "8888";
Expand All @@ -84,6 +85,7 @@ export async function getMainSettings(): Promise<ISettings> {
const settings: ISettings = {
filesystem_plugin,
database_plugin,
queue_plugin,
plugin_folder,
port,
settings_folder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ function PluginEdit() {

const flattened = flattenConfigFields(module.configs || []);
for (const field of flattened) {
if ("defaultValue" in field && field.defaultValue) {
newValues[module.id][field.field] = field.defaultValue;
if ("defaultValue" in field) {
if (field.defaultValue === 0) {
newValues[module.id][field.field] = "0";
} else {
newValues[module.id][field.field] = field.defaultValue;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ interface IGeneralInformationTabProps {
function GeneralInformationTab(props: IGeneralInformationTabProps) {
const [resetting, setResetting] = useState(false);
const { data: databaseList } = useApiRequest<any[]>("/module?type=database");
const { data: queueList } = useApiRequest<any[]>("/module?type=queue");
const { data: filesystemList } = useApiRequest<any[]>("/module?type=filesystem");
const { data, metadata, errors } = props;

Expand All @@ -58,6 +59,17 @@ function GeneralInformationTab(props: IGeneralInformationTabProps) {
return options;
};

/**
* Gets a list of database options based on the backend list.
*/
const getQueueOptions = () => {
const options: ISelectOption[] = [{ label: "None", value: "" }];
for (const item of queueList || []) {
options.push({ label: item.setupName, value: `${item.pluginID}:${item.setupID}` });
}
return options;
};

/**
* Gets a list of filesystem options based on the backend list.
*/
Expand Down Expand Up @@ -158,6 +170,21 @@ function GeneralInformationTab(props: IGeneralInformationTabProps) {
/>
</FormGroup>

<FormGroup
tooltip={`The plugin that will be used as the main queue for ${getSystemName()}.`}
icon={EIcon.list}
label="Queue plugin"
>
<Select
onChange={(e) => props.onChange("queue_plugin", e.target.value)}
placeholder="Select the default queue plugin"
value={data.queue_plugin}
error={errors?.queue_plugin}
options={getQueueOptions()}
disabled={!queueList?.length}
/>
</FormGroup>

<FormGroup
tooltip={`The plugin that will be used as the main filesystem for ${getSystemName()}.`}
icon={EIcon.folder}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ function InstallLocalPluginButton() {
*/
const deactivateModalInstall = useCallback(() => {
setModalInstall(false);
window.location.reload();
}, []);

return (
Expand Down
13 changes: 9 additions & 4 deletions packages/tcore-sdk/src/Lib/Core/Core.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,15 @@ test("addDeviceData", async () => {
value: 10,
});
expect(response).toEqual(undefined);
expect(fn).toHaveBeenCalledWith("addDeviceData", "61261ef1f87480ff318b7bcb", {
variable: "temperature",
value: 10,
});
expect(fn).toHaveBeenCalledWith(
"addDeviceData",
"61261ef1f87480ff318b7bcb",
{
variable: "temperature",
value: 10,
},
undefined
);
});

test("getDeviceData works without query", async () => {
Expand Down
4 changes: 2 additions & 2 deletions packages/tcore-sdk/src/Lib/Core/Core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ class Core extends APIBridge {
/**
* Adds a data item into a device.
*/
public async addDeviceData(deviceID: TGenericID, data: any): Promise<void> {
await this.invokeApiMethod("addDeviceData", deviceID, data);
public async addDeviceData(deviceID: TGenericID, data: any, options?: { forceDBInsert: boolean }): Promise<void> {
await this.invokeApiMethod("addDeviceData", deviceID, data, options);
}

/**
Expand Down
21 changes: 21 additions & 0 deletions packages/tcore-sdk/src/Lib/QueueModule/QueueModule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { IDatabaseDeviceDataCreate, IModuleSetupWithoutType, TGenericID, TDeviceType } from "../../Types";
import TCoreModule from "../TCoreModule/TCoreModule";

/**
* This module allows the creation of a new queue plugin.
*/
class QueueModule extends TCoreModule {
constructor(protected setup: IModuleSetupWithoutType) {
super(setup, "queue");
}

/**
* Trigger this function when data added to device.
* The data is already parsed and encoded.
*/
public async onAddDeviceData(deviceID: TGenericID, data: IDatabaseDeviceDataCreate[]): Promise<void> {
throw new Error("Method not implemented");
}
}

export default QueueModule;
Loading