Skip to content

Commit

Permalink
feat(bin): clean-stalled-frames command
Browse files Browse the repository at this point in the history
  • Loading branch information
sg-gs committed Jan 11, 2024
1 parent d8dd55a commit 2120817
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 53 deletions.
13 changes: 13 additions & 0 deletions bin/cli/commands/clean-stalled-frames.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { PrepareFunctionReturnType } from "../init";
import { cleanStalledFrames } from "../tasks/clean-stalled-frames.task";
import { CommandId } from "./id";

export default {
id: CommandId.CleanStalledFrames,
version: '0.0.1',
fn: async (
{ usecase: { bucketEntriesUsecase }, readers }: PrepareFunctionReturnType,
): Promise<void> => {
await cleanStalledFrames(bucketEntriesUsecase, readers.framesReader);
},
};
1 change: 1 addition & 0 deletions bin/cli/commands/id.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export enum CommandId {
DestroyUserBuckets = 'destroy-user-buckets',
EmptyBucket = 'empty-bucket',
EmptyBuckets = 'empty-buckets',
CleanStalledFrames = 'clean-stalled-frames',
}
11 changes: 11 additions & 0 deletions bin/cli/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { buildCommand } from './build'
import { default as destroyUserBuckets } from "./destroy-user-buckets.command";
import { default as emptyBucket } from "./empty-bucket.command";
import { default as emptyBuckets } from "./empty-buckets.command";
import { default as cleanStalledFrames } from "./clean-stalled-frames.command";

