Skip to content

Commit

Permalink
feat(market): added proposal batching to avoid duplicate offers
Browse files Browse the repository at this point in the history
  • Loading branch information
mgordel authored and grisha87 committed Jan 16, 2024
1 parent b47edca commit d82b09a
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 7 deletions.
1 change: 0 additions & 1 deletion src/agreement/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ export class AgreementFactory {
const offerProperties: ProposalProperties = data.offer.properties as ProposalProperties;
const demandProperties: ProposalProperties = data.demand.properties as ProposalProperties;
const chosenPaymentPlatform = demandProperties["golem.com.payment.chosen-platform"];
console.log({ chosenPaymentPlatform });
const provider = {
name: offerProperties["golem.node.id.name"],
id: data.offer.providerId,
Expand Down
18 changes: 18 additions & 0 deletions src/market/proposal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,22 @@ describe("Proposal", () => {
});
});
});

describe("Estimating cost", () => {
test("it estimate cost based on CPU, Env and startup costs", () => {
const proposal = buildTestProposal({
"golem.inf.cpu.threads": 5,
"golem.com.usage.vector": ["golem.usage.duration_sec", "golem.usage.cpu_sec"],
"golem.com.pricing.model.linear.coeffs": [0.01, 0.02, 0.03],
});
expect(proposal.getEstimatedCost()).toEqual(0.14);
});
test("it estimate cost based on CPU, Env and startup costs if info about the number of threads is missing", () => {
const proposal = buildTestProposal({
"golem.com.usage.vector": ["golem.usage.duration_sec", "golem.usage.cpu_sec"],
"golem.com.pricing.model.linear.coeffs": [0.1, 0.2, 0.3],
});
expect(proposal.getEstimatedCost()).toEqual(0.6);
});
});
});
8 changes: 8 additions & 0 deletions src/market/proposal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ export class Proposal {
return this.getProviderPaymentPlatforms().includes(paymentPlatform);
}

/**
* Proposal cost estimation based on CPU, Env and startup costs
*/
getEstimatedCost(): number {
const threadsNo = this.properties["golem.inf.cpu.threads"] || 1;
return this.pricing.start + this.pricing.cpuSec * threadsNo + this.pricing.envSec;
}

