Skip to content

Commit

Permalink
Fix #54 : Added new stream events that indicate end of reading/writing
Browse files Browse the repository at this point in the history
  • Loading branch information
khelle committed Dec 18, 2016
1 parent d848178 commit dfb19e4
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 13 deletions.
3 changes: 2 additions & 1 deletion src/Stream/AsyncStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public function close()
public function handleWrite()
{
$text = $this->buffer->peek();
$sent = fwrite($this->resource, $text);
$sent = fwrite($this->resource, $text, $this->bufferSize);

if ($sent === false)
{
Expand All @@ -209,6 +209,7 @@ public function handleWrite()
$this->loop->removeWriteStream($this->resource);
$this->listening = false;
$this->emit('drain', [ $this ]);
$this->emit('finish', [ $this ]);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/Stream/AsyncStreamWriter.php
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public function close()
public function handleWrite()
{
$text = $this->buffer->peek();
$sent = fwrite($this->resource, $text);
$sent = fwrite($this->resource, $text, $this->bufferSize);

if ($sent === false)
{
Expand All @@ -202,6 +202,7 @@ public function handleWrite()
$this->loop->removeWriteStream($this->resource);
$this->listening = false;
$this->emit('drain', [ $this ]);
$this->emit('finish', [ $this ]);
}

if ($this->closing)
Expand Down
6 changes: 6 additions & 0 deletions src/Stream/Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public function write($text)
}

$this->emit('drain', [ $this ]);
$this->emit('finish', [ $this ]);

return true;
}
Expand Down Expand Up @@ -141,6 +142,11 @@ public function read($length = null)
else if ($ret !== '')
{
$this->emit('data', [ $this, $ret ]);

if (strlen($ret) < $length)
{
$this->emit('end', [ $this ]);
}
}

return $ret;
Expand Down
5 changes: 5 additions & 0 deletions src/Stream/StreamReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public function read($length = null)
else if ($ret !== '')
{
$this->emit('data', [ $this, $ret ]);

if (strlen($ret) < $length)
{
$this->emit('end', [ $this ]);
}
}

return $ret;
Expand Down
1 change: 1 addition & 0 deletions src/Stream/StreamReaderInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

/**
* @event data : callable(object, string)
* @event end : callable(object)
*/
interface StreamReaderInterface extends EventEmitterInterface, StreamSeekerInterface
{
Expand Down
1 change: 1 addition & 0 deletions src/Stream/StreamWriter.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public function write($text)
}

$this->emit('drain', [ $this ]);
$this->emit('finish', [ $this ]);

return true;
}
Expand Down
3 changes: 2 additions & 1 deletion src/Stream/StreamWriterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
use Kraken\Event\EventEmitterInterface;

