Skip to content

Commit

Permalink
[11.x] Expose queue name and delay on job queue events (#49837)
Browse files Browse the repository at this point in the history
* Expose queue name and delay on job queue events

* formatting

* fix type

---------

Co-authored-by: Taylor Otwell <taylor@laravel.com>
  • Loading branch information
stayallive and taylorotwell authored Jan 25, 2024
1 parent 5115dd8 commit bdd3ffd
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 29 deletions.
30 changes: 21 additions & 9 deletions src/Illuminate/Queue/Events/JobQueued.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace Illuminate\Queue\Events;

use RuntimeException;

class JobQueued
{
/**
Expand All @@ -13,6 +11,13 @@ class JobQueued
*/
public $connectionName;

/**
* The queue name.
*
* @var string
*/
public $queue;

/**
* The job ID.
*
Expand All @@ -30,25 +35,36 @@ class JobQueued
/**
* The job payload.
*
* @var string|null
* @var string
*/
public $payload;

/**
* The amount of time the job was delayed.
*
* @var int|null
*/
public $delay;

/**
* Create a new event instance.
*
* @param string $connectionName
* @param string $queue
* @param string|int|null $id
* @param \Closure|string|object $job
* @param string|null $payload
* @param string $payload
* @param int|null $delay
* @return void
*/
public function __construct($connectionName, $id, $job, $payload = null)
public function __construct($connectionName, $queue, $id, $job, $payload, $delay)
{
$this->connectionName = $connectionName;
$this->queue = $queue;
$this->id = $id;
$this->job = $job;
$this->payload = $payload;
$this->delay = $delay;
}

/**
Expand All @@ -58,10 +74,6 @@ public function __construct($connectionName, $id, $job, $payload = null)
*/
public function payload()
{
if ($this->payload === null) {
throw new RuntimeException('The job payload was not provided when the event was dispatched.');
}

return json_decode($this->payload, true, flags: JSON_THROW_ON_ERROR);
}
}
30 changes: 21 additions & 9 deletions src/Illuminate/Queue/Events/JobQueueing.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace Illuminate\Queue\Events;

use RuntimeException;

class JobQueueing
{
/**
Expand All @@ -13,6 +11,13 @@ class JobQueueing
*/
public $connectionName;

/**
* The queue name.
*
* @var string
*/
public $queue;

/**
* The job instance.
*
Expand All @@ -23,23 +28,34 @@ class JobQueueing
/**
* The job payload.
*
* @var string|null
* @var string
*/
public $payload;

/**
* The number of seconds the job was delayed.
*
* @var int|null
*/
public $delay;

/**
* Create a new event instance.
*
* @param string $connectionName
* @param string $queue
* @param \Closure|string|object $job
* @param string|null $payload
* @param string $payload
* @param int|null $delay
* @return void
*/
public function __construct($connectionName, $job, $payload = null)
public function __construct($connectionName, $queue, $job, $payload, $delay)
{
$this->connectionName = $connectionName;
$this->queue = $queue;
$this->job = $job;
$this->payload = $payload;
$this->delay = $delay;
}

/**
Expand All @@ -49,10 +65,6 @@ public function __construct($connectionName, $job, $payload = null)
*/
public function payload()
{
if ($this->payload === null) {
throw new RuntimeException('The job payload was not provided when the event was dispatched.');
}

return json_decode($this->payload, true, flags: JSON_THROW_ON_ERROR);
}
}
30 changes: 19 additions & 11 deletions src/Illuminate/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -327,20 +327,20 @@ protected function enqueueUsing($job, $payload, $queue, $delay, $callback)
if ($this->shouldDispatchAfterCommit($job) &&
$this->container->bound('db.transactions')) {
return $this->container->make('db.transactions')->addCallback(
function () use ($payload, $queue, $delay, $callback, $job) {
$this->raiseJobQueueingEvent($job, $payload);
function () use ($queue, $job, $payload, $delay, $callback) {
$this->raiseJobQueueingEvent($queue, $job, $payload, $delay);

return tap($callback($payload, $queue, $delay), function ($jobId) use ($job, $payload) {
$this->raiseJobQueuedEvent($jobId, $job, $payload);
return tap($callback($payload, $queue, $delay), function ($jobId) use ($queue, $job, $payload, $delay) {
$this->raiseJobQueuedEvent($queue, $jobId, $job, $payload, $delay);
});
}
);
}

$this->raiseJobQueueingEvent($job, $payload);
$this->raiseJobQueueingEvent($queue, $job, $payload, $delay);

return tap($callback($payload, $queue, $delay), function ($jobId) use ($job, $payload) {
$this->raiseJobQueuedEvent($jobId, $job, $payload);
return tap($callback($payload, $queue, $delay), function ($jobId) use ($queue, $job, $payload, $delay) {
$this->raiseJobQueuedEvent($queue, $jobId, $job, $payload, $delay);
});
}

Expand Down Expand Up @@ -370,29 +370,37 @@ protected function shouldDispatchAfterCommit($job)
/**
* Raise the job queueing event.
*
* @param string $queue
* @param \Closure|string|object $job
* @param string $payload
* @param \DateTimeInterface|\DateInterval|int|null $delay
* @return void
*/
protected function raiseJobQueueingEvent($job, $payload)
protected function raiseJobQueueingEvent($queue, $job, $payload, $delay)
{
if ($this->container->bound('events')) {
$this->container['events']->dispatch(new JobQueueing($this->connectionName, $job, $payload));
$delay = ! is_null($delay) ? $this->secondsUntil($delay) : $delay;

$this->container['events']->dispatch(new JobQueueing($this->connectionName, $queue, $job, $payload, $delay));
}
}

/**
* Raise the job queued event.
*
* @param string $queue
* @param string|int|null $jobId
* @param \Closure|string|object $job
* @param string $payload
* @param \DateTimeInterface|\DateInterval|int|null $delay
* @return void
*/
protected function raiseJobQueuedEvent($jobId, $job, $payload)
protected function raiseJobQueuedEvent($queue, $jobId, $job, $payload, $delay)
{
if ($this->container->bound('events')) {
$this->container['events']->dispatch(new JobQueued($this->connectionName, $jobId, $job, $payload));
$delay = ! is_null($delay) ? $this->secondsUntil($delay) : $delay;

$this->container['events']->dispatch(new JobQueued($this->connectionName, $queue, $jobId, $job, $payload, $delay));
}
}

Expand Down

0 comments on commit bdd3ffd

Please sign in to comment.