export default (resources: PrepareFunctionReturnType, onFinish: () => void) => ({
[destroyUserBuckets.id]: buildCommand({
Expand Down Expand Up @@ -35,4 +36,14 @@ export default (resources: PrepareFunctionReturnType, onFinish: () => void) => (
await emptyBuckets.fn(resources, userId);
onFinish();
}),

[cleanStalledFrames.id]: buildCommand({
version: cleanStalledFrames.version,
command: `${cleanStalledFrames.id}`,
description: 'Cleans stalled frames',
options: [],
}).action(async () => {
await cleanStalledFrames.fn(resources);
onFinish();
}),
});
13 changes: 13 additions & 0 deletions bin/cli/init.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,17 @@ import { ShardsRepository } from '../../lib/core/shards/Repository';
import { UploadsRepository } from '../../lib/core/uploads/Repository';
import { TokensRepository } from '../../lib/core/tokens/Repository';
import { ContactsRepository } from '../../lib/core/contacts/Repository';
import { MongoDB } from '../delete-objects/temp-shard.model';
import { DatabaseFramesReader } from '../delete-objects/ObjectStorage';

const Config = require('../../lib/config');

const config = new Config(process.env.NODE_ENV || 'develop', '', '');

export type PrepareFunctionReturnType = {
readers: {
framesReader: DatabaseFramesReader,
},
repo: {
bucketEntriesRepository: BucketEntriesRepository,
bucketEntryShardsRepository:BucketEntryShardsRepository,
Expand All @@ -57,6 +62,8 @@ export type PrepareFunctionReturnType = {
export async function prepare(): Promise<PrepareFunctionReturnType> {
const QUEUE_NAME = 'NETWORK_WORKER_TASKS_QUEUE';

const newDbConnection = new MongoDB(process.env.inxtbridge_storage__mongoUri as string);
await newDbConnection.connect();
const models = await connectToDatabase('', '');
const { QUEUE_USERNAME, QUEUE_PASSWORD, QUEUE_HOST } = config;

Expand Down Expand Up @@ -114,9 +121,15 @@ export async function prepare(): Promise<PrepareFunctionReturnType> {
tokensRepository,
contactsRepository,
)
const framesReader = new DatabaseFramesReader(
newDbConnection.getCollections().frames
);
await networkQueue.connectAndRetry();

return {
readers: {
framesReader,
},
repo: {
bucketEntriesRepository,
bucketEntryShardsRepository,
Expand Down
42 changes: 42 additions & 0 deletions bin/cli/tasks/clean-stalled-frames.task.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { BucketEntriesUsecase } from "../../../lib/core/bucketEntries/usecase";
import { FramesReader } from "../../delete-objects/ObjectStorage";
import { Frame } from "../../../lib/core/frames/Frame";
import { FrameDocument } from "../../delete-objects/temp-shard.model";

export type CleanStalledFramesFunctionType = (
bucketEntriesUsecase: BucketEntriesUsecase,
reader: FramesReader,
) => Promise<void>;

export const cleanStalledFrames: CleanStalledFramesFunctionType = async (
bucketEntriesUsecase,
reader,
): Promise<void> => {
const deleteInBulksOf = 20;
const toDelete: { frame: Frame['id'], id: '', _frame: FrameDocument }[] = [];
const stats = {
totalSize: 0,
totalCount: 0,
}

for await (const frame of reader.list()) {
if (frame.bucketEntry) {
const be = await bucketEntriesUsecase.findById(frame.bucketEntry);

if (!be) {
console.log(`deleting frame ${frame._id}, be ${frame.bucketEntry}, size ${frame.size}`);
toDelete.push({ frame: frame._id.toString(), id: '', _frame: frame });
}
}
if (toDelete.length === deleteInBulksOf) {
// await bucketEntriesUsecase.removeFilesV1(toDelete as any);

stats.totalSize += toDelete.reduce((acc, curr) => acc + curr._frame.size, 0);
stats.totalCount += toDelete.length;

toDelete.length = 0;

console.log(`total size ${stats.totalSize}, total count ${stats.totalCount}`);
}
}
}
70 changes: 68 additions & 2 deletions bin/delete-objects/ObjectStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { createHash } from 'crypto';

import { ShardsRepository } from '../../lib/core/shards/Repository';
import { Shard } from '../../lib/core/shards/Shard';
import { MongoDBCollections, TempShardDocument } from './temp-shard.model';
import { FrameDocument, MongoDBCollections, TempShardDocument } from './temp-shard.model';
import { ObjectId } from 'mongodb';

export interface StorageObject {
Expand Down Expand Up @@ -215,4 +215,70 @@ export class DatabaseTempShardsReader implements TempShardsReader {
offset += tempShards.length;
} while (offset % pageSize === 0);
}
}
}

export interface FramesReader {
list(pageSize?: number): AsyncGenerator<FrameDocument>;
}

export class DatabaseFramesReader {
constructor(private readonly frames: MongoDBCollections['frames']) {}

async* list(pageSize = 50): AsyncGenerator<FrameDocument> {
const pipeline = [
{
$lookup: {
from: 'bucketentries',
localField: '_id',
foreignField: 'frame',
as: 'matched_entries'
}
},
{
$match: {
matched_entries: { $size: 0 }
}
},
];
const cursor = this.frames.aggregate<FrameDocument>(pipeline);

while (await cursor.hasNext()) {
const frame = await cursor.next();

if (frame) {
yield frame;
}
}
}
}

export class DatabaseFramesReaderWithoutOwner {
constructor(private readonly frames: MongoDBCollections['frames']) {}

async* list(pageSize = 50): AsyncGenerator<FrameDocument> {
const pipeline = [
{
$lookup: {
from: "users",
localField: "user",
foreignField: "_id",
as: "user_info"
}
},
{
$match: {
user_info: { $eq: [] } // Filtra los documentos donde no hay coincidencias en Users
}
},
];
const cursor = this.frames.aggregate<FrameDocument>(pipeline);

while (await cursor.hasNext()) {
const frame = await cursor.next();

if (frame) {
yield frame;
}
}
}
}
7 changes: 7 additions & 0 deletions bin/delete-objects/temp-shard.model.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { ObjectId, Document, Collection, Db, MongoClient } from 'mongodb';
import { Frame } from '../../lib/core/frames/Frame';

export interface MongoDBCollections {
tempShards: Collection<TempShardDocument>;
frames: Collection<FrameDocument>;
}

interface TempShard extends Document {
Expand All @@ -15,6 +17,10 @@ export interface TempShardDocument extends Omit<TempShard, 'shardId'> {
shardId: ObjectId;
}

export interface FrameDocument extends Omit<Frame, 'id'> {
_id: ObjectId;
}

export class MongoDB {
private uri: string;
private db: Db | null;
Expand Down Expand Up @@ -45,6 +51,7 @@ export class MongoDB {

return {
tempShards: this.db.collection<TempShardDocument>('tempshards'),
frames: this.db.collection<FrameDocument>('frames')
};
}

Expand Down
6 changes: 6 additions & 0 deletions lib/core/bucketEntries/usecase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ export class BucketEntriesUsecase {
return bucketEntries;
}

async findById(id: BucketEntry['id']): Promise<BucketEntry | null> {
const bucketEntry = await this.bucketEntriesRepository.findOne({ id });

return bucketEntry;
}

async countByBucket(bucketId: Bucket['id']): Promise<number> {
const count = await this.bucketEntriesRepository.count({ bucket: bucketId });

Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
"@types/jest": "^27.4.1",
"@types/jsonwebtoken": "^8.5.8",
"@types/lodash": "^4.14.182",
"@types/mongoose": "^5.11.97",
"@types/mysql": "^2.15.21",
"@types/node": "^17.0.23",
"@types/node-mongodb-fixtures": "^3.2.3",
Expand Down
63 changes: 13 additions & 50 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -844,13 +844,6 @@
"@types/bson" "*"
"@types/node" "*"

"@types/mongoose@^5.11.97":
version "5.11.97"
resolved "https://registry.yarnpkg.com/@types/mongoose/-/mongoose-5.11.97.tgz#80b0357f3de6807eb597262f52e49c3e13ee14d8"
integrity sha512-cqwOVYT3qXyLiGw7ueU2kX9noE8DPGRY6z8eUxudhXY8NZ7DMKYAxyZkLSevGfhCX3dO/AoX5/SO9lAzfjon0Q==
dependencies:
mongoose "*"

"@types/mysql@^2.15.21":
version "2.15.21"
resolved "https://registry.yarnpkg.com/@types/mysql/-/mysql-2.15.21.tgz#7516cba7f9d077f980100c85fd500c8210bd5e45"
Expand Down Expand Up @@ -1638,7 +1631,7 @@ bson@^1.1.4:
resolved "https://registry.yarnpkg.com/bson/-/bson-1.1.6.tgz#fb819be9a60cd677e0853aee4ca712a785d6618a"
integrity sha512-EvVNVeGo4tHxwi8L6bPj3y3itEvStdwvvlojVxxbyYfoaxJ6keLgrTuKdyfEAszFK+H3olzBuafE0yoh0D1gdg==

bson@^4.6.2, bson@^4.6.3:
bson@^4.6.3:
version "4.7.2"
resolved "https://registry.yarnpkg.com/bson/-/bson-4.7.2.tgz#320f4ad0eaf5312dd9b45dc369cc48945e2a5f2e"
integrity sha512-Ry9wCtIZ5kGqkJoi6aD8KjxFZEx78guTQDnpXWiNthsxzrxAK/i8E6pCHAIZTbaEFWcOCvbecMukfK7XUvyLpQ==
Expand Down Expand Up @@ -4992,11 +4985,6 @@ kareem@2.3.2:
resolved "https://registry.yarnpkg.com/kareem/-/kareem-2.3.2.tgz#78c4508894985b8d38a0dc15e1a8e11078f2ca93"
integrity sha512-STHz9P7X2L4Kwn72fA4rGyqyXdmrMSdxqHx9IXon/FXluXieaFA6KJ2upcHAHxQPQ0LeM/OjLrhFxifHewOALQ==

kareem@2.4.1:
version "2.4.1"
resolved "https://registry.yarnpkg.com/kareem/-/kareem-2.4.1.tgz#7d81ec518204a48c1cb16554af126806c3cd82b0"
integrity sha512-aJ9opVoXroQUPfovYP5kaj2lM7Jn02Gw13bL0lg9v0V7SaUc0qavPs0Eue7d2DcC3NjqI6QAUElXNsuZSeM+EA==

kareem@2.5.1:
version "2.5.1"
resolved "https://registry.yarnpkg.com/kareem/-/kareem-2.5.1.tgz#7b8203e11819a8e77a34b3517d3ead206764d15d"
Expand Down Expand Up @@ -5559,18 +5547,6 @@ mongodb@3.7.4:
optionalDependencies:
saslprep "^1.0.0"

mongodb@4.7.0, mongodb@^4.5.0, mongodb@^4.6.0:
version "4.7.0"
resolved "https://registry.yarnpkg.com/mongodb/-/mongodb-4.7.0.tgz#99f7323271d93659067695b60e7b4efee2de9bf0"
integrity sha512-HhVar6hsUeMAVlIbwQwWtV36iyjKd9qdhY+s4wcU8K6TOj4Q331iiMy+FoPuxEntDIijTYWivwFJkLv8q/ZgvA==
dependencies:
bson "^4.6.3"
denque "^2.0.1"
mongodb-connection-string-url "^2.5.2"
socks "^2.6.2"
optionalDependencies:
saslprep "^1.0.3"

mongodb@6.2.0:
version "6.2.0"
resolved "https://registry.yarnpkg.com/mongodb/-/mongodb-6.2.0.tgz#2c9dcb3eeaf528ed850e94b3df392de6c6b0d7ab"
Expand All @@ -5593,6 +5569,18 @@ mongodb@^3.6.9:
optionalDependencies:
saslprep "^1.0.0"

mongodb@^4.5.0, mongodb@^4.6.0:
version "4.7.0"
resolved "https://registry.yarnpkg.com/mongodb/-/mongodb-4.7.0.tgz#99f7323271d93659067695b60e7b4efee2de9bf0"
integrity sha512-HhVar6hsUeMAVlIbwQwWtV36iyjKd9qdhY+s4wcU8K6TOj4Q331iiMy+FoPuxEntDIijTYWivwFJkLv8q/ZgvA==
dependencies:
bson "^4.6.3"
denque "^2.0.1"
mongodb-connection-string-url "^2.5.2"
socks "^2.6.2"
optionalDependencies:
saslprep "^1.0.3"

mongoose-int32@^0.1.0:
version "0.1.0"
resolved "https://registry.yarnpkg.com/mongoose-int32/-/mongoose-int32-0.1.0.tgz#2c8e138e99fa1b44e2470776177a76529a244303"
Expand All @@ -5610,19 +5598,6 @@ mongoose-types@^1.0.3:
dependencies:
mongoose ">= 1.0.16"

mongoose@*:
version "6.4.4"
resolved "https://registry.yarnpkg.com/mongoose/-/mongoose-6.4.4.tgz#4e22a36373d8a867ee8f73063d8b31f1e451316d"
integrity sha512-r6sp96veRNhNIWFtHHe4Lqak+ilgiExYnnMLhYTGdzjIMR90G1ayx0JKFVdHuC6dKNHGFX0ETJGbf36N8Romjg==
dependencies:
bson "^4.6.2"
kareem "2.4.1"
mongodb "4.7.0"
mpath "0.9.0"
mquery "4.0.3"
ms "2.1.3"
sift "16.0.0"

mongoose@=4.11.14:
version "4.11.14"
resolved "https://registry.yarnpkg.com/mongoose/-/mongoose-4.11.14.tgz#b85402aaf28c5c3e45c8ef93fe69544eaa5d00f3"
Expand Down Expand Up @@ -5715,13 +5690,6 @@ mquery@3.2.5:
safe-buffer "5.1.2"
sliced "1.0.1"

mquery@4.0.3:
version "4.0.3"
resolved "https://registry.yarnpkg.com/mquery/-/mquery-4.0.3.tgz#4d15f938e6247d773a942c912d9748bd1965f89d"
integrity sha512-J5heI+P08I6VJ2Ky3+33IpCdAvlYGTSUjwTPxkAr8i8EoduPMBX2OY/wa3IKZIQl7MU4SbFk8ndgSKyB/cl1zA==
dependencies:
debug "4.x"

mquery@5.0.0:
version "5.0.0"
resolved "https://registry.yarnpkg.com/mquery/-/mquery-5.0.0.tgz#a95be5dfc610b23862df34a47d3e5d60e110695d"
Expand Down Expand Up @@ -7154,11 +7122,6 @@ sift@13.5.2:
resolved "https://registry.yarnpkg.com/sift/-/sift-13.5.2.tgz#24a715e13c617b086166cd04917d204a591c9da6"
integrity sha512-+gxdEOMA2J+AI+fVsCqeNn7Tgx3M9ZN9jdi95939l1IJ8cZsqS8sqpJyOkic2SJk+1+98Uwryt/gL6XDaV+UZA==

sift@16.0.0:
version "16.0.0"
resolved "https://registry.yarnpkg.com/sift/-/sift-16.0.0.tgz#447991577db61f1a8fab727a8a98a6db57a23eb8"
integrity sha512-ILTjdP2Mv9V1kIxWMXeMTIRbOBrqKc4JAXmFMnFq3fKeyQ2Qwa3Dw1ubcye3vR+Y6ofA0b9gNDr/y2t6eUeIzQ==

sift@16.0.1:
version "16.0.1"
resolved "https://registry.yarnpkg.com/sift/-/sift-16.0.1.tgz#e9c2ccc72191585008cf3e36fc447b2d2633a053"
Expand Down

0 comments on commit 2120817

Please sign in to comment.