Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.x dev] Added support for handler-url and headers per job #139

Merged
merged 8 commits into from
Mar 31, 2024
32 changes: 25 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,44 @@ Please check the table below on what the values mean and what their value should

#### Passing headers to a task

You can pass headers to a task by using the `withHeaders` method on the queue connection.
You can pass headers to a task by using the `setTaskHeadersUsing` method on the `CloudTasksQueue` class.

```php
use Illuminate\Queue\Queue;
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksQueue;

Queue::connection()->setTaskHeaders([
CloudTasksQueue::setTaskHeadersUsing(static fn() => [
'X-My-Header' => 'My-Value',
]);
```

If necessary, the current job being dispatched is also available:
If necessary, the current payload being dispatched is also available:

```php
use Illuminate\Queue\Queue;
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksQueue;

Queue::connection()->setTaskHeaders(fn (array $job) => [
'X-My-Header' => $job['displayName']
CloudTasksQueue::setTaskHeadersUsing(static fn(array $payload) => [
'X-My-Header' => $payload['displayName'],
]);
```

#### Configure task handler url

You can set the handler url for a task by using the `configureHandlerUrlUsing` method on the `CloudTasksQueue` class.

```php
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksQueue;

CloudTasksQueue::configureHandlerUrlUsing(static fn() => 'https://example.com/my-url');
```

If necessary, the current job being dispatched is also available:

```php
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksQueue;

CloudTasksQueue::configureHandlerUrlUsing(static fn(MyJob $job) => 'https://example.com/my-url/' . $job->something());
```

<details>
<summary>
How it works & Differences
Expand Down
67 changes: 52 additions & 15 deletions src/CloudTasksQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,33 @@

class CloudTasksQueue extends LaravelQueue implements QueueContract
{
private Closure|array $headers = [];
private static ?Closure $handlerUrlCallback = null;
private static ?Closure $taskHeadersCallback = null;

public function __construct(public array $config, public CloudTasksClient $client, public $dispatchAfterCommit = false)
{
//
}

public static function configureHandlerUrlUsing(Closure $callback): void
{
static::$handlerUrlCallback = $callback;
}

public static function forgetHandlerUrlCallback(): void
{
self::$handlerUrlCallback = null;
}

public static function setTaskHeadersUsing(Closure $callback): void
{
static::$taskHeadersCallback = $callback;
}

public static function forgetTaskHeadersCallback(): void
{
self::$taskHeadersCallback = null;
}

/**
* Get the size of the queue.
Expand Down Expand Up @@ -57,8 +78,8 @@ public function push($job, $data = '', $queue = null)
$this->createPayload($job, $queue, $data),
$queue,
null,
function ($payload, $queue) {
return $this->pushRaw($payload, $queue);
function ($payload, $queue) use ($job) {
return $this->pushRaw($payload, $queue, ['job' => $job]);
}
);
}
Expand All @@ -73,8 +94,9 @@ function ($payload, $queue) {
public function pushRaw($payload, $queue = null, array $options = [])
{
$delay = ! empty($options['delay']) ? $options['delay'] : 0;
$job = $options['job'] ?? null;

return $this->pushToCloudTasks($queue, $payload, $delay);
return $this->pushToCloudTasks($queue, $payload, $delay, $job);
}

/**
Expand All @@ -93,8 +115,8 @@ public function later($delay, $job, $data = '', $queue = null)
$this->createPayload($job, $queue, $data),
$queue,
$delay,
function ($payload, $queue, $delay) {
return $this->pushToCloudTasks($queue, $payload, $delay);
function ($payload, $queue, $delay) use ($job) {
return $this->pushToCloudTasks($queue, $payload, $delay, $job);
}
);
}
Expand All @@ -105,9 +127,10 @@ function ($payload, $queue, $delay) {
* @param string|null $queue
* @param string $payload
* @param \DateTimeInterface|\DateInterval|int $delay
* @param string|object $job
* @return string
*/
protected function pushToCloudTasks($queue, $payload, $delay = 0)
protected function pushToCloudTasks($queue, $payload, $delay, mixed $job)
{
$queue = $queue ?: $this->config['queue'];

Expand All @@ -122,7 +145,7 @@ protected function pushToCloudTasks($queue, $payload, $delay = 0)
connectionName: $this->getConnectionName(),
);

$this->addPayloadToTask($payload, $task);
$this->addPayloadToTask($payload, $task, $job);

// The deadline for requests sent to the app. If the app does not respond by
// this deadline then the request is cancelled and the attempt is marked as
Expand Down Expand Up @@ -173,9 +196,10 @@ private function enrichPayloadWithInternalData(
return $payload;
}

public function addPayloadToTask(array $payload, Task $task): Task
/** @param string|object $job */
public function addPayloadToTask(array $payload, Task $task, mixed $job): Task
{
$headers = value($this->headers, $payload) ?: [];
$headers = $this->headers($payload);

if (! empty($this->config['app_engine'])) {
$path = \Safe\parse_url(route('cloud-tasks.handle-task'), PHP_URL_PATH);
Expand All @@ -195,7 +219,7 @@ public function addPayloadToTask(array $payload, Task $task): Task
$task->setAppEngineHttpRequest($appEngineRequest);
} else {
$httpRequest = new HttpRequest();
$httpRequest->setUrl($this->getHandler());
$httpRequest->setUrl($this->getHandler($job));
$httpRequest->setBody(json_encode($payload));
$httpRequest->setHttpMethod(HttpMethod::POST);
$httpRequest->setHeaders($headers);
Expand Down Expand Up @@ -225,12 +249,17 @@ public function release(CloudTasksJob $job, int $delay = 0): void
$this->pushRaw(
payload: $job->getRawBody(),
queue: $job->getQueue(),
options: ['delay' => $delay]
options: ['delay' => $delay, 'job' => $job],
);
}

public function getHandler(): string
/** @param string|object $job */
public function getHandler(mixed $job): string
{
if (static::$handlerUrlCallback) {
return (static::$handlerUrlCallback)($job);
}

if (empty($this->config['handler'])) {
$this->config['handler'] = request()->getSchemeAndHttpHost();
}
Expand All @@ -244,8 +273,16 @@ public function getHandler(): string
return $handler.'/'.config('cloud-tasks.uri');
}

public function setTaskHeaders(Closure|array $headers): void
/**
* @param array<string, mixed> $payload
* @return array<string, mixed>
*/
private function headers(mixed $payload): array
{
$this->headers = $headers;
if (!static::$taskHeadersCallback) {
return [];
}

return (static::$taskHeadersCallback)($payload);
}
}
42 changes: 35 additions & 7 deletions tests/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\Str;
use Override;
use PHPUnit\Framework\Attributes\Test;
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksApi;
use Stackkit\LaravelGoogleCloudTasksQueue\CloudTasksQueue;
use Stackkit\LaravelGoogleCloudTasksQueue\Events\JobReleased;
use Tests\Support\FailingJob;
use Tests\Support\FailingJobWithExponentialBackoff;
Expand All @@ -28,6 +30,15 @@

class QueueTest extends TestCase
{
#[Override]
protected function tearDown(): void
{
parent::tearDown();

CloudTasksQueue::forgetHandlerUrlCallback();
CloudTasksQueue::forgetTaskHeadersCallback();
}

#[Test]
public function a_http_request_with_the_handler_url_is_made()
{
Expand Down Expand Up @@ -59,7 +70,7 @@ public function it_posts_to_the_handler()
}

#[Test]
public function it_posts_to_the_correct_handler_url()
public function it_posts_to_the_configured_handler_url()
{
// Arrange
$this->setConfigValue('handler', 'https://docker.for.mac.localhost:8081');
Expand All @@ -74,6 +85,25 @@ public function it_posts_to_the_correct_handler_url()
});
}

