Skip to content

Commit

Permalink
feat: stop monitor thread (#32)
Browse files Browse the repository at this point in the history
* Revert "chore: parallel implementation of getThreads (#28)"

This reverts commit 2689afc.

* Revert "1.0.2-alpha.5"

This reverts commit 3cd1cf3.

* feat: add stopGenerator object to stop monitorThread

* feat: add alpha5 version back

* feat: add validateMessage in decodeMessage
  • Loading branch information
albertfolch-redeemeum authored Aug 31, 2022
1 parent 3cd1cf3 commit a8dd646
Show file tree
Hide file tree
Showing 22 changed files with 286 additions and 460 deletions.
6 changes: 1 addition & 5 deletions jest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ const config: Config.InitialOptions = {
testEnvironment: "node",
collectCoverage: true,
coverageDirectory: "coverage",
collectCoverageFrom: [
"src/**/*.{ts,js}",
"!src/example/*.{ts,js}",
"!src/workers/*.{ts,js}"
],
collectCoverageFrom: ["src/**/*.{ts,js}", "!src/example/*.{ts,js}"],
coverageReporters: ["json", "text"],
coveragePathIgnorePatterns: ["jest.config.js", "/node_modules/", "/dist/"],
globals: {
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 9 additions & 30 deletions src/example/index.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,24 @@
import { Message } from "@xmtp/xmtp-js";
import { Wallet } from "ethers";
import { BosonXmtpClient } from "..";
import {
MessageObject,
MessageType,
SupportedFileMimeTypes,
ThreadId,
ThreadObject
} from "../util/v0.0.1/types";
} from "../util/v0.0.1/definitions";

// This is just a playground for development of the SDK
async function main() {
const privateKey = "";
const counterparties: string[] = [""];
const counterparties: string[] = ["", ""];
const envName = "local-df";

const wallet: any = new Wallet(privateKey);
const xmtpClient: BosonXmtpClient = await BosonXmtpClient.initialise(
wallet,
envName
);

console.time("sync");
const threadsSync = await xmtpClient.getThreads(counterparties);
console.timeEnd("sync");

console.time("parallel");
const threads = await xmtpClient.getThreadsParallel(counterparties);
console.timeEnd("parallel");
// console.log(JSON.stringify(threads, null, 1));
// console.log(threads);
const xmtpClient: any = await BosonXmtpClient.initialise(wallet, envName);

// const threadId: ThreadId = {
// exchangeId: "0",
// buyerId: "2",
// sellerId: "2"
// }
// const thread: ThreadObject = await xmtpClient.getThread(threadId, counterparties[0])
// console.log(JSON.stringify(thread,null,2));

// console.time("sequential")
// console.log(await xmtpClient.getThreads(counterparties));
// console.timeEnd("sequential")
// const threads: any[] = await xmtpClient.getThreads(counterparties);
// console.log(threads);

// const threadId: ThreadId = {
// exchangeId: "31",
Expand All @@ -52,8 +31,8 @@ async function main() {
// console.log(messages)
// }

// await exampleEncodeAndSendStringMessage(xmtpClient, counterparties[1]);
// await exampleDecodeStringMessage(xmtpClient, counterparties[1]);
await exampleEncodeAndSendStringMessage(xmtpClient, counterparties[1]);
await exampleDecodeStringMessage(xmtpClient, counterparties[1]);

// await exampleEncodeAndSendImageMessage(xmtpClient, counterparties[0]);
// await exampleDecodeImageMessage(xmtpClient, counterparties[0]);
Expand Down
170 changes: 105 additions & 65 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,19 @@ import {
} from "@xmtp/xmtp-js";
import { Signer } from "ethers";
import { XmtpClient } from "./xmtp/client";
import { BosonCodec } from "./xmtp/codec/boson-codec";
import { BosonCodec, ContentTypeBoson } from "./xmtp/codec/boson-codec";
import {
MessageData,
MessageObject,
ThreadId,
ThreadObject
} from "./util/v0.0.1/types";
} from "./util/v0.0.1/definitions";
import {
createWorker,
decodeMessage,
filterByAuthorityId,
splitConversation
getAuthorityId,
isValidJsonString,
isValidMessageType,
matchThreadIds
} from "./util/v0.0.1/functions";
import { getAuthorityId, matchThreadIds } from "./util/helper";
import { validateMessage } from "./util/validators";

export class BosonXmtpClient extends XmtpClient {
Expand Down Expand Up @@ -65,60 +64,16 @@ export class BosonXmtpClient extends XmtpClient {
const threads: ThreadObject[] = [];

for (const counterparty of counterparties) {
let messages: Message[] = await this.getConversationHistory(
const chatThreads: ThreadObject[] = await this.splitIntoThreads(
counterparty,
options
);
messages = filterByAuthorityId(messages, this.envName);
const chatThreads: ThreadObject[] = await splitConversation(
messages,
counterparty,
this.envName
);
threads.push(...chatThreads);
}

return threads;
}

/**
* Parallelised implementation of getThreads
* which makes use of worker threads
* @param counterparties - Array of wallet addresses
* @param options - (optional) {@link ListMessagesOptions}
* @returns Threads - {@link ThreadObject}[]
*/
public async getThreadsParallel(
counterparties: string[],
options?: ListMessagesOptions
): Promise<ThreadObject[]> {
const threads: ThreadObject[] = [];

const workerPromises: Promise<ThreadObject[]>[] = [];
for (const counterparty of counterparties) {
let messages: Message[] = await this.getConversationHistory(
counterparty,
options
);
messages = filterByAuthorityId(messages, this.envName);
workerPromises.push(
createWorker(
"./dist/cjs/workers/split-conversation-worker.js",
messages,
counterparty,
this.envName
)
);
}

const threadResults: ThreadObject[][] = await Promise.all(workerPromises);
for (const threadResult of threadResults) {
threads.push(...threadResult);
}

return threads;
}

/**
* Get a specific thread between the client
* and the relevant counter-party
Expand All @@ -132,16 +87,10 @@ export class BosonXmtpClient extends XmtpClient {
counterparty: string,
options?: ListMessagesOptions
): Promise<ThreadObject> {
let messages: Message[] = await this.getConversationHistory(
const threads: ThreadObject[] = await this.splitIntoThreads(
counterparty,
options
);
messages = filterByAuthorityId(messages, this.envName);
const threads: ThreadObject[] = await splitConversation(
messages,
counterparty,
this.envName
);
const thread: ThreadObject = threads.filter((thread) =>
matchThreadIds(thread.threadId, threadId)
)[0];
Expand All @@ -159,18 +108,21 @@ export class BosonXmtpClient extends XmtpClient {
*/
public async *monitorThread(
threadId: ThreadId,
counterparty: string
counterparty: string,
stopGenerator: { done: boolean } = { done: false }
): AsyncGenerator<MessageData> {
const conversation: Conversation = await this.startConversation(
counterparty
);

for await (const message of await conversation.streamMessages()) {
if (message.senderAddress === counterparty) {
const decodedMessage: MessageObject = decodeMessage(
message,
this.envName
) as MessageObject;
if (stopGenerator.done) {
return;
}
const decodedMessage: MessageObject = (await this.decodeMessage(
message
)) as MessageObject;
if (matchThreadIds(decodedMessage.threadId, threadId)) {
const messageData: MessageData = {
authorityId: message.contentType.authorityId,
Expand Down Expand Up @@ -217,7 +169,95 @@ export class BosonXmtpClient extends XmtpClient {
timestamp: message.header.timestamp,
sender: message.senderAddress,
recipient: message.recipientAddress,
data: decodeMessage(message, this.envName) as MessageObject
data: (await this.decodeMessage(message)) as MessageObject
};
}

/**
* Decode and validate message
* @param message - {@link Message}
* @returns Decoded message - {@link MessageObject}
*/
public async decodeMessage(message: Message): Promise<MessageObject | void> {
if (
message.contentType?.authorityId === getAuthorityId(this.envName) &&
isValidJsonString(message.content)
) {
const messageObject: MessageObject = JSON.parse(message.content);

if (
isValidMessageType(messageObject.contentType) &&
(await validateMessage(messageObject, {
throwError: false
}))
) {
return messageObject;
}
}
}

/**
* This splits a conversation between the
* client and the relevant counterparty
* into individual chat threads
* TODO: refactor/optimise
* @param counterparty - wallet address
* @param options - (optional) {@link ListMessagesOptions}
* @returns Threads - {@link ThreadObject}[]
*/
private async splitIntoThreads(
counterparty: string,
options?: ListMessagesOptions
): Promise<ThreadObject[]> {
let messages: Message[] = await this.getConversationHistory(
counterparty,
options
);
messages = messages.filter(
(message) =>
message.contentType?.authorityId ===
ContentTypeBoson(this.envName).authorityId
);
const threads: ThreadObject[] = [];

for (const message of messages) {
const decodedMessage: MessageObject = (await this.decodeMessage(
message
)) as MessageObject;

if (decodedMessage && isValidMessageType(decodedMessage.contentType)) {
// if this thread does not already exist in the threads array then add it
if (
threads.filter((thread) =>
matchThreadIds(thread.threadId, decodedMessage.threadId)
).length < 1
) {
threads.push({
threadId: decodedMessage.threadId,
counterparty: counterparty,
messages: []
});
}

const messageWrapper: MessageData = {
authorityId: message.contentType?.authorityId as string,
timestamp: message.header.timestamp,
sender: message.senderAddress as string,
recipient: message.recipientAddress as string,
data: decodedMessage
};

// add message to relevant thread - TODO: refactor(?)
for (let i = 0; i < threads.length; i++) {
if (
matchThreadIds(threads[i].threadId, messageWrapper.data.threadId)
) {
threads[i].messages.push(messageWrapper);
}
}
}
}

return threads;
}
}
32 changes: 0 additions & 32 deletions src/util/helper.ts

This file was deleted.

File renamed without changes.
Loading

0 comments on commit a8dd646

Please sign in to comment.