diff --git a/lib/core/bucketEntries/usecase.ts b/lib/core/bucketEntries/usecase.ts index c1bd76db..ec99922f 100644 --- a/lib/core/bucketEntries/usecase.ts +++ b/lib/core/bucketEntries/usecase.ts @@ -168,7 +168,7 @@ export class BucketEntriesUsecase { const shards = await this.shardsRepository.findByIds(shardIds); if (shards.length > 0) { - await this.shardsUsecase.deleteShardsStorageByUuids(shards.map(s => ({ uuid: s.uuid!, hash: s.hash }))); + await this.shardsUsecase.deleteShardsStorageByUuids(shards as any); await this.shardsRepository.deleteByIds(shards.map(s => s.id)); } diff --git a/lib/core/shards/usecase.ts b/lib/core/shards/usecase.ts index c88f7fa3..71567276 100644 --- a/lib/core/shards/usecase.ts +++ b/lib/core/shards/usecase.ts @@ -2,19 +2,52 @@ import { MirrorsRepository } from '../mirrors/Repository'; import NetworkMessageQueue from "../../server/queues/networkQueue"; import { DELETING_FILE_MESSAGE } from "../../server/queues/messageTypes"; import log from '../../logger'; +import { ContactsRepository } from '../contacts/Repository'; +import { Contact } from '../contacts/Contact'; +import { MirrorWithContact } from '../mirrors/Mirror'; export class ShardsUsecase { constructor( - private mirrorsRepository: MirrorsRepository, - private networkQueue: NetworkMessageQueue + private readonly mirrorsRepository: MirrorsRepository, + private readonly contactsRepository: ContactsRepository, + private readonly networkQueue: NetworkMessageQueue ) {} - async deleteShardsStorageByUuids(shards: { hash: string, uuid: string }[]) { + async deleteShardsStorageByUuids(shards: { + hash: string, + uuid: string, + contracts: ({ nodeID: Contact['id'] })[] + }[]) { const mirrors = await this.mirrorsRepository.findByShardHashesWithContacts(shards.map(s => s.hash)); const stillExistentMirrors = mirrors.filter((mirror) => { return mirror.contact && mirror.contact.address && mirror.contact.port; }); + const noMirrors = stillExistentMirrors.length === 0; + + if (noMirrors) { + const contactIdsWithShardsHashes = shards.flatMap((s) => + s.contracts.map(c => ({ nodeID: c.nodeID, shardHash: s.hash, uuid: s.uuid })) + ); + + const contacts = await this.contactsRepository.findByIds( + contactIdsWithShardsHashes.map(c => c.nodeID) + ); + + for (const shard of shards) { + const contactsForGivenShard = contactIdsWithShardsHashes.filter((contactWHash) => { + return contactWHash.shardHash === shard.hash + }); + for (const mirror of contactsForGivenShard) { + stillExistentMirrors.push({ + id: '000000000000000000000000', + contact: contacts.find(c => c.id === mirror.nodeID) as Contact, + shardHash: mirror.shardHash + } as MirrorWithContact); + } + } + } + for (const { contact, shardHash } of stillExistentMirrors) { const { address, port } = contact; const { uuid } = (shards.find(s => s.hash === shardHash) as { hash: string, uuid: string }); @@ -35,7 +68,7 @@ export class ShardsUsecase { }) } - if (stillExistentMirrors.length > 0) { + if (!noMirrors && stillExistentMirrors.length > 0) { log.info('Deleting still existent mirrors (by uuids): %s from hashes: %s', stillExistentMirrors.map(m => m.id).toString(), shards.toString()); await this.mirrorsRepository.deleteByIds(stillExistentMirrors.map(m => m.id)); diff --git a/lib/server/http/index.ts b/lib/server/http/index.ts index 1024d8c5..27092b6d 100644 --- a/lib/server/http/index.ts +++ b/lib/server/http/index.ts @@ -91,6 +91,7 @@ export function bindNewRoutes( const shardsUsecase = new ShardsUsecase( mirrorsRepository, + contactsRepository, networkQueue ); diff --git a/tests/lib/core/bucketentries/usecase.test.ts b/tests/lib/core/bucketentries/usecase.test.ts index 6a0cb573..fb5c834f 100644 --- a/tests/lib/core/bucketentries/usecase.test.ts +++ b/tests/lib/core/bucketentries/usecase.test.ts @@ -24,6 +24,8 @@ import { ShardsUsecase } from '../../../../lib/core/shards/usecase'; import fixtures from '../fixtures'; import { BucketEntry } from '../../../../lib/core/bucketEntries/BucketEntry'; import { Bucket } from '../../../../lib/core/buckets/Bucket'; +import { ContactsRepository } from '../../../../lib/core/contacts/Repository'; +import { MongoDBContactsRepository } from '../../../../lib/core/contacts/MongoDBContactsRepository'; describe('BucketEntriesUsecase', function () { const bucketId = 'bucketIdSAMPLE'; @@ -38,6 +40,7 @@ describe('BucketEntriesUsecase', function () { let pointersRepository: PointersRepository = new MongoDBPointersRepository({}); let usersRepository: UsersRepository = new MongoDBUsersRepository({}); let bucketEntryShardsRepository: BucketEntryShardsRepository = new MongoDBBucketEntryShardsRepository({}); + let contactsRepository: ContactsRepository = new MongoDBContactsRepository({}); let networkQueue: any = { enqueueMessage: (message: any) => {} @@ -45,6 +48,7 @@ describe('BucketEntriesUsecase', function () { let shardsUseCase = new ShardsUsecase( mirrorsRepository, + contactsRepository, networkQueue, ); @@ -67,9 +71,11 @@ describe('BucketEntriesUsecase', function () { shardsRepository = new MongoDBShardsRepository({}); bucketsRepository = new MongoDBBucketsRepository({}); pointersRepository = new MongoDBPointersRepository({}); + contactsRepository = new MongoDBContactsRepository({}); shardsUseCase = new ShardsUsecase( mirrorsRepository, + contactsRepository, networkQueue, ); @@ -278,7 +284,9 @@ describe('BucketEntriesUsecase', function () { expect(findShardsStub.calledWith(shards.map(b => b.id))); expect(deleteShardsStorageStub.calledOnce).toBeTruthy(); - expect(deleteShardsStorageStub.calledWith(shards.map(s => ({ hash: s.hash, uuid: (s.uuid as string) })))) + expect(deleteShardsStorageStub.calledWith( + shards.map(s => ({ hash: s.hash, uuid: (s.uuid as string), contracts: s.contracts }))) + ) expect(deleteShardsStub.calledOnce).toBeTruthy(); expect(deleteShardsStub.calledWith(shards.map(s => s.id))); diff --git a/tests/lib/core/fixtures.ts b/tests/lib/core/fixtures.ts index 15ace86c..7408656e 100644 --- a/tests/lib/core/fixtures.ts +++ b/tests/lib/core/fixtures.ts @@ -7,8 +7,9 @@ import { Frame } from '../../../lib/core/frames/Frame'; import { BucketEntryShard } from '../../../lib/core/bucketEntryShards/BucketEntryShard'; import { Bucket } from '../../../lib/core/buckets/Bucket'; import { User } from '../../../lib/core/users/User'; -import { Shard } from '../../../lib/core/shards/Shard'; +import { Shard, Contract } from '../../../lib/core/shards/Shard'; import { Contact } from '../../../lib/core/contacts/Contact'; +import { Mirror } from '../../../lib/core/mirrors/Mirror'; function getBucketEntriesWithFrames(fileIds?: string[]): BucketEntryWithFrame[] { const ids = fileIds ?? [v4()]; @@ -156,7 +157,7 @@ function getShard(custom?: Partial, contactId?: Contact['id']): Shard { function getContact(custom?: Partial): Contact { const defaultContact: Contact = { - address: `http://${randomBytes(10).toString('hex')}.com`, + address: `${randomBytes(10).toString('hex')}.com`, id: v4(), ip: 'http://1.1.1.1', lastSeen: new Date(), @@ -174,6 +175,30 @@ function getContact(custom?: Partial): Contact { return { ...defaultContact, ...custom }; } +function getContract(custom?: Partial): Contract { + return { + version: 1, + farmer_id: randomBytes(40).toString('hex'), + data_size: Math.trunc(Math.random() * 1000), + data_hash: randomBytes(40).toString('hex'), + store_begin: new Date(), + ...custom + } +}; + +function getMirror(custom?: Partial): Mirror { + return { + id: v4(), + shardHash: randomBytes(40).toString('hex'), + contact: v4(), + token: v4(), + isEstablished: true, + contract: getContract(), + created: new Date(), + ...custom + }; +} + export default { getBucketEntriesWithFrames, getBucketEntriesWithoutFrames, @@ -184,5 +209,7 @@ export default { getBucket, getUser, getShard, - getContact + getContact, + getContract, + getMirror }; diff --git a/tests/lib/core/shards/usecase.test.ts b/tests/lib/core/shards/usecase.test.ts new file mode 100644 index 00000000..2193fc36 --- /dev/null +++ b/tests/lib/core/shards/usecase.test.ts @@ -0,0 +1,137 @@ +import { restore, stub } from 'sinon'; +import fixtures from '../fixtures'; + +import { MirrorsRepository } from '../../../../lib/core/mirrors/Repository'; +import { ShardsUsecase } from '../../../../lib/core/shards/usecase'; +import { MongoDBMirrorsRepository } from '../../../../lib/core/mirrors/MongoDBMirrorsRepository'; +import { MongoDBContactsRepository } from '../../../../lib/core/contacts/MongoDBContactsRepository'; +import { ContactsRepository } from '../../../../lib/core/contacts/Repository'; +import NetworkMessageQueue from '../../../../lib/server/queues/networkQueue'; +import { DELETING_FILE_MESSAGE } from '../../../../lib/server/queues/messageTypes'; +import { Shard } from '../../../../lib/core/shards/Shard'; +import { MirrorWithContact } from '../../../../lib/core/mirrors/Mirror'; + +describe('ShardsUsecase', () => { + let mirrorsRepository: MirrorsRepository = new MongoDBMirrorsRepository({}); + let contactsRepository: ContactsRepository = new MongoDBContactsRepository({}); + const queue = new NetworkMessageQueue({ + connection: { + url: `amqp://fake@fake`, + }, + exchange: { + name: 'exchangeName', + type: 'direct', + }, + queue: { + name: 'fake_name', + }, + routingKey: { + name: 'routingKeyName', + }, + }); + + let usecase = new ShardsUsecase(mirrorsRepository, contactsRepository, queue); + + beforeEach(() => { + mirrorsRepository = new MongoDBMirrorsRepository({}); + contactsRepository = new MongoDBContactsRepository({}); + + usecase = new ShardsUsecase( + mirrorsRepository, + contactsRepository, + queue, + ); + + restore(); + }); + + describe('deleteShardsStorageByUuids()', () => { + it('When mirrors exist, then it deletes them properly', async () => { + const shardsToDelete = [fixtures.getShard(), fixtures.getShard()]; + const [firstShard, secondShard] = shardsToDelete; + const contacts = shardsToDelete.map(s => fixtures.getContact({ id: s.contracts[0].nodeID })) + const mirrors: MirrorWithContact[] = contacts.map((c, i) => ({ + ...fixtures.getMirror(), + shardHash: shardsToDelete[i].hash, + contact: c, + })); + const [firstMirror, secondMirror] = mirrors; + + const findByShardHashes = stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors); + const findContactsByIds = stub(contactsRepository, 'findByIds').resolves(); + const enqueueMessage = stub(queue, 'enqueueMessage').resolves(); + const deleteMirrorsByIds = stub(mirrorsRepository, 'deleteByIds').resolves(); + + await usecase.deleteShardsStorageByUuids( + shardsToDelete as (Shard & { uuid: string })[] + ); + + expect(findByShardHashes.calledOnce).toBeTruthy(); + expect(findByShardHashes.firstCall.args).toStrictEqual([shardsToDelete.map(s => s.hash)]); + expect(findContactsByIds.notCalled).toBeTruthy(); + expect(enqueueMessage.callCount).toEqual(mirrors.length); + expect(enqueueMessage.firstCall.args[0]).toEqual({ + type: DELETING_FILE_MESSAGE, + payload: { + key: firstShard.uuid, + hash: firstShard.uuid, + url: `http://${firstMirror.contact.address}:${firstMirror.contact.port}/v2/shards/${firstShard.uuid}` + } + }) + expect(enqueueMessage.secondCall.args[0]).toEqual({ + type: DELETING_FILE_MESSAGE, + payload: { + key: secondShard.uuid, + hash: secondShard.uuid, + url: `http://${secondMirror.contact.address}:${secondMirror.contact.port}/v2/shards/${secondShard.uuid}` + } + }); + expect(deleteMirrorsByIds.calledOnce).toBeTruthy(); + expect(deleteMirrorsByIds.firstCall.args).toStrictEqual([mirrors.map(m => m.id)]); + }); + + it('When mirrors do not exist, then uses contracts as fallback to delete shards', async () => { + const shardsToDelete = [fixtures.getShard(), fixtures.getShard()]; + const [firstShard, secondShard] = shardsToDelete; + const contacts = shardsToDelete.map(s => fixtures.getContact({ id: s.contracts[0].nodeID })) + const [firstContact, secondContact] = contacts; + const mirrors: MirrorWithContact[] = []; + + const findByShardHashes = stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors); + const findContactsByIds = stub(contactsRepository, 'findByIds').resolves(contacts); + const enqueueMessage = stub(queue, 'enqueueMessage').resolves(); + const deleteMirrorsByIds = stub(mirrorsRepository, 'deleteByIds').resolves(); + + await usecase.deleteShardsStorageByUuids( + shardsToDelete as (Shard & { uuid: string })[] + ); + + expect(findByShardHashes.calledOnce).toBeTruthy(); + expect(findByShardHashes.firstCall.args).toStrictEqual([shardsToDelete.map(s => s.hash)]); + expect(findContactsByIds.calledOnce).toBeTruthy(); + expect(findContactsByIds.firstCall.args).toStrictEqual([ + shardsToDelete.flatMap(s => s.contracts.flatMap(c => c.nodeID)) + ]); + expect(enqueueMessage.callCount).toEqual( + shardsToDelete.reduce((a, s) => a + s.contracts.length, 0) + ); + expect(enqueueMessage.firstCall.args[0]).toEqual({ + type: DELETING_FILE_MESSAGE, + payload: { + key: firstShard.uuid, + hash: firstShard.uuid, + url: `http://${firstContact.address}:${firstContact.port}/v2/shards/${firstShard.uuid}` + } + }) + expect(enqueueMessage.secondCall.args[0]).toEqual({ + type: DELETING_FILE_MESSAGE, + payload: { + key: secondShard.uuid, + hash: secondShard.uuid, + url: `http://${secondContact.address}:${secondContact.port}/v2/shards/${secondShard.uuid}` + } + }); + expect(deleteMirrorsByIds.notCalled).toBeTruthy(); + }); + }); +});