WIP - #340 - selective imports
tegefaulkes committed Feb 21, 2022
1 parent 4daa396 commit acdb772
Showing 6 changed files with 148 additions and 154 deletions.
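
The "selective imports" in the title refers to the pattern visible throughout the diff: index ("barrel") imports are replaced with direct module imports. A minimal TypeScript illustration of the before/after, using import paths taken from the diff itself:

// Before: types pulled in through the package index ("barrel") file
import type { KeyManager } from '../keys';
import type { NodeConnectionManager } from '../nodes';

// After: types imported directly from their defining modules
import type KeyManager from '../keys/KeyManager';
import type NodeConnectionManager from '../nodes/NodeConnectionManager';

Importing the defining module directly avoids evaluating the whole index file and reduces the chance of circular-import problems between subpackages.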
129 changes: 109 additions & 20 deletions src/vaults/VaultInternal.ts
@@ -11,26 +11,28 @@ import type {
VaultName,
VaultRef,
} from './types';
import type { KeyManager } from '../keys';
import type KeyManager from '../keys/KeyManager';
import type { NodeId, NodeIdEncoded } from '../nodes/types';
import type { NodeConnectionManager } from '../nodes';
import type { ResourceAcquire } from '../utils';
import type NodeConnectionManager from '../nodes/NodeConnectionManager';
import type { ResourceAcquire } from '../utils/context';
import type GRPCClientAgent from '../agent/GRPCClientAgent';
import type { POJO } from '../types';
import path from 'path';
import git from 'isomorphic-git';
import grpc from '@grpc/grpc-js';
import Logger from '@matrixai/logger';
import {
CreateDestroyStartStop,
ready,
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
import * as vaultsUtils from './utils';
import * as vaultsErrors from './errors';
import { RWLock, withF, withG } from '../utils';
import { utils as nodesUtils } from '../nodes';

// TODO: Update creation of metadata.
// the use of the remote field needs to be updated so that it is using the
// remote location instead of a boolean. the remote also needs to be properly
// set, likely after calling `start()` in cloneVaultInternal.
import * as vaultsUtils from './utils';
import * as nodesUtils from '../nodes/utils';
import * as validationUtils from '../validation/utils';
import { withF, withG } from '../utils/context';
import { RWLock } from '../utils/locks';
import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb';
import * as vaultsPB from '../proto/js/polykey/v1/vaults/vaults_pb';

// TODO: this might be temp?
export type RemoteInfo = {
@@ -117,9 +119,6 @@ class VaultInternal {
});
// This error flag will contain the error returned by the cloning grpc stream
let error;
// Let vaultName, remoteVaultId;
const thisNodeId = keyManager.getNodeId();

// Make the directory where the .git files will be auto generated and
// where the contents will be cloned to ('contents' file)
await efs.mkdir(vault.vaultDataDir, { recursive: true });
@@ -129,9 +128,8 @@
targetNodeId,
async (connection) => {
const client = connection.getClient();
const [request, vaultName, remoteVaultId] = await vaultsUtils.request(
const [request, vaultName, remoteVaultId] = await vault.request(
client,
thisNodeId,
targetVaultNameOrId,
);
await git.clone({
@@ -622,8 +620,6 @@
// Keeps track of whether the metadata needs changing to avoid unnecessary db ops
// 0 = no change, 1 = change with vault Id, 2 = change with vault name
let metaChange = 0;
const thisNodeId = this.keyManager.getNodeId();
// TODO: get proper metadata.
const remoteInfo = await this.db.get<RemoteInfo>(
this.vaultMetadataDbDomain,
VaultInternal.remoteKey,
@@ -657,9 +653,8 @@
pullNodeId!,
async (connection) => {
const client = connection.getClient();
const [request, , remoteVaultId] = await vaultsUtils.request(
const [request, , remoteVaultId] = await this.request(
client,
thisNodeId,
pullVaultNameOrId!,
);
await git.pull({
@@ -805,6 +800,100 @@
}
return commitIdLatest;
}

protected async request(
client: GRPCClientAgent,
vaultNameOrId: VaultId | VaultName,
): Promise<any[]> {
const requestMessage = new vaultsPB.InfoRequest();
const vaultMessage = new vaultsPB.Vault();
const nodeMessage = new nodesPB.Node();
nodeMessage.setNodeId(nodesUtils.encodeNodeId(this.keyManager.getNodeId()));
requestMessage.setAction('clone');
if (typeof vaultNameOrId === 'string') {
vaultMessage.setNameOrId(vaultNameOrId);
} else {
// To have consistency between GET and POST, send the user
// readable form of the vault Id
vaultMessage.setNameOrId(vaultsUtils.encodeVaultId(vaultNameOrId));
}
requestMessage.setVault(vaultMessage);
requestMessage.setNode(nodeMessage);
const response = client.vaultsGitInfoGet(requestMessage);
let vaultName, remoteVaultId;
response.stream.on('metadata', async (meta) => {
// Receive the Id of the remote vault
vaultName = meta.get('vaultName').pop();
if (vaultName) vaultName = vaultName.toString();
const vId = meta.get('vaultId').pop();
if (vId) remoteVaultId = validationUtils.parseVaultId(vId.toString());
});
// Collect the response buffers from the GET request
const infoResponse: Uint8Array[] = [];
for await (const resp of response) {
infoResponse.push(resp.getChunk_asU8());
}
const metadata = new grpc.Metadata();
if (typeof vaultNameOrId === 'string') {
metadata.set('vaultNameOrId', vaultNameOrId);
} else {
// Metadata only accepts the user readable form of the vault Id
// as the string form has illegal characters
metadata.set('vaultNameOrId', vaultsUtils.encodeVaultId(vaultNameOrId));
}
return [
async function ({
url,
method = 'GET',
headers = {},
body = [Buffer.from('')],
}: {
url: string;
method: string;
headers: POJO;
body: Buffer[];
}) {
if (method === 'GET') {
// Send back the GET request info response
return {
url: url,
method: method,
body: infoResponse,
headers: headers,
statusCode: 200,
statusMessage: 'OK',
};
} else if (method === 'POST') {
const responseBuffers: Array<Uint8Array> = [];
const stream = client.vaultsGitPackGet(metadata);
const chunk = new vaultsPB.PackChunk();
// Body is usually an async generator but in the cases we are using,
// only the first value is used
chunk.setChunk(body[0]);
// Tell the server what commit we need
await stream.write(chunk);
let packResponse = (await stream.read()).value;
while (packResponse != null) {
responseBuffers.push(packResponse.getChunk_asU8());
packResponse = (await stream.read()).value;
}
return {
url: url,
method: method,
body: responseBuffers,
headers: headers,
statusCode: 200,
statusMessage: 'OK',
};
} else {
// FIXME: proper error
throw new Error('Method not supported');
}
},
vaultName,
remoteVaultId,
];
}
}

export default VaultInternal;
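
The request method added above replaces the old vaultsUtils.request helper (deleted from src/vaults/utils.ts further down) and is shaped as an isomorphic-git HTTP client: GET requests are answered from the buffered vaultsGitInfoGet response, while POST requests are forwarded over the vaultsGitPackGet stream. A rough sketch of how the returned function is consumed, mirroring the call site in cloneVaultInternal above; the fs, gitdir, and url values here are illustrative assumptions rather than lines from the diff:

// Sketch only: how the custom HTTP client plugs into isomorphic-git.
// GET replies come from the buffered vaultsGitInfoGet response;
// POST bodies are streamed over vaultsGitPackGet (see the method above).
const [request, vaultName, remoteVaultId] = await vault.request(
  client,
  targetVaultNameOrId,
);
await git.clone({
  fs: efs,                   // assumed: the vault's EncryptedFS instance
  http: { request },         // isomorphic-git accepts any object exposing a request function
  dir: vault.vaultDataDir,   // contents directory created earlier in cloneVaultInternal
  gitdir: vault.vaultGitDir, // assumed name for the vault's .git directory
  url: 'http://',            // placeholder; all routing happens inside request over gRPC
});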
29 changes: 15 additions & 14 deletions src/vaults/VaultManager.ts
@@ -1,38 +1,38 @@
import type { DB, DBDomain, DBLevel } from '@matrixai/db';
import type { VaultId, VaultName, VaultActions, VaultIdString } from './types';
import type { Vault } from './Vault';

import type { FileSystem } from '../types';
import type { PolykeyWorkerManagerInterface } from '../workers/types';
import type { NodeId } from '../nodes/types';

import type { KeyManager } from '../keys';
import type { NodeConnectionManager } from '../nodes';
import type { GestaltGraph } from '../gestalts';
import type { ACL } from '../acl';
import type { NotificationsManager } from '../notifications';
import type NodeConnectionManager from '../nodes/NodeConnectionManager';
import type GestaltGraph from '../gestalts/GestaltGraph';
import type ACL from '../acl/ACL';
import type NotificationsManager from '../notifications/NotificationsManager';

import type { RemoteInfo } from './VaultInternal';
import type { ResourceAcquire } from '../utils';
import type { ResourceAcquire } from '../utils/context';
import path from 'path';
import { PassThrough } from 'readable-stream';
import { EncryptedFS, errors as encryptedfsErrors } from 'encryptedfs';
import { EncryptedFS, errors as encryptedFsErrors } from 'encryptedfs';
import Logger from '@matrixai/logger';
import {
CreateDestroyStartStop,
ready,
} from '@matrixai/async-init/dist/CreateDestroyStartStop';
import { IdInternal, utils as idUtils } from '@matrixai/id';
import { IdInternal } from '@matrixai/id';
import VaultInternal from './VaultInternal';
import * as vaultsUtils from '../vaults/utils';
import * as vaultsErrors from '../vaults/errors';
import * as gitUtils from '../git/utils';
import * as gitErrors from '../git/errors';
import { utils as nodesUtils } from '../nodes';
import { utils as keysUtils } from '../keys';
import * as nodesUtils from '../nodes/utils';
import * as keysUtils from '../keys/utils';
import * as validationUtils from '../validation/utils';
import config from '../config';
import { mkdirExists, RWLock, withF, withG } from '../utils';
import { mkdirExists } from '../utils/utils';
import { RWLock } from '../utils/locks';
import { withF, withG } from '../utils/context';
import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb';

/**
@@ -196,7 +196,7 @@ class VaultManager {
logger: this.logger.getChild('EncryptedFileSystem'),
});
} catch (e) {
if (e instanceof encryptedfsErrors.ErrorEncryptedFSKey) {
if (e instanceof encryptedFsErrors.ErrorEncryptedFSKey) {
throw new vaultsErrors.ErrorVaultManagerKey();
}
throw new vaultsErrors.ErrorVaultManagerEFS(e.message, {
@@ -737,10 +737,11 @@ class VaultManager {
}

@ready(new vaultsErrors.ErrorVaultManagerNotRunning())
// TODO: write a test for this, check if it actually handles conflicts
protected async generateVaultId(): Promise<VaultId> {
let vaultId = vaultsUtils.generateVaultId();
let i = 0;
while (await this.efs.exists(idUtils.toString(vaultId))) {
while (await this.efs.exists(vaultsUtils.encodeVaultId(vaultId))) {
i++;
if (i > 50) {
throw new vaultsErrors.ErrorVaultsCreateVaultId(
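
The change in generateVaultId from idUtils.toString(vaultId) to vaultsUtils.encodeVaultId(vaultId) follows the same reasoning as the comment in VaultInternal.request above: the raw string form of a VaultId can contain characters that are unsafe in paths and gRPC metadata, whereas the encoded form is plain printable text. A small illustrative sketch, with the exact encoding left to src/vaults/utils.ts:

// Illustration only: contrast the two string forms of a VaultId
const vaultId = vaultsUtils.generateVaultId();
const raw = idUtils.toString(vaultId);              // raw binary-backed string, may contain unsafe characters
const encoded = vaultsUtils.encodeVaultId(vaultId); // printable encoding, safe as an EFS path and metadata value
await efs.exists(encoded);                          // matches the new collision check in generateVaultId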
101 changes: 1 addition & 100 deletions src/vaults/utils.ts
@@ -4,12 +4,10 @@ import type {
VaultRef,
VaultAction,
CommitId,
VaultName,
} from './types';
import type { GRPCClientAgent } from '../agent';
import type { NodeId } from '../nodes/types';

import { IdInternal, IdRandom, utils as idUtils } from '@matrixai/id';
import { IdInternal, IdRandom } from '@matrixai/id';
import { tagLast, refs, vaultActions } from './types';
import * as nodesUtils from '../nodes/utils';

@@ -90,102 +88,6 @@ async function* readdirRecursively(fs, dir = '.') {
// }
}

// TODO: Is this being removed or moved?
async function request(
client: GRPCClientAgent,
nodeId: NodeId,
vaultNameOrId: VaultId | VaultName,
): Promise<any[]> {
throw Error('Not Implemented');
// Const requestMessage = new vaultsPB.InfoRequest();
// const vaultMessage = new vaultsPB.Vault();
// const nodeMessage = new nodesPB.Node();
// nodeMessage.setNodeId(nodeId);
// requestMessage.setAction('clone');
// if (typeof vaultNameOrId === 'string') {
// vaultMessage.setNameOrId(vaultNameOrId);
// } else {
// // To have consistency between GET and POST, send the user
// // readable form of the vault Id
// vaultMessage.setNameOrId(makeVaultIdPretty(vaultNameOrId));
// }
// requestMessage.setVault(vaultMessage);
// requestMessage.setNode(nodeMessage);
// const response = client.vaultsGitInfoGet(requestMessage);
// let vaultName, remoteVaultId;
// response.stream.on('metadata', async (meta) => {
// // Receive the Id of the remote vault
// vaultName = meta.get('vaultName').pop();
// if (vaultName) vaultName = vaultName.toString();
// const vId = meta.get('vaultId').pop();
// if (vId) remoteVaultId = makeVaultId(vId.toString());
// });
// // Collet the response buffers from the GET request
// const infoResponse: Uint8Array[] = [];
// for await (const resp of response) {
// infoResponse.push(resp.getChunk_asU8());
// }
// const metadata = new grpc.Metadata();
// if (typeof vaultNameOrId === 'string') {
// metadata.set('vaultNameOrId', vaultNameOrId);
// } else {
// // Metadata only accepts the user readable form of the vault Id
// // as the string form has illegal characters
// metadata.set('vaultNameOrId', makeVaultIdPretty(vaultNameOrId));
// }
// return [
// async function ({
// url,
// method = 'GET',
// headers = {},
// body = [Buffer.from('')],
// }: {
// url: string;
// method: string;
// headers: POJO;
// body: Buffer[];
// }) {
// if (method === 'GET') {
// // Send back the GET request info response
// return {
// url: url,
// method: method,
// body: infoResponse,
// headers: headers,
// statusCode: 200,
// statusMessage: 'OK',
// };
// } else if (method === 'POST') {
// const responseBuffers: Array<Uint8Array> = [];
// const stream = client.vaultsGitPackGet(metadata);
// const chunk = new vaultsPB.PackChunk();
// // Body is usually an async generator but in the cases we are using,
// // only the first value is used
// chunk.setChunk(body[0]);
// // Tell the server what commit we need
// await stream.write(chunk);
// let packResponse = (await stream.read()).value;
// while (packResponse != null) {
// responseBuffers.push(packResponse.getChunk_asU8());
// packResponse = (await stream.read()).value;
// }
// return {
// url: url,
// method: method,
// body: responseBuffers,
// headers: headers,
// statusCode: 200,
// statusMessage: 'OK',
// };
// } else {
// throw new Error('Method not supported');
// }
// },
// vaultName,
// remoteVaultId,
// ];
}

function isVaultAction(action: any): action is VaultAction {
if (typeof action !== 'string') return false;
return (vaultActions as Readonly<Array<string>>).includes(action);
@@ -202,6 +104,5 @@ export {
validateCommitId,
commitAuthor,
isVaultAction,
request,
readdirRecursively,
};