Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: different periods between send and receive #1563

Merged
merged 7 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/lib/controller/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
import { fillStoreCache, sendUpdatesAndFlushCache } from "./storeCache";

const namespace = "pepr-system";
export const debounceBackoff = 1000;
const debounceBackoffReceive = 1000;
const debounceBackoffSend = 4000;

export class StoreController {
#name: string;
Expand Down Expand Up @@ -68,6 +69,15 @@

#migrateAndSetupWatch = async (store: Store): Promise<void> => {
Log.debug(redactedStore(store), "Pepr Store migration");
// Add cacheID label to store
await K8s(Store, { namespace, name: this.#name }).Patch([

Check warning on line 73 in src/lib/controller/store.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/controller/store.ts#L73

Added line #L73 was not covered by tests
{
op: "add",
path: "/metadata/labels/pepr.dev-cacheID",
value: `${Date.now()}`,
},
]);

const data: DataStore = store.data || {};
let storeCache: Record<string, Operation> = {};

Expand Down Expand Up @@ -134,7 +144,7 @@

// Debounce the update to 1 second to avoid multiple rapid calls
clearTimeout(this.#sendDebounce);
this.#sendDebounce = setTimeout(debounced, this.#onReady ? 0 : debounceBackoff);
this.#sendDebounce = setTimeout(debounced, this.#onReady ? 0 : debounceBackoffReceive);
cmwylie19 marked this conversation as resolved.
Show resolved Hide resolved
};

#send = (capabilityName: string): DataSender => {
Expand All @@ -151,7 +161,7 @@
Log.debug(redactedPatch(storeCache), "Sending updates to Pepr store");
void sendUpdatesAndFlushCache(storeCache, namespace, this.#name);
}
}, debounceBackoff);
}, debounceBackoffSend);

return sender;
};
Expand All @@ -165,6 +175,9 @@
metadata: {
name: this.#name,
namespace,
labels: {
"pepr.dev-cacheID": `${Date.now()}`,
},
},
data: {
// JSON Patch will die if the data is empty, so we need to add a placeholder
Expand Down
19 changes: 18 additions & 1 deletion src/lib/controller/storeCache.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, it, jest, afterEach } from "@jest/globals";
import { fillStoreCache, sendUpdatesAndFlushCache } from "./storeCache";
import { fillStoreCache, sendUpdatesAndFlushCache, updateCacheID } from "./storeCache";
import { Operation } from "fast-json-patch";
import { GenericClass, K8s, KubernetesObject } from "kubernetes-fluent-client";
import { K8sInit } from "kubernetes-fluent-client/dist/fluent/types";
Expand Down Expand Up @@ -100,3 +100,20 @@ describe("sendCache", () => {
});
});
});

