diff --git a/packages/tcore-api/src/Plugins/Module/Module.ts b/packages/tcore-api/src/Plugins/Module/Module.ts index f049148a..fb32cc30 100644 --- a/packages/tcore-api/src/Plugins/Module/Module.ts +++ b/packages/tcore-api/src/Plugins/Module/Module.ts @@ -133,7 +133,7 @@ class Module { const values = moduleSettings?.values || {}; const conf = this.setup.configs || []; - const defs = conf?.filter((x) => "defaultValue" in x && x.defaultValue); + const defs = conf?.filter((x) => "defaultValue" in x && x.defaultValue !== ""); const flat = flattenConfigFields(defs); const defsObject = {}; diff --git a/packages/tcore-api/src/Services/DeviceData/DeviceData.test.ts b/packages/tcore-api/src/Services/DeviceData/DeviceData.test.ts index 2fab818c..d63206bc 100644 --- a/packages/tcore-api/src/Services/DeviceData/DeviceData.test.ts +++ b/packages/tcore-api/src/Services/DeviceData/DeviceData.test.ts @@ -200,7 +200,6 @@ describe("addDeviceDataByDevice", () => { const arg1 = mock.mock.calls[0][0]; const arg2 = mock.mock.calls[0][1]; expect(arg1).toEqual(activeDevice.id); - expect(arg2.updated_at).toBeInstanceOf(Date); expect(arg2.last_input).toBeInstanceOf(Date); }); diff --git a/packages/tcore-api/src/Services/DeviceData/DeviceData.ts b/packages/tcore-api/src/Services/DeviceData/DeviceData.ts index 3a8d4baa..3513b6da 100644 --- a/packages/tcore-api/src/Services/DeviceData/DeviceData.ts +++ b/packages/tcore-api/src/Services/DeviceData/DeviceData.ts @@ -23,7 +23,7 @@ import { editDevice, getDeviceInfo } from "../Device"; import { addStatistic } from "../Statistic"; import { runPayloadParser } from "../PayloadParserCodeExecution"; import { emitToLiveInspector, getLiveInspectorID } from "../LiveInspector"; -import { triggerHooks } from "../Plugins"; +import { getMainQueueModule, triggerHooks } from "../Plugins"; const LIMIT_DATA_ON_MUTABLE = 50_000; const MAXIMUM_MONTH_RANGE = 1.1; @@ -168,54 +168,41 @@ function isImmutableTimeOutOfRange(time: Date, period: IDeviceChunkPeriod, reten } /** - * Adds data into a device by an actual device object. - * @param {IDevice} device Device object who sent the data. - * @param {any} data Data to be inserted. + * Validates the amount of mutable data points. + * Throws an error if something is wrong. */ -export const addDeviceDataByDevice = async (device: IDevice, data: any, options?: IDeviceAddDataOptions) => { - if (!device || !device.active) { - throw new Error("Device not found or inactive"); - } - if (!options) { - options = {}; - } - if (!options.liveInspectorID) { - options.liveInspectorID = getLiveInspectorID(device); - } - - if (Array.isArray(data)) { - options.rawPayload = data; - } else if (typeof data === "object") { - options.rawPayload = [data]; - } else { - options.rawPayload = data; - } - +async function validateMutableDataAmount(device: IDevice) { if (device.type === "mutable") { const amount = await getDeviceDataAmount(device.id); if (amount >= LIMIT_DATA_ON_MUTABLE) { throw new Error(`The device has reached the limit of ${LIMIT_DATA_ON_MUTABLE} data registers`); } } +} - let items: any = data; - items = await applyPayloadEncoder(device, items, options); - items = await runPayloadParser(device, items, options); - items = await z.array(zDeviceDataCreate).parseAsync([items].flat()); - +/** + * Validates the time range for immutable data points. + * Throws an error if something is wrong. + */ +async function validateImmutableTimeRange(device: IDevice, items: any) { for (const item of items) { if (device.type === "immutable" && device.chunk_period) { const outOfRage = isImmutableTimeOutOfRange(item.time, device.chunk_period, device.chunk_retention || 0); if (!outOfRage.isOk) { const title = `Time must be between ${outOfRage.startDate} and ${outOfRage.endDate}`; - await emitToLiveInspector(device, { title, content: item }, options.liveInspectorID); throw new Error(title); } } } +} - await emitToLiveInspector(device, { title: "Raw Payload", content: data }, options.liveInspectorID); +/** + * Apply the Zod formatting capability on the data items, and also makes sure that + * their format respects the latest tago format. + */ +async function applyZodDeviceData(items: any) { + items = await z.array(zDeviceDataCreate).parseAsync([items].flat()); const group = generateResourceID().split("").reverse().join(""); const now = new Date(); @@ -228,8 +215,15 @@ export const addDeviceDataByDevice = async (device: IDevice, data: any, options? delete item.serie; } + return items; +} + +/** + * Formats the timestamp for the chunks of the data. + */ +async function formatChunkTimestamp(device: IDevice, data: any) { // map the items to insert into database - const dbInsertItems = items.map((x: IDeviceDataCreate) => { + return data.map((x: IDeviceDataCreate) => { const chunkTimestamp = getChunkTimestamp(x.time as Date, device); return { ...x, @@ -237,25 +231,97 @@ export const addDeviceDataByDevice = async (device: IDevice, data: any, options? chunk_timestamp_end: chunkTimestamp?.endDate, }; }); +} + +/** + * Adds data into a device by an actual device object. + * @param {IDevice} device Device object who sent the data. + * @param {any} data Data to be inserted. + */ +export const addDeviceDataByDevice = async (device: IDevice, data: any, options?: IDeviceAddDataOptions) => { + if (!device || !device.active) { + throw new Error("Device not found or inactive"); + } + if (!options) { + options = {}; + } + if (!options.liveInspectorID) { + options.liveInspectorID = getLiveInspectorID(device); + } + + if (Array.isArray(data)) { + options.rawPayload = data; + } else if (typeof data === "object") { + options.rawPayload = [data]; + } else { + options.rawPayload = data; + } + + await validateMutableDataAmount(device); + + let items: any = data; + items = await applyPayloadEncoder(device, items, options); + items = await runPayloadParser(device, items, options); + items = await z.array(zDeviceDataCreate).parseAsync([items].flat()); + + await validateImmutableTimeRange(device, items); + + await emitToLiveInspector(device, { title: "Raw Payload", content: data }, options.liveInspectorID); + + items = await applyZodDeviceData(items); + items = await formatChunkTimestamp(device, items); + + return await addDataToQueue(device, items); +}; + +/** + * Adds the data point directly into the database. + */ +async function addDataToDatabase(device: IDevice, data: any) { + let items: any = await z.array(zDeviceDataCreate).parseAsync([data].flat()); + await validateMutableDataAmount(device); + await validateImmutableTimeRange(device, items); + items = await applyZodDeviceData(items); + items = await formatChunkTimestamp(device, items); - await invokeDatabaseFunction("addDeviceData", device.id, device.type, dbInsertItems); + await invokeDatabaseFunction("addDeviceData", device.id, device.type, items); triggerActions(device.id, items).catch(() => null); await addStatistic({ input: items.length }); - await editDevice(device.id, { updated_at: now, last_input: now }); + await editDevice(device.id, { last_input: new Date() }); - triggerHooks("onAfterInsertDeviceData", device.id, data); + triggerHooks("onAfterInsertDeviceData", device.id, items); return `${items.length} items added`; -}; +} + +/** + * Adds the data point to the queue. + */ +async function addDataToQueue(device: IDevice, data: any) { + const queue = await getMainQueueModule(); + + if (!queue) { + return await addDataToDatabase(device, data); + } + + await queue.invoke("onAddDeviceData", device.id, data); + + return `${data.length} items added to queue`; +} /** * Adds data into a device by device ID. * @param {TGenericID} deviceID ID of the device who sent the data. * @param {any} data Data to be inserted. */ -export async function addDeviceData(deviceID: TGenericID, data: any) { +export async function addDeviceData(deviceID: TGenericID, data: any, options?: { forceDBInsert: boolean }) { const device = await getDeviceInfo(deviceID); + + if (options?.forceDBInsert) { + return await addDataToDatabase(device, data); + } + return await addDeviceDataByDevice(device, data); } diff --git a/packages/tcore-api/src/Services/Plugins.ts b/packages/tcore-api/src/Services/Plugins.ts index 20783994..9b375dd6 100644 --- a/packages/tcore-api/src/Services/Plugins.ts +++ b/packages/tcore-api/src/Services/Plugins.ts @@ -374,6 +374,27 @@ export async function getMainDatabaseModule(): Promise { return null; } +/** + * Finds and returns the main queue plugin. + */ +export async function getMainQueueModule(): Promise { + const settings = await getMainSettings(); + const [pluginID, moduleID] = String(settings.queue_plugin).split(":"); + + if (pluginID && moduleID) { + const plugin = plugins.get(pluginID); + if (plugin) { + const module = plugin.modules.get(moduleID); + + if (module?.state === "started") { + return module; + } + } + } + + return null; +} + /** * TODO */ diff --git a/packages/tcore-api/src/Services/Settings.ts b/packages/tcore-api/src/Services/Settings.ts index 1f21b92f..61348a4d 100644 --- a/packages/tcore-api/src/Services/Settings.ts +++ b/packages/tcore-api/src/Services/Settings.ts @@ -75,6 +75,7 @@ export async function getMainSettings(): Promise { const filesystem_plugin = process.env.TCORE_FILESYSTEM_PLUGIN || data.filesystem_plugin || ""; const database_plugin = process.env.TCORE_DATABASE_PLUGIN || data.database_plugin || ""; + const queue_plugin = process.env.TCORE_QUEUE_PLUGIN || data.queue_plugin || ""; const settings_folder = process.env.TCORE_SETTINGS_FOLDER || folder; const plugin_folder = process.env.TCORE_PLUGINS_FOLDER || data.plugin_folder || (await getPluginsFolder()); const port = process.env.TCORE_PORT || data.port || "8888"; @@ -84,6 +85,7 @@ export async function getMainSettings(): Promise { const settings: ISettings = { filesystem_plugin, database_plugin, + queue_plugin, plugin_folder, port, settings_folder, diff --git a/packages/tcore-console/src/Components/Plugins/Edit/PluginEdit.tsx b/packages/tcore-console/src/Components/Plugins/Edit/PluginEdit.tsx index 436ce13a..1c4d6530 100644 --- a/packages/tcore-console/src/Components/Plugins/Edit/PluginEdit.tsx +++ b/packages/tcore-console/src/Components/Plugins/Edit/PluginEdit.tsx @@ -48,8 +48,12 @@ function PluginEdit() { const flattened = flattenConfigFields(module.configs || []); for (const field of flattened) { - if ("defaultValue" in field && field.defaultValue) { - newValues[module.id][field.field] = field.defaultValue; + if ("defaultValue" in field) { + if (field.defaultValue === 0) { + newValues[module.id][field.field] = "0"; + } else { + newValues[module.id][field.field] = field.defaultValue; + } } } } diff --git a/packages/tcore-console/src/Components/Settings/Edit/GeneralInformationTab/GeneralInformationTab.tsx b/packages/tcore-console/src/Components/Settings/Edit/GeneralInformationTab/GeneralInformationTab.tsx index 8e932f19..f10c83ba 100644 --- a/packages/tcore-console/src/Components/Settings/Edit/GeneralInformationTab/GeneralInformationTab.tsx +++ b/packages/tcore-console/src/Components/Settings/Edit/GeneralInformationTab/GeneralInformationTab.tsx @@ -44,6 +44,7 @@ interface IGeneralInformationTabProps { function GeneralInformationTab(props: IGeneralInformationTabProps) { const [resetting, setResetting] = useState(false); const { data: databaseList } = useApiRequest("/module?type=database"); + const { data: queueList } = useApiRequest("/module?type=queue"); const { data: filesystemList } = useApiRequest("/module?type=filesystem"); const { data, metadata, errors } = props; @@ -58,6 +59,17 @@ function GeneralInformationTab(props: IGeneralInformationTabProps) { return options; }; + /** + * Gets a list of database options based on the backend list. + */ + const getQueueOptions = () => { + const options: ISelectOption[] = [{ label: "None", value: "" }]; + for (const item of queueList || []) { + options.push({ label: item.setupName, value: `${item.pluginID}:${item.setupID}` }); + } + return options; + }; + /** * Gets a list of filesystem options based on the backend list. */ @@ -158,6 +170,21 @@ function GeneralInformationTab(props: IGeneralInformationTabProps) { /> + +