Skip to content

Commit

Permalink
feat(functions): Add features to task queue functions (#2216)
Browse files Browse the repository at this point in the history
* enhance tq functions api to allow naming of tasks and deleting tasks

* tidy up docstrings

* make task already exists error consistent w rest of admin sdk

* minor comment fixes
  • Loading branch information
blidd-google authored Jul 10, 2023
1 parent b0e65c4 commit b9eaae6
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 48 deletions.
3 changes: 3 additions & 0 deletions etc/firebase-admin.functions.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export function getFunctions(app?: App): Functions;
// @public
export type TaskOptions = DeliverySchedule & TaskOptionsExperimental & {
dispatchDeadlineSeconds?: number;
id?: string;
headers?: Record<string, string>;
};

// @public
Expand All @@ -50,6 +52,7 @@ export interface TaskOptionsExperimental {

// @public
export class TaskQueue<Args = Record<string, any>> {
delete(id: string): Promise<void>;
enqueue(data: Args, opts?: TaskOptions): Promise<void>;
}

Expand Down
142 changes: 111 additions & 31 deletions src/functions/functions-api-client-internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import * as validator from '../utils/validator';
import { TaskOptions } from './functions-api';
import { ComputeEngineCredential } from '../app/credential-internal';

const CLOUD_TASKS_API_URL_FORMAT = 'https://cloudtasks.googleapis.com/v2/projects/{projectId}/locations/{locationId}/queues/{resourceId}/tasks';
const CLOUD_TASKS_API_RESOURCE_PATH = 'projects/{projectId}/locations/{locationId}/queues/{resourceId}/tasks';
const CLOUD_TASKS_API_URL_FORMAT = 'https://cloudtasks.googleapis.com/v2/' + CLOUD_TASKS_API_RESOURCE_PATH;
const FIREBASE_FUNCTION_URL_FORMAT = 'https://{locationId}-{projectId}.cloudfunctions.net/{resourceId}';

const FIREBASE_FUNCTIONS_CONFIG_HEADERS = {
Expand Down Expand Up @@ -54,6 +55,61 @@ export class FunctionsApiClient {
}
this.httpClient = new AuthorizedHttpClient(app as FirebaseApp);
}
/**
* Deletes a task from a queue.
*
* @param id - The ID of the task to delete.
* @param functionName - The function name of the queue.
* @param extensionId - Optional canonical ID of the extension.
*/
public async delete(id: string, functionName: string, extensionId?: string): Promise<void> {
if (!validator.isNonEmptyString(functionName)) {
throw new FirebaseFunctionsError(
'invalid-argument', 'Function name must be a non empty string');
}
if (!validator.isTaskId(id)) {
throw new FirebaseFunctionsError(
'invalid-argument', 'id can contain only letters ([A-Za-z]), numbers ([0-9]), '
+ 'hyphens (-), or underscores (_). The maximum length is 500 characters.');
}

let resources: utils.ParsedResource;
try {
resources = utils.parseResourceName(functionName, 'functions');
} catch (err) {
throw new FirebaseFunctionsError(
'invalid-argument', 'Function name must be a single string or a qualified resource name');
}
resources.projectId = resources.projectId || await this.getProjectId();
resources.locationId = resources.locationId || DEFAULT_LOCATION;
if (!validator.isNonEmptyString(resources.resourceId)) {
throw new FirebaseFunctionsError(
'invalid-argument', 'No valid function name specified to enqueue tasks for.');
}
if (typeof extensionId !== 'undefined' && validator.isNonEmptyString(extensionId)) {
resources.resourceId = `ext-${extensionId}-${resources.resourceId}`;
}

try {
const serviceUrl = await this.getUrl(resources, CLOUD_TASKS_API_URL_FORMAT.concat('/', id));
const request: HttpRequestConfig = {
method: 'DELETE',
url: serviceUrl,
headers: FIREBASE_FUNCTIONS_CONFIG_HEADERS,
};
await this.httpClient.send(request);
} catch (err: unknown) {
if (err instanceof HttpError) {
if (err.response.status === 404) {
// if no task with the provided ID exists, then ignore the delete.
return;
}
throw this.toFirebaseError(err);
} else {
throw err;
}
}
}

/**
* Creates a task and adds it to a queue.
Expand All @@ -63,47 +119,53 @@ export class FunctionsApiClient {
* @param extensionId - Optional canonical ID of the extension.
* @param opts - Optional options when enqueuing a new task.
*/
public enqueue(data: any, functionName: string, extensionId?: string, opts?: TaskOptions): Promise<void> {
public async enqueue(data: any, functionName: string, extensionId?: string, opts?: TaskOptions): Promise<void> {
if (!validator.isNonEmptyString(functionName)) {
throw new FirebaseFunctionsError(
'invalid-argument', 'Function name must be a non empty string');
}

const task = this.validateTaskOptions(data, opts);
let resources: utils.ParsedResource;
try {
resources = utils.parseResourceName(functionName, 'functions');
}
catch (err) {
} catch (err) {
throw new FirebaseFunctionsError(
'invalid-argument', 'Function name must be a single string or a qualified resource name');
}

resources.projectId = resources.projectId || await this.getProjectId();
resources.locationId = resources.locationId || DEFAULT_LOCATION;
if (!validator.isNonEmptyString(resources.resourceId)) {
throw new FirebaseFunctionsError(
'invalid-argument', 'No valid function name specified to enqueue tasks for.');
}
if (typeof extensionId !== 'undefined' && validator.isNonEmptyString(extensionId)) {
resources.resourceId = `ext-${extensionId}-${resources.resourceId}`;
}

return this.getUrl(resources, CLOUD_TASKS_API_URL_FORMAT)
.then((serviceUrl) => {
return this.updateTaskPayload(task, resources, extensionId)
.then((task) => {
const request: HttpRequestConfig = {
method: 'POST',
url: serviceUrl,
headers: FIREBASE_FUNCTIONS_CONFIG_HEADERS,
data: {
task,
}
};
return this.httpClient.send(request);
})
})
.then(() => {
return;
})
.catch((err) => {
throw this.toFirebaseError(err);
});

const task = this.validateTaskOptions(data, resources, opts);
try {
const serviceUrl = await this.getUrl(resources, CLOUD_TASKS_API_URL_FORMAT);
const taskPayload = await this.updateTaskPayload(task, resources, extensionId);
const request: HttpRequestConfig = {
method: 'POST',
url: serviceUrl,
headers: FIREBASE_FUNCTIONS_CONFIG_HEADERS,
data: {
task: taskPayload,
}
};
await this.httpClient.send(request);
} catch (err: unknown) {
if (err instanceof HttpError) {
if (err.response.status === 409) {
throw new FirebaseFunctionsError('task-already-exists', `A task with ID ${opts?.id} already exists`);
} else {
throw this.toFirebaseError(err);
}
} else {
throw err;
}
}
}

private getUrl(resourceName: utils.ParsedResource, urlFormat: string): Promise<string> {
Expand Down Expand Up @@ -167,15 +229,18 @@ export class FunctionsApiClient {
});
}

private validateTaskOptions(data: any, opts?: TaskOptions): Task {
private validateTaskOptions(data: any, resources: utils.ParsedResource, opts?: TaskOptions): Task {
const task: Task = {
httpRequest: {
url: '',
oidcToken: {
serviceAccountEmail: '',
},
body: Buffer.from(JSON.stringify({ data })).toString('base64'),
headers: { 'Content-Type': 'application/json' }
headers: {
'Content-Type': 'application/json',
...opts?.headers,
}
}
}

Expand Down Expand Up @@ -214,6 +279,19 @@ export class FunctionsApiClient {
}
task.dispatchDeadline = `${opts.dispatchDeadlineSeconds}s`;
}
if ('id' in opts && typeof opts.id !== 'undefined') {
if (!validator.isTaskId(opts.id)) {
throw new FirebaseFunctionsError(
'invalid-argument', 'id can contain only letters ([A-Za-z]), numbers ([0-9]), '
+ 'hyphens (-), or underscores (_). The maximum length is 500 characters.');
}
const resourcePath = utils.formatString(CLOUD_TASKS_API_RESOURCE_PATH, {
projectId: resources.projectId,
locationId: resources.locationId,
resourceId: resources.resourceId,
});
task.name = resourcePath.concat('/', opts.id);
}
if (typeof opts.uri !== 'undefined') {
if (!validator.isURL(opts.uri)) {
throw new FirebaseFunctionsError(
Expand Down Expand Up @@ -280,6 +358,7 @@ interface Error {
* containing the relevant fields for enqueueing tasks that tirgger Cloud Functions.
*/
export interface Task {
name?: string;
// A timestamp in RFC3339 UTC "Zulu" format, with nanosecond resolution and up to nine fractional
// digits. Examples: "2014-10-02T15:01:23Z" and "2014-10-02T15:01:23.045123456Z".
scheduleTime?: string;
Expand Down Expand Up @@ -317,7 +396,8 @@ export type FunctionsErrorCode =
| 'permission-denied'
| 'unauthenticated'
| 'not-found'
| 'unknown-error';
| 'unknown-error'
| 'task-already-exists';

/**
* Firebase Functions error code structure. This extends PrefixedFirebaseError.
Expand Down
34 changes: 34 additions & 0 deletions src/functions/functions-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,40 @@ export type TaskOptions = DeliverySchedule & TaskOptionsExperimental & {
* The default is 10 minutes. The deadline must be in the range of 15 seconds and 30 minutes.
*/
dispatchDeadlineSeconds?: number;

/**
* The ID to use for the enqueued event.
* If not provided, one will be automatically generated.
* If provided, an explicitly specified task ID enables task de-duplication. If a task's ID is
* identical to that of an existing task or a task that was deleted or executed recently then
* the call will throw an error with code "functions/task-already-exists". Another task with
* the same ID can't be created for ~1hour after the original task was deleted or executed.
*
* Because there is an extra lookup cost to identify duplicate task IDs, setting ID
* significantly increases latency. Using hashed strings for the task ID or for the prefix of
* the task ID is recommended. Choosing task IDs that are sequential or have sequential
* prefixes, for example using a timestamp, causes an increase in latency and error rates in
* all task commands. The infrastructure relies on an approximately uniform distribution of
* task IDs to store and serve tasks efficiently.
*
* "Push IDs" from the Firebase Realtime Database make poor IDs because they are based on
* timestamps and will cause contention (slowdowns) in your task queue. Reversed push IDs
* however form a perfect distribution and are an ideal key. To reverse a string in
* javascript use `someString.split("").reverse().join("")`
*/
id?: string;

/**
* HTTP request headers to include in the request to the task queue function.
* These headers represent a subset of the headers that will accompany the task's HTTP
* request. Some HTTP request headers will be ignored or replaced, e.g. Authorization, Host, Content-Length,
* User-Agent etc. cannot be overridden.
*
* By default, Content-Type is set to 'application/json'.
*
* The size of the headers must be less than 80KB.
*/
headers?: Record<string, string>;
}

/**
Expand Down
9 changes: 9 additions & 0 deletions src/functions/functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,13 @@ export class TaskQueue<Args = Record<string, any>> {
public enqueue(data: Args, opts?: TaskOptions): Promise<void> {
return this.client.enqueue(data, this.functionName, this.extensionId, opts);
}

/**
* Deletes an enqueued task if it has not yet completed.
* @param id - the ID of the task, relative to this queue.
* @returns A promise that resolves when the task has been deleted.
*/
public delete(id: string): Promise<void> {
return this.client.delete(id, this.functionName, this.extensionId);
}
}
2 changes: 1 addition & 1 deletion src/functions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export {
AbsoluteDelivery,
DeliverySchedule,
TaskOptions,
TaskOptionsExperimental
TaskOptionsExperimental,
} from './functions-api';
export {
Functions,
Expand Down
16 changes: 16 additions & 0 deletions src/utils/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,19 @@ export function isTopic(topic: any): boolean {
const VALID_TOPIC_REGEX = /^(\/topics\/)?(private\/)?[a-zA-Z0-9-_.~%]+$/;
return VALID_TOPIC_REGEX.test(topic);
}

/**
* Validates that the provided string can be used as a task ID
* for Cloud Tasks.
*
* @param taskId - the task ID to validate.
* @returns Whether the provided task ID is valid.
*/
export function isTaskId(taskId: any): boolean {
if (typeof taskId !== 'string') {
return false;
}

const VALID_TASK_ID_REGEX = /^[A-Za-z0-9_-]+$/;
return VALID_TASK_ID_REGEX.test(taskId);
}
Loading

0 comments on commit b9eaae6

Please sign in to comment.