/**
* @event drain : callable(object)
* @event drain : callable(object)
* @event finish : callable(object)
*/
interface StreamWriterInterface extends EventEmitterInterface, StreamSeekerInterface
{
Expand Down
4 changes: 4 additions & 0 deletions test/_Module/Stream/AsyncStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public function testAsyncStream_WritesAndReadsDataCorrectly($readerClass, $write
$sim->expect('data', $data);
$conn->close();
});
$reader->on('end', $this->expectCallableOnce());
$reader->on('drain', $this->expectCallableNever());
$reader->on('finish', $this->expectCallableNever());
$reader->on('error', $this->expectCallableNever());
$reader->on('close', function() use($sim, &$cnt) {
$sim->expect('close');
Expand All @@ -56,10 +58,12 @@ public function testAsyncStream_WritesAndReadsDataCorrectly($readerClass, $write
$loop
);
$writer->on('data', $this->expectCallableNever());
$writer->on('end', $this->expectCallableNever());
$writer->on('drain', function(AsyncStreamWriterInterface $writer) use($sim) {
$sim->expect('drain');
$writer->close();
});
$writer->on('finish', $this->expectCallableOnce());
$writer->on('error', $this->expectCallableNever());
$writer->on('close', function() use($sim, &$cnt) {
$sim->expect('close');
Expand Down
2 changes: 2 additions & 0 deletions test/_Module/Stream/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ public function testStream_WriteAndReadDataScenario()
$reader->on('data', function($origin, $data) use(&$capturedData) {
$capturedData = $data;
});
$reader->on('end', $this->expectCallableOnce());
$reader->on('error', $this->expectCallableNever());
$reader->on('close', $this->expectCallableOnce());

$writer->on('drain', $this->expectCallableOnce());
$writer->on('finish', $this->expectCallableOnce());
$writer->on('error', $this->expectCallableNever());
$writer->on('close', $this->expectCallableOnce());

Expand Down
12 changes: 10 additions & 2 deletions test/_Unit/Stream/AsyncStreamReaderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

namespace Kraken\_Unit\Stream;

use Kraken\Loop\Loop;
use Kraken\Loop\LoopInterface;
use Kraken\Loop\Model\SelectLoop;
use Kraken\Stream\AsyncStreamReader;

class AsyncStreamReaderTest extends StreamSeekerTest
{
public function testApiRead_ReadsDataProperly()
{
$stream = $this->createAsyncStreamReaderMock();
$loop = new Loop(new SelectLoop);
$stream = $this->createAsyncStreamReaderMock(null, $loop);
$resource = $stream->getResource();

$expectedData = "foobar\n";
Expand All @@ -20,10 +23,15 @@ public function testApiRead_ReadsDataProperly()
$capturedOrigin = $origin;
$capturedData = $data;
});
$stream->on('end', $this->expectCallableOnce());

fwrite($resource, $expectedData);
rewind($resource);
$stream->handleData($stream->getResource());

$loop->addTimer(1e-1, function() use($loop) {
$loop->stop();
});
$loop->start();

$this->assertSame($expectedData, $capturedData);
$this->assertSame($stream, $capturedOrigin);
Expand Down
22 changes: 16 additions & 6 deletions test/_Unit/Stream/AsyncStreamWriterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,34 @@

namespace Kraken\_Unit\Stream;

use Kraken\Loop\Model\SelectLoop;
use Kraken\Loop\Loop;
use Kraken\Loop\LoopInterface;
use Kraken\Stream\AsyncStreamWriter;

class AsyncStreamWriterTest extends StreamSeekerTest
{
public function testApiWrite_WritesDataProperly()
{
$stream = $this->createAsyncStreamWriterMock();
$loop = new Loop(new SelectLoop);
$stream = $this->createAsyncStreamWriterMock(null, $loop);
$resource = $stream->getResource();

$expectedData = "foobar\n";
$expectedData = str_repeat('X', (int) $stream->getBufferSize()*1.5);

$stream->on('drain', $this->expectCallableOnce());
$stream->on('drain', $this->expectCallableTwice());
$stream->on('finish', $this->expectCallableOnce());

$stream->write($expectedData);
$stream->rewind();
$stream->write(substr($expectedData, 0, 1024));
$stream->write(substr($expectedData, 1024));

$loop->addTimer(1e-1, function() use($loop) {
$loop->stop();
});
$loop->start();

$this->assertSame($expectedData, fread($resource, $stream->getBufferSize()));
$stream->rewind();
$this->assertSame($expectedData, fread($resource, (int) $stream->getBufferSize()*2));
}

/**
Expand Down
1 change: 1 addition & 0 deletions test/_Unit/Stream/StreamReaderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public function testApiRead_ReadsDataCorrectly()
$capturedOrigin = $origin;
$capturedData = $data;
});
$stream->on('end', $this->expectCallableOnce());

fwrite($resource, $expectedData);
rewind($resource);
Expand Down
1 change: 1 addition & 0 deletions test/_Unit/Stream/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public function testApiRead_ReadsDataCorrectly()
$capturedOrigin = $origin;
$capturedData = $data;
});
$stream->on('end', $this->expectCallableOnce());

fwrite($resource, $expectedData);
rewind($resource);
Expand Down
6 changes: 4 additions & 2 deletions test/_Unit/Stream/StreamWriterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ public function testApiWrite_WritesDataCorrectly()
$expectedData = "foobar\n";
$capturedData = null;

$stream->on('drain', $this->expectCallableOnce());
$stream->on('drain', $this->expectCallableTwice());
$stream->on('finish', $this->expectCallableTwice());

$stream->write($expectedData);
$stream->write(substr($expectedData, 0, 2));
$stream->write(substr($expectedData, 2));
$stream->rewind();

$this->assertSame($expectedData, fread($resource, $stream->getBufferSize()));
Expand Down

0 comments on commit dfb19e4

Please sign in to comment.