private getProviderPaymentPlatforms(): string[] {
return (
Object.keys(this.properties)
Expand Down
223 changes: 223 additions & 0 deletions src/market/proposals_batch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import { ProposalsBatch } from "./proposals_batch";
import { mock, instance, when } from "@johanblumenberg/ts-mockito";
import { Proposal, ProposalProperties } from "./proposal";
import { ProviderInfo } from "../agreement";

const mockedProviderInfo: ProviderInfo = {
id: "provider-id-1",
name: "provider-name-1",
walletAddress: "0x1234566789",
};

describe("ProposalsBatch", () => {
describe("Adding Proposals", () => {
it("should add the proposal to the batch from new provider", async () => {
const proposalsBatch = new ProposalsBatch({ minBatchSize: 1 });
const mockedProposal = mock(Proposal);
when(mockedProposal.provider).thenReturn(mockedProviderInfo);
when(mockedProposal.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
const proposal = instance(mockedProposal);
await proposalsBatch.addProposal(proposal);
expect((await proposalsBatch.readProposals().next()).value).toContainEqual(proposal);
});
it("should not add the proposal to the batch from the existing provider and the same hardware configuration", async () => {
const proposalsBatch = new ProposalsBatch({ releaseTimeoutMs: 100 });
const mockedProposal = mock(Proposal);
when(mockedProposal.provider).thenReturn(mockedProviderInfo);
when(mockedProposal.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
const proposal1 = instance(mockedProposal);
const proposal2 = instance(mockedProposal);
await proposalsBatch.addProposal(proposal1);
await proposalsBatch.addProposal(proposal2);
const proposals = (await proposalsBatch.readProposals().next()).value;
expect(proposals.length).toEqual(1);
});

it("should add the proposal to the batch from the existing provider and different hardware configuration", async () => {
const proposalsBatch = new ProposalsBatch({ minBatchSize: 2 });
const mockedProposal1 = mock(Proposal);
when(mockedProposal1.provider).thenReturn(mockedProviderInfo);
when(mockedProposal1.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
const mockedProposal2 = mock(Proposal);
when(mockedProposal2.provider).thenReturn(mockedProviderInfo);
when(mockedProposal2.properties).thenReturn({
["golem.inf.cpu.cores"]: 77,
["golem.inf.cpu.threads"]: 77,
["golem.inf.mem.gib"]: 77,
["golem.inf.storage.gib"]: 77,
} as ProposalProperties);
const proposal1 = instance(mockedProposal1);
const proposal2 = instance(mockedProposal2);
await proposalsBatch.addProposal(proposal1);
await proposalsBatch.addProposal(proposal2);
const proposals = (await proposalsBatch.readProposals().next()).value;
expect(proposals.length).toEqual(2);
expect(proposals).toContainEqual(proposal1);
expect(proposals).toContainEqual(proposal2);
});
});
describe("Reading Proposals", () => {
it("should read the set of proposals grouped by provider key distinguished by provider id, cpu, threads, memory and storage", async () => {
const proposalsBatch = new ProposalsBatch({ releaseTimeoutMs: 100 });
const mockedProposal1 = mock(Proposal);
when(mockedProposal1.provider).thenReturn(mockedProviderInfo);
when(mockedProposal1.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
const mockedProposal2 = mock(Proposal);
when(mockedProposal2.provider).thenReturn(mockedProviderInfo);
when(mockedProposal2.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
const mockedProposal3 = mock(Proposal);
when(mockedProposal3.provider).thenReturn(mockedProviderInfo);
when(mockedProposal3.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 77,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
const mockedProposal4 = mock(Proposal);
when(mockedProposal4.provider).thenReturn(mockedProviderInfo);
when(mockedProposal4.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 77,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
const mockedProposal5 = mock(Proposal);
when(mockedProposal5.provider).thenReturn(mockedProviderInfo);
when(mockedProposal5.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 77,
} as ProposalProperties);
const mockedProposal6 = mock(Proposal);
when(mockedProposal6.provider).thenReturn({ id: "provider-77" } as ProviderInfo);
when(mockedProposal6.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
const proposal1 = instance(mockedProposal1);
const proposal2 = instance(mockedProposal2);
const proposal3 = instance(mockedProposal3);
const proposal4 = instance(mockedProposal4);
const proposal5 = instance(mockedProposal5);
const proposal6 = instance(mockedProposal6);
await Promise.all([
proposalsBatch.addProposal(proposal1),
proposalsBatch.addProposal(proposal3),
proposalsBatch.addProposal(proposal4),
proposalsBatch.addProposal(proposal5),
proposalsBatch.addProposal(proposal6),
]);
const proposals = (await proposalsBatch.readProposals().next()).value;
expect(proposals.length).toEqual(5);
expect(proposals).toContainEqual(proposal1);
expect(proposals).not.toContainEqual(proposal2);
expect(proposals).toContainEqual(proposal3);
expect(proposals).toContainEqual(proposal4);
expect(proposals).toContainEqual(proposal5);
expect(proposals).toContainEqual(proposal6);
});
it("should read the set of proposal grouped by provider key and reduced proposals from teh same provider to the lowest price and highest time", async () => {
const proposalsBatch = new ProposalsBatch({ releaseTimeoutMs: 100 });
const mockedProposal1 = mock(Proposal);
when(mockedProposal1.provider).thenReturn(mockedProviderInfo);
when(mockedProposal1.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
when(mockedProposal1.pricing).thenReturn({
cpuSec: 1,
envSec: 1,
start: 1,
});
when(mockedProposal1.timestamp).thenReturn("2024-01-01T00:00:00.000Z");
const mockedProposal2 = mock(Proposal);
when(mockedProposal2.provider).thenReturn(mockedProviderInfo);
when(mockedProposal2.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
when(mockedProposal2.pricing).thenReturn({
cpuSec: 1,
envSec: 1,
start: 1,
});
when(mockedProposal2.timestamp).thenReturn("2024-01-01T07:07:07.007Z");
const mockedProposal3 = mock(Proposal);
when(mockedProposal3.provider).thenReturn(mockedProviderInfo);
when(mockedProposal3.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
when(mockedProposal3.pricing).thenReturn({
cpuSec: 2,
envSec: 2,
start: 2,
});
when(mockedProposal3.timestamp).thenReturn("2024-01-01T07:07:07.007Z");
const proposal1 = instance(mockedProposal1);
const proposal2 = instance(mockedProposal2);
const proposal3 = instance(mockedProposal3);
await proposalsBatch.addProposal(proposal1);
await proposalsBatch.addProposal(proposal2);
await proposalsBatch.addProposal(proposal3);
const proposals = (await proposalsBatch.readProposals().next()).value;
expect(proposals.length).toEqual(1);
expect(proposals).toContainEqual(proposal2);
expect(proposals).not.toContainEqual(proposal1);
expect(proposals).not.toContainEqual(proposal3);
});
it("should drain batch after reading proposals", async () => {
const proposalsBatch = new ProposalsBatch({ releaseTimeoutMs: 100 });
const mockedProposal = mock(Proposal);
when(mockedProposal.provider).thenReturn(mockedProviderInfo);
when(mockedProposal.properties).thenReturn({
["golem.inf.cpu.cores"]: 1,
["golem.inf.cpu.threads"]: 1,
["golem.inf.mem.gib"]: 1,
["golem.inf.storage.gib"]: 1,
} as ProposalProperties);
const proposal = instance(mockedProposal);
await proposalsBatch.addProposal(proposal);
expect((await proposalsBatch.readProposals().next()).value.length).toEqual(1);
expect((await proposalsBatch.readProposals().next()).value.length).toEqual(0);
await proposalsBatch.addProposal(proposal);
await proposalsBatch.addProposal(proposal);
expect((await proposalsBatch.readProposals().next()).value.length).toEqual(1);
expect((await proposalsBatch.readProposals().next()).value.length).toEqual(0);
});
});
});
103 changes: 103 additions & 0 deletions src/market/proposals_batch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { Proposal } from "./proposal";
import AsyncLock from "async-lock";

export type ProposalsBatchOptions = {
/** The minimum number of proposals after which it will be possible to return the collection */
minBatchSize?: number;
/** The maximum waiting time for collecting proposals after which it will be possible to return the collection */
releaseTimeoutMs?: number;
};

const DEFAULTS = {
minBatchSize: 100,
releaseTimeoutMs: 1_000,
};

/**
* Proposals Batch aggregates initial proposals and returns a set grouped by the provider's key
* to avoid duplicate offers issued by the provider.
*/
export class ProposalsBatch {
/** Batch of proposals mapped by provider key and related set of initial proposals */
private batch = new Map<string, Set<Proposal>>();
/** Lock used to synchronize adding and getting proposals from the batch */
private lock: AsyncLock = new AsyncLock();
private config: Required<ProposalsBatchOptions>;

constructor(options?: ProposalsBatchOptions) {
this.config = {
minBatchSize: options?.minBatchSize ?? DEFAULTS.minBatchSize,
releaseTimeoutMs: options?.releaseTimeoutMs ?? DEFAULTS.releaseTimeoutMs,
};
}

/**
* Add proposal to the batch grouped by provider key
* which consist of providerId, cores, threads, mem and storage
*/
async addProposal(proposal: Proposal) {
const providerKey = this.getProviderKey(proposal);
await this.lock.acquire("proposals-batch", () => {
let proposals = this.batch.get(providerKey);
if (!proposals) {
proposals = new Set<Proposal>();
this.batch.set(providerKey, proposals);
}
proposals.add(proposal);
});
}

/**
* Generates a set of proposals that were collected within the specified `releaseTimeoutMs`
* or their size reached the `minBatchSize` value
*/
async *readProposals(): AsyncGenerator<Proposal[]> {
let timeoutId, intervalId;
const isTimeoutReached = new Promise((resolve) => {
timeoutId = setTimeout(resolve, this.config.releaseTimeoutMs);
});
const isBatchSizeReached = new Promise((resolve) => {
intervalId = setInterval(() => {
if (this.batch.size >= this.config.minBatchSize) {
resolve(true);
}
}, 1_000);
});
await Promise.race([isTimeoutReached, isBatchSizeReached]);
clearTimeout(timeoutId);
clearInterval(intervalId);
const proposals: Proposal[] = [];
await this.lock.acquire("proposals-batch", () => {
this.batch.forEach((providersProposals) => proposals.push(this.getBestProposal(providersProposals)));
this.batch.clear();
});
yield proposals;
}

/**
* Selects the best proposal from the set according to the lowest price and the youngest proposal age
*/
private getBestProposal(proposals: Set<Proposal>): Proposal {
const sortByLowerPriceAndHigherTime = (p1: Proposal, p2: Proposal) => {
const p1Price = p1.getEstimatedCost();
const p2Price = p2.getEstimatedCost();
const p1Time = new Date(p1.timestamp).valueOf();
const p2Time = new Date(p2.timestamp).valueOf();
return p1Price !== p2Price ? p1Price - p2Price : p2Time - p1Time;
};
return [...proposals].sort(sortByLowerPriceAndHigherTime)[0];
}

/**
* Provider key used to group proposals so that they can be distinguished based on ID and hardware configuration
*/
private getProviderKey(proposal: Proposal): string {
return [
proposal.provider.id,
proposal.properties["golem.inf.cpu.cores"],
proposal.properties["golem.inf.cpu.threads"],
proposal.properties["golem.inf.mem.gib"],
proposal.properties["golem.inf.storage.gib"],
].join("-");
}
}
Loading

0 comments on commit d82b09a

Please sign in to comment.