diff --git a/src/Illuminate/Queue/Failed/FileFailedJobProvider.php b/src/Illuminate/Queue/Failed/FileFailedJobProvider.php index 5a107b969db9..e07d858554f3 100644 --- a/src/Illuminate/Queue/Failed/FileFailedJobProvider.php +++ b/src/Illuminate/Queue/Failed/FileFailedJobProvider.php @@ -2,7 +2,9 @@ namespace Illuminate\Queue\Failed; +use Closure; use DateTimeInterface; +use Illuminate\Contracts\Cache\LockProvider; use Illuminate\Support\Facades\Date; class FileFailedJobProvider implements FailedJobProviderInterface, PrunableFailedJobProvider @@ -21,17 +23,26 @@ class FileFailedJobProvider implements FailedJobProviderInterface, PrunableFaile */ protected $limit; + /** + * The lock provider resolver. + * + * @var \Closure + */ + protected $lockProviderResolver; + /** * Create a new database failed job provider. * * @param string $path * @param int $limit + * @param \Closure|null $lockProviderResolver * @return void */ - public function __construct($path, $limit = 100) + public function __construct($path, $limit = 100, ?Closure $lockProviderResolver = null) { $this->path = $path; $this->limit = $limit; + $this->lockProviderResolver = $lockProviderResolver; } /** @@ -45,23 +56,27 @@ public function __construct($path, $limit = 100) */ public function log($connection, $queue, $payload, $exception) { - $id = json_decode($payload, true)['uuid']; + return $this->lock(function () use ($connection, $queue, $payload, $exception) { + $id = json_decode($payload, true)['uuid']; - $jobs = $this->read(); + $jobs = $this->read(); - $failedAt = Date::now(); + $failedAt = Date::now(); - array_unshift($jobs, [ - 'id' => $id, - 'connection' => $connection, - 'queue' => $queue, - 'payload' => $payload, - 'exception' => (string) mb_convert_encoding($exception, 'UTF-8'), - 'failed_at' => $failedAt->format('Y-m-d H:i:s'), - 'failed_at_timestamp' => $failedAt->getTimestamp(), - ]); + array_unshift($jobs, [ + 'id' => $id, + 'connection' => $connection, + 'queue' => $queue, + 'payload' => $payload, + 'exception' => (string) mb_convert_encoding($exception, 'UTF-8'), + 'failed_at' => $failedAt->format('Y-m-d H:i:s'), + 'failed_at_timestamp' => $failedAt->getTimestamp(), + ]); - $this->write(array_slice($jobs, 0, $this->limit)); + $this->write(array_slice($jobs, 0, $this->limit)); + + return $id; + }); } /** @@ -94,12 +109,14 @@ public function find($id) */ public function forget($id) { - $this->write($pruned = collect($jobs = $this->read()) - ->reject(fn ($job) => $job->id === $id) - ->values() - ->all()); - - return count($jobs) !== count($pruned); + return $this->lock(function () use ($id) { + $this->write($pruned = collect($jobs = $this->read()) + ->reject(fn ($job) => $job->id === $id) + ->values() + ->all()); + + return count($jobs) !== count($pruned); + }); } /** @@ -121,13 +138,34 @@ public function flush($hours = null) */ public function prune(DateTimeInterface $before) { - $jobs = $this->read(); + return $this->lock(function () use ($before) { + $jobs = $this->read(); + + $this->write($prunedJobs = collect($jobs)->reject(function ($job) use ($before) { + return $job->failed_at_timestamp <= $before->getTimestamp(); + })->values()->all()); - $this->write($prunedJobs = collect($jobs)->reject(function ($job) use ($before) { - return $job->failed_at_timestamp <= $before->getTimestamp(); - })->values()->all()); + return count($jobs) - count($prunedJobs); + }); + } + + /** + * Execute the given callback while holding a lock. + * + * @param \Closure $callback + * @return mixed + */ + protected function lock(Closure $callback) + { + if (! $this->lockProviderResolver) { + return $callback(); + } - return count($jobs) - count($prunedJobs); + return ($this->lockProviderResolver)() + ->lock('laravel-failed-jobs', 5) + ->block(10, function () use ($callback) { + return $callback(); + }); } /** diff --git a/src/Illuminate/Queue/QueueServiceProvider.php b/src/Illuminate/Queue/QueueServiceProvider.php index a915f3f62a09..8c8ab21a6081 100755 --- a/src/Illuminate/Queue/QueueServiceProvider.php +++ b/src/Illuminate/Queue/QueueServiceProvider.php @@ -255,6 +255,7 @@ protected function registerFailedJobServices() return new FileFailedJobProvider( $config['path'] ?? $this->app->storagePath('framework/cache/failed-jobs.json'), $config['limit'] ?? 100, + fn () => $app['cache']->store('file'), ); } elseif (isset($config['driver']) && $config['driver'] === 'dynamodb') { return $this->dynamoFailedJobProvider($config);