Skip to content

Commit

Permalink
Merge branch 'post_updates'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rinse12 committed Dec 8, 2023
2 parents c854936 + 0ce671a commit 7421471
Show file tree
Hide file tree
Showing 46 changed files with 1,602 additions and 693 deletions.
356 changes: 334 additions & 22 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@
"@types/uuid": "8.3.4",
"assert": "2.0.0",
"async-wait-until": "2.0.12",
"better-sqlite3": "9.2.0",
"buffer": "6.0.3",
"captcha-canvas": "3.2.1",
"cbor": "9.0.1",
"debounce": "1.2.1",
"err-code": "3.0.1",
"ethers": "6.7.0",
"file-type": "16.5.4",
Expand Down
41 changes: 0 additions & 41 deletions src/challenge.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
import {
ChallengeAnswerMessageType,
ChallengeAnswersTableRowInsert,
ChallengeMessageType,
ChallengeRequestMessageType,
ChallengeRequestsTableRowInsert,
ChallengesTableRow,
ChallengesTableRowInsert,
ChallengeVerificationMessageType,
ChallengeVerificationsTableRowInsert,
DecryptedChallengeAnswerMessageType,
ProtocolVersion
} from "./types";
import lodash from "lodash";
import { Encrypted, PubsubSignature } from "./signer/constants";
import assert from "assert";

export class ChallengeRequestMessage implements ChallengeRequestMessageType {
encrypted: Encrypted;
Expand Down Expand Up @@ -47,15 +39,6 @@ export class ChallengeRequestMessage implements ChallengeRequestMessageType {
timestamp: this.timestamp
};
}

toJSONForDb(challengeAnswers: string[] | undefined, challengeCommentCids: string[] | undefined): ChallengeRequestsTableRowInsert {
return {
...lodash.omit(this.toJSON(), ["type", "encrypted"]),
acceptedChallengeTypes: Array.isArray(this.acceptedChallengeTypes) ? JSON.stringify(this.acceptedChallengeTypes) : undefined,
challengeAnswers: Array.isArray(challengeAnswers) ? JSON.stringify(challengeAnswers) : undefined,
challengeCommentCids: Array.isArray(challengeCommentCids) ? JSON.stringify(challengeCommentCids) : undefined
};
}
}

export class ChallengeMessage implements ChallengeMessageType {
Expand Down Expand Up @@ -88,15 +71,6 @@ export class ChallengeMessage implements ChallengeMessageType {
timestamp: this.timestamp
};
}

toJSONForDb(challengeTypes: ChallengesTableRow["challengeTypes"]): ChallengesTableRowInsert {
assert(Array.isArray(challengeTypes), `Challenge types need to be array, (${challengeTypes}) is not an array`);

const challengeTypesFormattedForDb = JSON.stringify(challengeTypes);
if (challengeTypesFormattedForDb === "[object Object]") throw Error(`challengeTypes shouldn't be [object Object]`);

return { ...lodash.omit(this.toJSON(), ["type", "encrypted"]), challengeTypes: challengeTypesFormattedForDb };
}
}

export class ChallengeAnswerMessage implements ChallengeAnswerMessageType {
Expand Down Expand Up @@ -128,14 +102,6 @@ export class ChallengeAnswerMessage implements ChallengeAnswerMessageType {
timestamp: this.timestamp
};
}

toJSONForDb(challengeAnswers: DecryptedChallengeAnswerMessageType["challengeAnswers"]): ChallengeAnswersTableRowInsert {
assert(Array.isArray(challengeAnswers), `Challenge answers need to be array, (${challengeAnswers}) is not an array`);
const challengeAnswersFormattedForDb = JSON.stringify(challengeAnswers);
if (challengeAnswersFormattedForDb === "[object Object]") throw Error(`challengeAnswers shouldn't be [object Object]`);

return { ...lodash.omit(this.toJSON(), ["type", "encrypted"]), challengeAnswers: challengeAnswersFormattedForDb };
}
}