#[Test]
public function it_posts_to_the_callback_handler_url()
{
// Arrange
$this->setConfigValue('handler', 'https://docker.for.mac.localhost:8081');
CloudTasksApi::fake();
CloudTasksQueue::configureHandlerUrlUsing(static fn(SimpleJob $job) => 'https://example.com/api/my-custom-route?job=' . $job->id);

// Act
$job = new SimpleJob();
$job->id = 1;
$this->dispatch($job);

// Assert
CloudTasksApi::assertTaskCreated(function (Task $task): bool {
return $task->getHttpRequest()->getUrl() === 'https://example.com/api/my-custom-route?job=1';
});
}

#[Test]
public function it_posts_the_serialized_job_payload_to_the_handler()
{
Expand Down Expand Up @@ -459,7 +489,7 @@ public function headers_can_be_added_to_the_task()
CloudTasksApi::fake();

// Act
Queue::connection()->setTaskHeaders([
CloudTasksQueue::setTaskHeadersUsing(static fn() => [
'X-MyHeader' => 'MyValue',
]);

Expand All @@ -478,11 +508,9 @@ public function headers_can_be_added_to_the_task_with_job_context()
CloudTasksApi::fake();

// Act
Queue::connection()->setTaskHeaders(function (array $payload) {
return [
'X-MyHeader' => $payload['displayName'],
];
});
CloudTasksQueue::setTaskHeadersUsing(static fn(array $payload) => [
'X-MyHeader' => $payload['displayName'],
]);

$this->dispatch((new SimpleJob()));

Expand Down
1 change: 1 addition & 0 deletions tests/Support/SimpleJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class SimpleJob implements ShouldQueue
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

public $tries = 3;
public $id = 0;

/**
* Create a new job instance.
Expand Down
Loading