Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[10.x] Add pipe function to Process layer #46527

Merged
merged 11 commits into from
Apr 6, 2023
11 changes: 11 additions & 0 deletions src/Illuminate/Process/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,17 @@ public function pool(callable $callback)
return new Pool($this, $callback);
}

/**
* Start defining a series of piped processes.
*
* @param callable $callback
* @return \Illuminate\Process\Pipe
*/
public function pipe(callable $callback)
{
return new Pipe($this, $callback);
}

/**
* Run a pool of processes and wait for them to finish executing.
*
Expand Down
3 changes: 3 additions & 0 deletions src/Illuminate/Process/PendingProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
use Closure;
use Illuminate\Process\Exceptions\ProcessTimedOutException;
use Illuminate\Support\Str;
use Illuminate\Support\Traits\Conditionable;
use LogicException;
use RuntimeException;
use Symfony\Component\Process\Exception\ProcessTimedOutException as SymfonyTimeoutException;
use Symfony\Component\Process\Process;

class PendingProcess
{
use Conditionable;

/**
* The process factory instance.
*
Expand Down
102 changes: 102 additions & 0 deletions src/Illuminate/Process/Pipe.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
<?php

namespace Illuminate\Process;

use InvalidArgumentException;

/**
* @mixin \Illuminate\Process\Factory
* @mixin \Illuminate\Process\PendingProcess
*/
class Pipe
{
/**
* The process factory instance.
*
* @var \Illuminate\Process\Factory
*/
protected $factory;

/**
* The callback that resolves the pending processes.
*
* @var callable
*/
protected $callback;

/**
* The array of pending processes.
*
* @var array
*/
protected $pendingProcesses = [];

/**
* Create a new series of piped processes.
*
* @param \Illuminate\Process\Factory $factory
* @param callable $callback
* @return void
*/
public function __construct(Factory $factory, callable $callback)
{
$this->factory = $factory;
$this->callback = $callback;
}

/**
* Add a process to the pipe with a key.
*
* @param string $key
* @return \Illuminate\Process\PendingProcess
*/
public function as(string $key)
{
return tap($this->factory->newPendingProcess(), function ($pendingProcess) use ($key) {
$this->pendingProcesses[$key] = $pendingProcess;
});
}

/**
* Runs the processes in the pipe.
*
* @param callable|null $output
* @return \Illuminate\Contracts\Process\ProcessResult
*/
public function run(?callable $output = null)
{
call_user_func($this->callback, $this);

return collect($this->pendingProcesses)
->reduce(function ($previousProcessResult, $pendingProcess, $key) use ($output) {
if (! $pendingProcess instanceof PendingProcess) {
throw new InvalidArgumentException('Process pipe must only contain pending processes.');
}

if ($previousProcessResult && $previousProcessResult->failed()) {
return $previousProcessResult;
}

return $pendingProcess->when(
$previousProcessResult,
fn () => $pendingProcess->input($previousProcessResult->output())
)->run(output: $output ? function ($type, $buffer) use ($key, $output) {
$output($type, $buffer, $key);
} : null);
});
}

/**
* Dynamically proxy methods calls to a new pending process.
*
* @param string $method
* @param array $parameters
* @return \Illuminate\Process\PendingProcess
*/
public function __call($method, $parameters)
{
return tap($this->factory->{$method}(...$parameters), function ($pendingProcess) {
$this->pendingProcesses[] = $pendingProcess;
});
}
}
1 change: 1 addition & 0 deletions src/Illuminate/Support/Facades/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* @method static \Illuminate\Process\Factory assertDidntRun(\Closure|string $callback)
* @method static \Illuminate\Process\Factory assertNothingRan()
* @method static \Illuminate\Process\Pool pool(callable $callback)
* @method static \Illuminate\Process\Pipe pipe(callable $callback)
* @method static \Illuminate\Process\ProcessPoolResults concurrently(callable $callback, callable|null $output = null)
* @method static \Illuminate\Process\PendingProcess newPendingProcess()
* @method static void macro(string $name, object|callable $macro)
Expand Down
38 changes: 38 additions & 0 deletions tests/Process/ProcessTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,44 @@ public function testRealProcessesCanUseStandardInput()
$this->assertSame('foobar', $result->output());
}

public function testProcessPipe()
{
if (windows_os()) {
$this->markTestSkipped('Requires Linux.');
}

$factory = new Factory;
$factory->fake([
'cat *' => "Hello, world\nfoo\nbar",
]);

$pipe = $factory->pipe(function ($pipe) {
$pipe->command('cat test');
$pipe->command('grep -i "foo"');
});

$this->assertSame("foo\n", $pipe->run()->output());
}

public function testProcessPipeFailed()
{
if (windows_os()) {
$this->markTestSkipped('Requires Linux.');
}

$factory = new Factory;
$factory->fake([
'cat *' => $factory->result(exitCode: 1),
]);

$pipe = $factory->pipe(function ($pipe) {
$pipe->command('cat test');
$pipe->command('grep -i "foo"');
});

$this->assertTrue($pipe->run()->failed());
}

public function testFakeInvokedProcessOutputWithLatestOutput()
{
$factory = new Factory;
Expand Down