describe("updateCacheId", () => {
it("should update the metadata label of the cacheID in the payload array of patches", () => {
const patches: Operation[] = [
{
op: "add",
path: "/data/hello-pepr-v2-a",
value: "a",
},
];

const updatedPatches = updateCacheID(patches);
expect(updatedPatches.length).toBe(2);
expect(updatedPatches[1].op).toBe("replace");
expect(updatedPatches[1].path).toBe("/metadata/labels/pepr.dev-cacheID");
});
});
11 changes: 10 additions & 1 deletion src/lib/controller/storeCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export const sendUpdatesAndFlushCache = async (cache: Record<string, Operation>,

try {
if (payload.length > 0) {
await K8s(Store, { namespace, name }).Patch(payload); // Send patch to cluster
await K8s(Store, { namespace, name }).Patch(updateCacheID(payload)); // Send patch to cluster
Object.keys(cache).forEach(key => delete cache[key]);
}
} catch (err) {
Expand Down Expand Up @@ -61,3 +61,12 @@ export const fillStoreCache = (
}
return cache;
};

export function updateCacheID(payload: Operation[]): Operation[] {
payload.push({
op: "replace",
path: "/metadata/labels/pepr.dev-cacheID",
value: `${Date.now()}`,
});
return payload;
}
8 changes: 4 additions & 4 deletions src/lib/schedule.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ export class MockStorage {
this.storage[key] = value;
}

setItemAndWait(key: string, value: string): Promise<void> {
setItemAndWait(key: string, value: string): Promise<string> {
return new Promise(resolve => {
this.storage[key] = value;
resolve();
resolve("ok");
});
}

removeItem(key: string): void {
delete this.storage[key];
}

removeItemAndWait(key: string): Promise<void> {
removeItemAndWait(key: string): Promise<string> {
return new Promise(resolve => {
delete this.storage[key];
resolve();
resolve("ok");
});
}

Expand Down
57 changes: 33 additions & 24 deletions src/lib/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
const MAX_WAIT_TIME = 15000;
const STORE_VERSION_PREFIX = "v2";

interface WaitRecord {
timeout?: ReturnType<typeof setTimeout>;
unsubscribe?: () => void;
}

export function v2StoreKey(key: string): string {
return `${STORE_VERSION_PREFIX}-${pointer.escape(key)}`;
}
Expand Down Expand Up @@ -58,13 +63,13 @@
* Sets the value of the pair identified by key to value, creating a new key/value pair if none existed for key previously.
* Resolves when the key/value show up in the store.
*/
setItemAndWait(key: string, value: string): Promise<void>;
setItemAndWait(key: string, value: string): Promise<string>;

/**
* Remove the value of the key.
* Resolves when the key does not show up in the store.
*/
removeItemAndWait(key: string): Promise<void>;
removeItemAndWait(key: string): Promise<string>;
}

/**
Expand Down Expand Up @@ -128,22 +133,24 @@
* @param value - The value of the key
* @returns
*/
setItemAndWait = (key: string, value: string): Promise<void> => {
setItemAndWait = (key: string, value: string): Promise<string> => {
this.#dispatchUpdate("add", [v2StoreKey(key)], value);
const record: WaitRecord = {};

return new Promise<void>((resolve, reject) => {
const unsubscribe = this.subscribe(data => {
return new Promise<string>((resolve, reject) => {
// If promise has not resolved before MAX_WAIT_TIME reject
record.timeout = setTimeout(() => {
record.unsubscribe!();
return reject(`MAX_WAIT_TIME elapsed: Key ${key} not seen in ${MAX_WAIT_TIME / 1000}s`);

Check warning on line 144 in src/lib/storage.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/storage.ts#L143-L144

Added lines #L143 - L144 were not covered by tests
}, MAX_WAIT_TIME);

record.unsubscribe = this.subscribe(data => {
if (data[`${v2UnescapedStoreKey(key)}`] === value) {
unsubscribe();
resolve();
record.unsubscribe!();
clearTimeout(record.timeout);
resolve("ok");

Check warning on line 151 in src/lib/storage.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/storage.ts#L149-L151

Added lines #L149 - L151 were not covered by tests
}
});

// If promise has not resolved before MAX_WAIT_TIME reject
setTimeout(() => {
unsubscribe();
return reject();
}, MAX_WAIT_TIME);
});
};

Expand All @@ -154,21 +161,23 @@
* @param key - The key to add into the store
* @returns
*/
removeItemAndWait = (key: string): Promise<void> => {
removeItemAndWait = (key: string): Promise<string> => {
this.#dispatchUpdate("remove", [v2StoreKey(key)]);
return new Promise<void>((resolve, reject) => {
const unsubscribe = this.subscribe(data => {
const record: WaitRecord = {};
return new Promise<string>((resolve, reject) => {
// If promise has not resolved before MAX_WAIT_TIME reject
record.timeout = setTimeout(() => {
record.unsubscribe!();
return reject(`MAX_WAIT_TIME elapsed: Key ${key} still seen after ${MAX_WAIT_TIME / 1000}s`);

Check warning on line 171 in src/lib/storage.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/storage.ts#L170-L171

Added lines #L170 - L171 were not covered by tests
}, MAX_WAIT_TIME);

record.unsubscribe = this.subscribe(data => {
if (!Object.hasOwn(data, `${v2UnescapedStoreKey(key)}`)) {
unsubscribe();
resolve();
record.unsubscribe!();
clearTimeout(record.timeout);
resolve("ok");

Check warning on line 178 in src/lib/storage.ts

View check run for this annotation

Codecov / codecov/patch

src/lib/storage.ts#L176-L178

Added lines #L176 - L178 were not covered by tests
}
});

// If promise has not resolved before MAX_WAIT_TIME reject
setTimeout(() => {
unsubscribe();
return reject();
}, MAX_WAIT_TIME);
});
};

Expand Down
Loading