diff --git a/bin/cli/commands/clean-stalled-frames.command.ts b/bin/cli/commands/clean-stalled-frames.command.ts new file mode 100644 index 00000000..3dda5bcf --- /dev/null +++ b/bin/cli/commands/clean-stalled-frames.command.ts @@ -0,0 +1,13 @@ +import { PrepareFunctionReturnType } from "../init"; +import { cleanStalledFrames } from "../tasks/clean-stalled-frames.task"; +import { CommandId } from "./id"; + +export default { + id: CommandId.CleanStalledFrames, + version: '0.0.1', + fn: async ( + { usecase: { bucketEntriesUsecase }, readers }: PrepareFunctionReturnType, + ): Promise => { + await cleanStalledFrames(bucketEntriesUsecase, readers.framesReader); + }, +}; diff --git a/bin/cli/commands/id.ts b/bin/cli/commands/id.ts index 02770c7d..e8d21802 100644 --- a/bin/cli/commands/id.ts +++ b/bin/cli/commands/id.ts @@ -2,4 +2,5 @@ export enum CommandId { DestroyUserBuckets = 'destroy-user-buckets', EmptyBucket = 'empty-bucket', EmptyBuckets = 'empty-buckets', + CleanStalledFrames = 'clean-stalled-frames', } diff --git a/bin/cli/commands/index.ts b/bin/cli/commands/index.ts index 19bb90e2..44792eb9 100644 --- a/bin/cli/commands/index.ts +++ b/bin/cli/commands/index.ts @@ -4,6 +4,7 @@ import { buildCommand } from './build' import { default as destroyUserBuckets } from "./destroy-user-buckets.command"; import { default as emptyBucket } from "./empty-bucket.command"; import { default as emptyBuckets } from "./empty-buckets.command"; +import { default as cleanStalledFrames } from "./clean-stalled-frames.command"; export default (resources: PrepareFunctionReturnType, onFinish: () => void) => ({ [destroyUserBuckets.id]: buildCommand({ @@ -35,4 +36,14 @@ export default (resources: PrepareFunctionReturnType, onFinish: () => void) => ( await emptyBuckets.fn(resources, userId); onFinish(); }), + + [cleanStalledFrames.id]: buildCommand({ + version: cleanStalledFrames.version, + command: `${cleanStalledFrames.id}`, + description: 'Cleans stalled frames', + options: [], + }).action(async () => { + await cleanStalledFrames.fn(resources); + onFinish(); + }), }); diff --git a/bin/cli/init.ts b/bin/cli/init.ts index f3b81bca..4a9d5e72 100644 --- a/bin/cli/init.ts +++ b/bin/cli/init.ts @@ -28,12 +28,17 @@ import { ShardsRepository } from '../../lib/core/shards/Repository'; import { UploadsRepository } from '../../lib/core/uploads/Repository'; import { TokensRepository } from '../../lib/core/tokens/Repository'; import { ContactsRepository } from '../../lib/core/contacts/Repository'; +import { MongoDB } from '../delete-objects/temp-shard.model'; +import { DatabaseFramesReader } from '../delete-objects/ObjectStorage'; const Config = require('../../lib/config'); const config = new Config(process.env.NODE_ENV || 'develop', '', ''); export type PrepareFunctionReturnType = { + readers: { + framesReader: DatabaseFramesReader, + }, repo: { bucketEntriesRepository: BucketEntriesRepository, bucketEntryShardsRepository:BucketEntryShardsRepository, @@ -57,6 +62,8 @@ export type PrepareFunctionReturnType = { export async function prepare(): Promise { const QUEUE_NAME = 'NETWORK_WORKER_TASKS_QUEUE'; + const newDbConnection = new MongoDB(process.env.inxtbridge_storage__mongoUri as string); + await newDbConnection.connect(); const models = await connectToDatabase('', ''); const { QUEUE_USERNAME, QUEUE_PASSWORD, QUEUE_HOST } = config; @@ -114,9 +121,15 @@ export async function prepare(): Promise { tokensRepository, contactsRepository, ) + const framesReader = new DatabaseFramesReader( + newDbConnection.getCollections().frames + ); await networkQueue.connectAndRetry(); return { + readers: { + framesReader, + }, repo: { bucketEntriesRepository, bucketEntryShardsRepository, diff --git a/bin/cli/tasks/clean-stalled-frames.task.ts b/bin/cli/tasks/clean-stalled-frames.task.ts new file mode 100644 index 00000000..4b230c5c --- /dev/null +++ b/bin/cli/tasks/clean-stalled-frames.task.ts @@ -0,0 +1,42 @@ +import { BucketEntriesUsecase } from "../../../lib/core/bucketEntries/usecase"; +import { FramesReader } from "../../delete-objects/ObjectStorage"; +import { Frame } from "../../../lib/core/frames/Frame"; +import { FrameDocument } from "../../delete-objects/temp-shard.model"; + +export type CleanStalledFramesFunctionType = ( + bucketEntriesUsecase: BucketEntriesUsecase, + reader: FramesReader, +) => Promise; + +export const cleanStalledFrames: CleanStalledFramesFunctionType = async ( + bucketEntriesUsecase, + reader, +): Promise => { + const deleteInBulksOf = 20; + const toDelete: { frame: Frame['id'], id: '', _frame: FrameDocument }[] = []; + const stats = { + totalSize: 0, + totalCount: 0, + } + + for await (const frame of reader.list()) { + if (frame.bucketEntry) { + const be = await bucketEntriesUsecase.findById(frame.bucketEntry); + + if (!be) { + console.log(`deleting frame ${frame._id}, be ${frame.bucketEntry}, size ${frame.size}`); + toDelete.push({ frame: frame._id.toString(), id: '', _frame: frame }); + } + } + if (toDelete.length === deleteInBulksOf) { + // await bucketEntriesUsecase.removeFilesV1(toDelete as any); + + stats.totalSize += toDelete.reduce((acc, curr) => acc + curr._frame.size, 0); + stats.totalCount += toDelete.length; + + toDelete.length = 0; + + console.log(`total size ${stats.totalSize}, total count ${stats.totalCount}`); + } + } +} diff --git a/bin/delete-objects/ObjectStorage.ts b/bin/delete-objects/ObjectStorage.ts index b9c81dc7..d368e6c1 100644 --- a/bin/delete-objects/ObjectStorage.ts +++ b/bin/delete-objects/ObjectStorage.ts @@ -5,7 +5,7 @@ import { createHash } from 'crypto'; import { ShardsRepository } from '../../lib/core/shards/Repository'; import { Shard } from '../../lib/core/shards/Shard'; -import { MongoDBCollections, TempShardDocument } from './temp-shard.model'; +import { FrameDocument, MongoDBCollections, TempShardDocument } from './temp-shard.model'; import { ObjectId } from 'mongodb'; export interface StorageObject { @@ -215,4 +215,70 @@ export class DatabaseTempShardsReader implements TempShardsReader { offset += tempShards.length; } while (offset % pageSize === 0); } -} \ No newline at end of file +} + +export interface FramesReader { + list(pageSize?: number): AsyncGenerator; +} + +export class DatabaseFramesReader { + constructor(private readonly frames: MongoDBCollections['frames']) {} + + async* list(pageSize = 50): AsyncGenerator { + const pipeline = [ + { + $lookup: { + from: 'bucketentries', + localField: '_id', + foreignField: 'frame', + as: 'matched_entries' + } + }, + { + $match: { + matched_entries: { $size: 0 } + } + }, + ]; + const cursor = this.frames.aggregate(pipeline); + + while (await cursor.hasNext()) { + const frame = await cursor.next(); + + if (frame) { + yield frame; + } + } + } +} + +export class DatabaseFramesReaderWithoutOwner { + constructor(private readonly frames: MongoDBCollections['frames']) {} + + async* list(pageSize = 50): AsyncGenerator { + const pipeline = [ + { + $lookup: { + from: "users", + localField: "user", + foreignField: "_id", + as: "user_info" + } + }, + { + $match: { + user_info: { $eq: [] } // Filtra los documentos donde no hay coincidencias en Users + } + }, + ]; + const cursor = this.frames.aggregate(pipeline); + + while (await cursor.hasNext()) { + const frame = await cursor.next(); + + if (frame) { + yield frame; + } + } + } +} diff --git a/bin/delete-objects/temp-shard.model.ts b/bin/delete-objects/temp-shard.model.ts index 2e2a5266..b33b5e7f 100644 --- a/bin/delete-objects/temp-shard.model.ts +++ b/bin/delete-objects/temp-shard.model.ts @@ -1,7 +1,9 @@ import { ObjectId, Document, Collection, Db, MongoClient } from 'mongodb'; +import { Frame } from '../../lib/core/frames/Frame'; export interface MongoDBCollections { tempShards: Collection; + frames: Collection; } interface TempShard extends Document { @@ -15,6 +17,10 @@ export interface TempShardDocument extends Omit { shardId: ObjectId; } +export interface FrameDocument extends Omit { + _id: ObjectId; +} + export class MongoDB { private uri: string; private db: Db | null; @@ -45,6 +51,7 @@ export class MongoDB { return { tempShards: this.db.collection('tempshards'), + frames: this.db.collection('frames') }; } diff --git a/lib/core/bucketEntries/usecase.ts b/lib/core/bucketEntries/usecase.ts index 3fa12526..c1bd76db 100644 --- a/lib/core/bucketEntries/usecase.ts +++ b/lib/core/bucketEntries/usecase.ts @@ -41,6 +41,12 @@ export class BucketEntriesUsecase { return bucketEntries; } + async findById(id: BucketEntry['id']): Promise { + const bucketEntry = await this.bucketEntriesRepository.findOne({ id }); + + return bucketEntry; + } + async countByBucket(bucketId: Bucket['id']): Promise { const count = await this.bucketEntriesRepository.count({ bucket: bucketId }); diff --git a/package.json b/package.json index 3e84389a..42c0be1b 100644 --- a/package.json +++ b/package.json @@ -57,7 +57,6 @@ "@types/jest": "^27.4.1", "@types/jsonwebtoken": "^8.5.8", "@types/lodash": "^4.14.182", - "@types/mongoose": "^5.11.97", "@types/mysql": "^2.15.21", "@types/node": "^17.0.23", "@types/node-mongodb-fixtures": "^3.2.3", diff --git a/yarn.lock b/yarn.lock index db6fcffc..19665cd0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -844,13 +844,6 @@ "@types/bson" "*" "@types/node" "*" -"@types/mongoose@^5.11.97": - version "5.11.97" - resolved "https://registry.yarnpkg.com/@types/mongoose/-/mongoose-5.11.97.tgz#80b0357f3de6807eb597262f52e49c3e13ee14d8" - integrity sha512-cqwOVYT3qXyLiGw7ueU2kX9noE8DPGRY6z8eUxudhXY8NZ7DMKYAxyZkLSevGfhCX3dO/AoX5/SO9lAzfjon0Q== - dependencies: - mongoose "*" - "@types/mysql@^2.15.21": version "2.15.21" resolved "https://registry.yarnpkg.com/@types/mysql/-/mysql-2.15.21.tgz#7516cba7f9d077f980100c85fd500c8210bd5e45" @@ -1638,7 +1631,7 @@ bson@^1.1.4: resolved "https://registry.yarnpkg.com/bson/-/bson-1.1.6.tgz#fb819be9a60cd677e0853aee4ca712a785d6618a" integrity sha512-EvVNVeGo4tHxwi8L6bPj3y3itEvStdwvvlojVxxbyYfoaxJ6keLgrTuKdyfEAszFK+H3olzBuafE0yoh0D1gdg== -bson@^4.6.2, bson@^4.6.3: +bson@^4.6.3: version "4.7.2" resolved "https://registry.yarnpkg.com/bson/-/bson-4.7.2.tgz#320f4ad0eaf5312dd9b45dc369cc48945e2a5f2e" integrity sha512-Ry9wCtIZ5kGqkJoi6aD8KjxFZEx78guTQDnpXWiNthsxzrxAK/i8E6pCHAIZTbaEFWcOCvbecMukfK7XUvyLpQ== @@ -4992,11 +4985,6 @@ kareem@2.3.2: resolved "https://registry.yarnpkg.com/kareem/-/kareem-2.3.2.tgz#78c4508894985b8d38a0dc15e1a8e11078f2ca93" integrity sha512-STHz9P7X2L4Kwn72fA4rGyqyXdmrMSdxqHx9IXon/FXluXieaFA6KJ2upcHAHxQPQ0LeM/OjLrhFxifHewOALQ== -kareem@2.4.1: - version "2.4.1" - resolved "https://registry.yarnpkg.com/kareem/-/kareem-2.4.1.tgz#7d81ec518204a48c1cb16554af126806c3cd82b0" - integrity sha512-aJ9opVoXroQUPfovYP5kaj2lM7Jn02Gw13bL0lg9v0V7SaUc0qavPs0Eue7d2DcC3NjqI6QAUElXNsuZSeM+EA== - kareem@2.5.1: version "2.5.1" resolved "https://registry.yarnpkg.com/kareem/-/kareem-2.5.1.tgz#7b8203e11819a8e77a34b3517d3ead206764d15d" @@ -5559,18 +5547,6 @@ mongodb@3.7.4: optionalDependencies: saslprep "^1.0.0" -mongodb@4.7.0, mongodb@^4.5.0, mongodb@^4.6.0: - version "4.7.0" - resolved "https://registry.yarnpkg.com/mongodb/-/mongodb-4.7.0.tgz#99f7323271d93659067695b60e7b4efee2de9bf0" - integrity sha512-HhVar6hsUeMAVlIbwQwWtV36iyjKd9qdhY+s4wcU8K6TOj4Q331iiMy+FoPuxEntDIijTYWivwFJkLv8q/ZgvA== - dependencies: - bson "^4.6.3" - denque "^2.0.1" - mongodb-connection-string-url "^2.5.2" - socks "^2.6.2" - optionalDependencies: - saslprep "^1.0.3" - mongodb@6.2.0: version "6.2.0" resolved "https://registry.yarnpkg.com/mongodb/-/mongodb-6.2.0.tgz#2c9dcb3eeaf528ed850e94b3df392de6c6b0d7ab" @@ -5593,6 +5569,18 @@ mongodb@^3.6.9: optionalDependencies: saslprep "^1.0.0" +mongodb@^4.5.0, mongodb@^4.6.0: + version "4.7.0" + resolved "https://registry.yarnpkg.com/mongodb/-/mongodb-4.7.0.tgz#99f7323271d93659067695b60e7b4efee2de9bf0" + integrity sha512-HhVar6hsUeMAVlIbwQwWtV36iyjKd9qdhY+s4wcU8K6TOj4Q331iiMy+FoPuxEntDIijTYWivwFJkLv8q/ZgvA== + dependencies: + bson "^4.6.3" + denque "^2.0.1" + mongodb-connection-string-url "^2.5.2" + socks "^2.6.2" + optionalDependencies: + saslprep "^1.0.3" + mongoose-int32@^0.1.0: version "0.1.0" resolved "https://registry.yarnpkg.com/mongoose-int32/-/mongoose-int32-0.1.0.tgz#2c8e138e99fa1b44e2470776177a76529a244303" @@ -5610,19 +5598,6 @@ mongoose-types@^1.0.3: dependencies: mongoose ">= 1.0.16" -mongoose@*: - version "6.4.4" - resolved "https://registry.yarnpkg.com/mongoose/-/mongoose-6.4.4.tgz#4e22a36373d8a867ee8f73063d8b31f1e451316d" - integrity sha512-r6sp96veRNhNIWFtHHe4Lqak+ilgiExYnnMLhYTGdzjIMR90G1ayx0JKFVdHuC6dKNHGFX0ETJGbf36N8Romjg== - dependencies: - bson "^4.6.2" - kareem "2.4.1" - mongodb "4.7.0" - mpath "0.9.0" - mquery "4.0.3" - ms "2.1.3" - sift "16.0.0" - mongoose@=4.11.14: version "4.11.14" resolved "https://registry.yarnpkg.com/mongoose/-/mongoose-4.11.14.tgz#b85402aaf28c5c3e45c8ef93fe69544eaa5d00f3" @@ -5715,13 +5690,6 @@ mquery@3.2.5: safe-buffer "5.1.2" sliced "1.0.1" -mquery@4.0.3: - version "4.0.3" - resolved "https://registry.yarnpkg.com/mquery/-/mquery-4.0.3.tgz#4d15f938e6247d773a942c912d9748bd1965f89d" - integrity sha512-J5heI+P08I6VJ2Ky3+33IpCdAvlYGTSUjwTPxkAr8i8EoduPMBX2OY/wa3IKZIQl7MU4SbFk8ndgSKyB/cl1zA== - dependencies: - debug "4.x" - mquery@5.0.0: version "5.0.0" resolved "https://registry.yarnpkg.com/mquery/-/mquery-5.0.0.tgz#a95be5dfc610b23862df34a47d3e5d60e110695d" @@ -7154,11 +7122,6 @@ sift@13.5.2: resolved "https://registry.yarnpkg.com/sift/-/sift-13.5.2.tgz#24a715e13c617b086166cd04917d204a591c9da6" integrity sha512-+gxdEOMA2J+AI+fVsCqeNn7Tgx3M9ZN9jdi95939l1IJ8cZsqS8sqpJyOkic2SJk+1+98Uwryt/gL6XDaV+UZA== -sift@16.0.0: - version "16.0.0" - resolved "https://registry.yarnpkg.com/sift/-/sift-16.0.0.tgz#447991577db61f1a8fab727a8a98a6db57a23eb8" - integrity sha512-ILTjdP2Mv9V1kIxWMXeMTIRbOBrqKc4JAXmFMnFq3fKeyQ2Qwa3Dw1ubcye3vR+Y6ofA0b9gNDr/y2t6eUeIzQ== - sift@16.0.1: version "16.0.1" resolved "https://registry.yarnpkg.com/sift/-/sift-16.0.1.tgz#e9c2ccc72191585008cf3e36fc447b2d2633a053"