export class ChallengeVerificationMessage implements ChallengeVerificationMessageType {
Expand Down Expand Up @@ -177,11 +143,4 @@ export class ChallengeVerificationMessage implements ChallengeVerificationMessag
timestamp: this.timestamp
};
}

toJSONForDb(): ChallengeVerificationsTableRowInsert {
return {
...lodash.omit(this.toJSON(), ["type", "encrypted"]),
challengeErrors: Array.isArray(this.challengeErrors) ? JSON.stringify(this.challengeErrors) : undefined
};
}
}
13 changes: 8 additions & 5 deletions src/clients/base-client-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,15 @@ export class BaseClientsManager {
log.trace(`Fetching url (${url})`);

const timeBefore = Date.now();
const isCid = path.includes("/ipfs/"); // If false, then IPNS
const isCid = loadType === "comment" || loadType === "generic-ipfs"; // If false, then IPNS

this.preFetchGateway(gateway, path, loadType);
try {
const resText = await this._fetchWithLimit(url, { cache: isCid ? "force-cache" : "no-store", signal: abortController.signal });
if (isCid) await this._verifyContentIsSameAsCid(resText, path.split("/ipfs/")[1]);
this.postFetchGatewaySuccess(gateway, path, loadType);
const timeElapsedMs = Date.now() - timeBefore;
await this._plebbit.stats.recordGatewaySuccess(gateway, isCid ? "cid" : "ipns", timeElapsedMs);
await this._plebbit.stats.recordGatewaySuccess(gateway, isCid || loadType === "comment-update" ? "cid" : "ipns", timeElapsedMs);
return resText;
} catch (e) {
if (e?.details?.error?.type === "aborted") {
Expand Down Expand Up @@ -295,13 +295,16 @@ export class BaseClientsManager {
}
}

// TODO rename this to _fetchPathP2P
async _fetchCidP2P(cid: string): Promise<string> {
const ipfsClient = this.getDefaultIpfs();
const fileContent = await ipfsClient._client.cat(cid, { length: DOWNLOAD_LIMIT_BYTES }); // Limit is 1mb files
if (typeof fileContent !== "string") throwWithErrorCode("ERR_FAILED_TO_FETCH_IPFS_VIA_IPFS", { cid });
const calculatedCid: string = await Hash.of(fileContent);
if (fileContent.length === DOWNLOAD_LIMIT_BYTES && calculatedCid !== cid)
throwWithErrorCode("ERR_OVER_DOWNLOAD_LIMIT", { cid, downloadLimit: DOWNLOAD_LIMIT_BYTES });
if (fileContent.length === DOWNLOAD_LIMIT_BYTES) {
const calculatedCid: string = await Hash.of(fileContent);
if (calculatedCid !== cid) throwWithErrorCode("ERR_OVER_DOWNLOAD_LIMIT", { cid, downloadLimit: DOWNLOAD_LIMIT_BYTES });
}

return fileContent;
}

Expand Down
179 changes: 148 additions & 31 deletions src/clients/client-manager.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import Publication from "../publication";
import { Plebbit } from "../plebbit";
import { Comment } from "../comment";
import { throwWithErrorCode } from "../util";
import { getPostUpdateTimestampRange, throwWithErrorCode } from "../util";
import assert from "assert";
import { Chain, CommentIpfsType, CommentUpdate } from "../types";
import { Chain, CommentIpfsType, CommentIpfsWithCid, CommentUpdate, PageIpfs } from "../types";
import { Subplebbit } from "../subplebbit/subplebbit";
import { verifySubplebbit } from "../signer";
import lodash from "lodash";
Expand All @@ -20,14 +20,15 @@ import {
} from "./ipfs-gateway-client";

import { BaseClientsManager, LoadType } from "./base-client-manager";
import { subplebbitForPublishingCache } from "../constants";
import { commentPostUpdatesParentsPathConfig, postTimestampConfig, subplebbitForPublishingCache } from "../constants";
import {
CommentPlebbitRpcStateClient,
GenericPlebbitRpcStateClient,
PublicationPlebbitRpcStateClient,
SubplebbitPlebbitRpcStateClient
} from "./plebbit-rpc-state-client";
import { SubplebbitIpfsType } from "../subplebbit/types";
import Logger from "@plebbit/plebbit-logger";

export class ClientsManager extends BaseClientsManager {
protected _plebbit: Plebbit;
Expand Down Expand Up @@ -89,7 +90,7 @@ export class ClientsManager extends BaseClientsManager {
loadType === "subplebbit"
? this._getStatePriorToResolvingSubplebbitIpns()
: loadType === "comment-update"
? "fetching-update-ipns"
? "fetching-update-ipfs"
: loadType === "comment" || loadType === "generic-ipfs"
? "fetching-ipfs"
: undefined;
Expand Down Expand Up @@ -186,17 +187,6 @@ export class ClientsManager extends BaseClientsManager {
protected _getStatePriorToResolvingSubplebbitIpfs(): "fetching-subplebbit-ipfs" | "fetching-ipfs" {
return "fetching-subplebbit-ipfs";
}

async fetchSubplebbitIpns(ipnsAddress: string): Promise<string> {
if (this._defaultIpfsProviderUrl) {
this.updateIpfsState(this._getStatePriorToResolvingSubplebbitIpns());
const subCid = await this.resolveIpnsToCidP2P(ipnsAddress);
this.updateIpfsState(this._getStatePriorToResolvingSubplebbitIpfs());
const content = await this._fetchCidP2P(subCid);
this.updateIpfsState("stopped");
return content;
} else return this.fetchFromMultipleGateways({ ipns: ipnsAddress }, "subplebbit");
}
}

export class PublicationClientsManager extends ClientsManager {
Expand Down Expand Up @@ -259,8 +249,6 @@ export class PublicationClientsManager extends ClientsManager {
this._attemptingToResolve = false;
if (!subIpns) throw new PlebbitError("ERR_ENS_ADDRESS_HAS_NO_SUBPLEBBIT_ADDRESS_TEXT_RECORD", { ensAddress: subplebbitAddress });

assert(typeof subIpns === "string", `Failed to resolve subplebbit address (${subplebbitAddress})`);

this._publication._updatePublishingState("fetching-subplebbit-ipns");
let subJson: SubplebbitIpfsType;
if (this._defaultIpfsProviderUrl) {
Expand Down Expand Up @@ -320,21 +308,147 @@ export class CommentClientsManager extends PublicationClientsManager {
this.clients.plebbitRpcClients = { ...this.clients.plebbitRpcClients, [rpcUrl]: new CommentPlebbitRpcStateClient("stopped") };
}

async fetchCommentUpdate(ipnsName: string): Promise<CommentUpdate> {
this._comment._setUpdatingState("fetching-update-ipns");
// Resolver methods here
preResolveTextRecord(
address: string,
txtRecordName: "subplebbit-address" | "plebbit-author-address",
resolvedTextRecord: string,
chain: string
): void {
super.preResolveTextRecord(address, txtRecordName, resolvedTextRecord, chain);
if (txtRecordName === "subplebbit-address")
this._comment._setUpdatingState("resolving-subplebbit-address"); // Resolving for CommentUpdate
else if (txtRecordName === "plebbit-author-address") this._comment._setUpdatingState("resolving-author-address"); // Resolving for CommentIpfs
}

async _fetchSubplebbitForCommentUpdate() {
const subIpns = await this.resolveSubplebbitAddressIfNeeded(this._comment.subplebbitAddress);
if (!subIpns)
throw new PlebbitError("ERR_ENS_ADDRESS_HAS_NO_SUBPLEBBIT_ADDRESS_TEXT_RECORD", {
ensAddress: this._comment.subplebbitAddress
});

this._comment._setUpdatingState("fetching-subplebbit-ipns");
let subJson: SubplebbitIpfsType;
if (this._defaultIpfsProviderUrl) {
this.updateIpfsState("fetching-subplebbit-ipns");
const subplebbitCid = await this.resolveIpnsToCidP2P(subIpns);
this._comment._setUpdatingState("fetching-subplebbit-ipfs");
this.updateIpfsState("fetching-subplebbit-ipfs");
subJson = JSON.parse(await this._fetchCidP2P(subplebbitCid));
} else {
// States of gateways should be updated by fetchFromMultipleGateways
subJson = JSON.parse(await this.fetchFromMultipleGateways({ ipns: subIpns }, "subplebbit"));
}
subplebbitForPublishingCache.set(subJson.address, lodash.pick(subJson, ["encryption", "pubsubTopic", "address"]));
return subJson;
}

_findCommentInSubplebbitPosts(subIpns: SubplebbitIpfsType, cid: string) {
if (!subIpns.posts) return undefined;
const findInCommentAndChildren = (comment: PageIpfs["comments"][0]): PageIpfs["comments"][0]["comment"] => {
if (comment.comment.cid === cid) return comment.comment;
if (!comment.update.replies) return undefined;
for (const childComment of comment.update.replies.pages.new.comments) {
const commentInChild = findInCommentAndChildren(childComment);
if (commentInChild) return commentInChild;
}
return undefined;
};
for (const post of subIpns.posts.pages.hot.comments) {
const commentInChild = findInCommentAndChildren(post);
if (commentInChild) return commentInChild;
}
return undefined;
}

async _fetchParentCommentForCommentUpdate(parentCid: string) {
if (this._defaultIpfsProviderUrl) {
this.updateIpfsState("fetching-update-ipns");
const updateCid = await this.resolveIpnsToCidP2P(ipnsName);
this._comment._setUpdatingState("fetching-update-ipfs");
this.updateIpfsState("fetching-update-ipfs");
const commentUpdate: CommentUpdate = JSON.parse(await this._fetchCidP2P(updateCid));
this._comment._setUpdatingState("fetching-update-ipfs");
const commentContent: CommentIpfsWithCid = { cid: parentCid, ...JSON.parse(await this._fetchCidP2P(parentCid)) };
this.updateIpfsState("stopped");
return commentUpdate;
return commentContent;
} else {
// States of gateways should be updated by fetchFromMultipleGateways
const update: CommentUpdate = JSON.parse(await this.fetchFromMultipleGateways({ ipns: ipnsName }, "comment-update"));
return update;
const commentContent: CommentIpfsWithCid = {
cid: parentCid,
...JSON.parse(await this.fetchFromMultipleGateways({ cid: parentCid }, "comment"))
};
return commentContent;
}
}

async _getParentsPath(subIpns: SubplebbitIpfsType): Promise<string> {
const parentsPathCache = await this._plebbit.createStorageLRU(commentPostUpdatesParentsPathConfig);
const pathCache: string = await parentsPathCache.getItem(this._comment.cid);
if (pathCache) return pathCache.split("/").reverse().join("/");

const postTimestampCache = await this._plebbit.createStorageLRU(postTimestampConfig);
if (this._comment.depth === 0) await postTimestampCache.setItem(this._comment.cid, this._comment.timestamp);
let parentCid = this._comment.parentCid;
let reversedPath = `${this._comment.cid}`; // Path will be reversed here, `nestedReplyCid/replyCid/postCid`
while (parentCid) {
// should attempt to fetch cache here
// Also should we set updatingState everytime we fetch a parent Comment?
const parentPathCache: string = await parentsPathCache.getItem(parentCid);
if (parentPathCache) {
reversedPath += "/" + parentPathCache;
break;
} else {
const parent =
this._findCommentInSubplebbitPosts(subIpns, parentCid) || (await this._fetchParentCommentForCommentUpdate(parentCid));

if (parent.depth === 0) await postTimestampCache.setItem(parent.cid, parent.timestamp);

reversedPath += `/${parentCid}`;
parentCid = parent.parentCid;
}
}

await parentsPathCache.setItem(this._comment.cid, reversedPath);

const finalParentsPath = reversedPath.split("/").reverse().join("/"); // will be postCid/replyCid/nestedReplyCid

return finalParentsPath;
}

async fetchCommentUpdate(): Promise<CommentUpdate> {
// Caching should eventually be moved to storage instead of in-memory
const log = Logger("plebbit-js:comment:update");
const subIpns = await this._fetchSubplebbitForCommentUpdate();
const parentsPostUpdatePath = await this._getParentsPath(subIpns);
const postTimestamp = await (await this._plebbit.createStorageLRU(postTimestampConfig)).getItem(this._comment.postCid);
if (typeof postTimestamp !== "number") throw Error("Failed to fetch post timestamp");
const timestampRanges = getPostUpdateTimestampRange(subIpns.postUpdates, postTimestamp);
if (timestampRanges.length === 0) throw Error("Post has no timestamp range bucket");

for (const timestampRange of timestampRanges) {
const folderCid = subIpns.postUpdates[timestampRange];
const path = `${folderCid}/` + parentsPostUpdatePath + "/update";
this._comment._setUpdatingState("fetching-update-ipfs");
if (this._defaultIpfsProviderUrl) {
this.updateIpfsState("fetching-update-ipfs");
try {
const commentUpdate: CommentUpdate = JSON.parse(await this._fetchCidP2P(path));
this.updateIpfsState("stopped");
return commentUpdate;
} catch (e) {
// if does not exist, try the next timestamp range
log.error(e, `Failed to fetch CommentUpdate from path (${path}). Trying the next timestamp range`);
}
} else {
// States of gateways should be updated by fetchFromMultipleGateways
try {
const update: CommentUpdate = JSON.parse(await this.fetchFromMultipleGateways({ cid: path }, "comment-update"));
return update;
} catch (e) {
// if does not exist, try the next timestamp range
log.error(e, `Failed to fetch CommentUpdate from path (${path}). Trying the next timestamp range`);
}
}
}

throw Error(`CommentUpdate of comment (${this._comment.cid}) does not exist on all timestamp ranges: ${timestampRanges}`);
}

async fetchCommentCid(cid: string): Promise<CommentIpfsType> {
Expand Down Expand Up @@ -390,19 +504,22 @@ export class SubplebbitClientsManager extends ClientsManager {

async fetchSubplebbit(ipnsName: string) {
this._subplebbit._setUpdatingState("fetching-ipns");
let subJson: SubplebbitIpfsType;
if (this._defaultIpfsProviderUrl) {
this.updateIpfsState("fetching-ipns");
const subplebbitCid = await this.resolveIpnsToCidP2P(ipnsName);
this._subplebbit._setUpdatingState("fetching-ipfs");
this.updateIpfsState("fetching-ipfs");
const subplebbit: SubplebbitIpfsType = JSON.parse(await this._fetchCidP2P(subplebbitCid));
subJson = JSON.parse(await this._fetchCidP2P(subplebbitCid));
this.updateIpfsState("stopped");
return subplebbit;
} else {
// States of gateways should be updated by fetchFromMultipleGateways
const update: SubplebbitIpfsType = JSON.parse(await this.fetchFromMultipleGateways({ ipns: ipnsName }, "subplebbit"));
return update;
subJson = JSON.parse(await this.fetchFromMultipleGateways({ ipns: ipnsName }, "subplebbit"));
}

subplebbitForPublishingCache.set(subJson.address, lodash.pick(subJson, ["encryption", "pubsubTopic", "address"]));

return subJson;
}

updateIpfsState(newState: SubplebbitIpfsClient["state"]) {
Expand Down
2 changes: 1 addition & 1 deletion src/clients/ipfs-gateway-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { GenericClientEvents } from "../types";

// Client states
type PublicationGatewayState = "stopped" | "fetching-subplebbit-ipns";
type CommentGatewayState = PublicationGatewayState | "fetching-update-ipns" | "fetching-ipfs";
type CommentGatewayState = PublicationGatewayState | "fetching-update-ipfs" | "fetching-ipfs";
type SubplebbitGatewayState = "stopped" | "fetching-ipns";
type PagesGatewayState = "fetching-ipfs" | "stopped";
type GenericGatewayState = PublicationGatewayState | CommentGatewayState | SubplebbitGatewayState;
Expand Down
Loading

0 comments on commit 7421471

Please sign in to comment.