Skip to content

Commit 09bc48a

Browse files
authored
Merge pull request #25 from boenrobot/eol-processing
Added the ability to process streams that use CR or CRLF as newlines
2 parents 7fc5855 + 3578436 commit 09bc48a

4 files changed

Lines changed: 168 additions & 16 deletions

File tree

src/EventSource.php

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -181,12 +181,13 @@ private function request()
181181

182182
$buffer = '';
183183
$stream->on('data', function ($chunk) use (&$buffer, $stream) {
184-
$buffer .= $chunk;
185-
186-
while (($pos = strpos($buffer, "\n\n")) !== false) {
187-
$data = substr($buffer, 0, $pos);
188-
$buffer = substr($buffer, $pos + 2);
184+
$messageEvents = preg_split(
185+
'/(?:\r\n|\r(?!\n)|\n){2}/S',
186+
$buffer . $chunk
187+
);
188+
$buffer = array_pop($messageEvents);
189189

190+
foreach ($messageEvents as $data) {
190191
$message = MessageEvent::parse($data);
191192
if ($message->lastEventId === null) {
192193
$message->lastEventId = $this->lastEventId;
@@ -196,6 +197,9 @@ private function request()
196197

197198
if ($message->data !== '') {
198199
$this->emit($message->type, array($message));
200+
if ($this->readyState === self::CLOSED) {
201+
break;
202+
}
199203
}
200204
}
201205
});

src/MessageEvent.php

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,30 @@ class MessageEvent
1111
*/
1212
public static function parse($data)
1313
{
14-
$message = new MessageEvent();
15-
16-
preg_match_all('/^([a-z]*)\: ?(.*)/m', $data, $matches, PREG_SET_ORDER);
17-
foreach ($matches as $match) {
18-
if ($match[1] === 'data') {
19-
$message->data .= $match[2] . "\n";
20-
} elseif ($match[1] === 'id') {
21-
$message->lastEventId .= $match[2];
22-
} elseif ($match[1] === 'event') {
23-
$message->type = $match[2];
14+
$message = new self();
15+
16+
$lines = preg_split(
17+
'/\r\n|\r(?!\n)|\n/S',
18+
$data
19+
);
20+
foreach ($lines as $line) {
21+
$name = strstr($line, ':', true);
22+
$value = substr(strstr($line, ':'), 1);
23+
if (isset($value[0]) && $value[0] === ' ') {
24+
$value = substr($value, 1);
25+
}
26+
if ($name === 'data') {
27+
$message->data .= $value . "\n";
28+
} elseif ($name === 'id') {
29+
$message->lastEventId .= $value;
30+
} elseif ($name === 'event') {
31+
$message->type = $value;
2432
}
2533
}
2634

2735
if (substr($message->data, -1) === "\n") {
2836
$message->data = substr($message->data, 0, -1);
2937
}
30-
//$message->data = rtrim($message->data, "\r\n");
3138

3239
return $message;
3340
}

tests/EventSourceTest.php

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,4 +599,117 @@ public function setExpectedException($exception, $exceptionMessage = '', $except
599599
parent::setExpectedException($exception, $exceptionMessage, $exceptionCode);
600600
}
601601
}
602+
603+
public function testSplitMessagesWithCarriageReturn()
604+
{
605+
$deferred = new Deferred();
606+
$browser = $this->getMockBuilder('React\Http\Browser')->disableOriginalConstructor()->getMock();
607+
$browser->expects($this->once())->method('withRejectErrorResponse')->willReturnSelf();
608+
$browser->expects($this->once())->method('requestStreaming')->willReturn($deferred->promise());
609+
610+
$es = new EventSource('http://example.com', $browser);
611+
612+
$stream = new ThroughStream();
613+
$response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream));
614+
$deferred->resolve($response);
615+
616+
$messages = [];
617+
$es->on('message', function ($m) use (&$messages) {
618+
$messages[] = $m;
619+
});
620+
621+
$stream->write("data:hello\r\rdata:hi\r\r");
622+
623+
$expected = ['hello', 'hi'];
624+
$this->assertCount(count($expected), $messages);
625+
foreach ($messages as $i => $message) {
626+
$this->assertInstanceOf('Clue\React\EventSource\MessageEvent', $message);
627+
$this->assertEquals($expected[$i], $message->data);
628+
}
629+
}
630+
631+
public function testSplitMessagesWithWindowsEndOfLineSequence()
632+
{
633+
$deferred = new Deferred();
634+
$browser = $this->getMockBuilder('React\Http\Browser')->disableOriginalConstructor()->getMock();
635+
$browser->expects($this->once())->method('withRejectErrorResponse')->willReturnSelf();
636+
$browser->expects($this->once())->method('requestStreaming')->willReturn($deferred->promise());
637+
638+
$es = new EventSource('http://example.com', $browser);
639+
640+
$stream = new ThroughStream();
641+
$response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream));
642+
$deferred->resolve($response);
643+
644+
$messages = [];
645+
$es->on('message', function ($m) use (&$messages) {
646+
$messages[] = $m;
647+
});
648+
649+
$stream->write("data:hello\r\n\r\ndata:hi\r\n\r\n");
650+
651+
$expected = ['hello', 'hi'];
652+
$this->assertCount(count($expected), $messages);
653+
foreach ($messages as $i => $message) {
654+
$this->assertInstanceOf('Clue\React\EventSource\MessageEvent', $message);
655+
$this->assertEquals($expected[$i], $message->data);
656+
}
657+
}
658+
659+
public function testSplitMessagesWithBufferedWindowsEndOfLineSequence()
660+
{
661+
$deferred = new Deferred();
662+
$browser = $this->getMockBuilder('React\Http\Browser')->disableOriginalConstructor()->getMock();
663+
$browser->expects($this->once())->method('withRejectErrorResponse')->willReturnSelf();
664+
$browser->expects($this->once())->method('requestStreaming')->willReturn($deferred->promise());
665+
666+
$es = new EventSource('http://example.com', $browser);
667+
668+
$stream = new ThroughStream();
669+
$response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream));
670+
$deferred->resolve($response);
671+
672+
$messages = [];
673+
$es->on('message', function ($m) use (&$messages) {
674+
$messages[] = $m;
675+
});
676+
677+
$stream->write("data:hello\r\n\r");
678+
$stream->write("\ndata:hi\r\n\r\n");
679+
680+
$expected = ['hello', 'hi'];
681+
$this->assertCount(count($expected), $messages);
682+
foreach ($messages as $i => $message) {
683+
$this->assertInstanceOf('Clue\React\EventSource\MessageEvent', $message);
684+
$this->assertEquals($expected[$i], $message->data);
685+
}
686+
}
687+
688+
public function testSplitMessagesWithMixedEndOfLine()
689+
{
690+
$deferred = new Deferred();
691+
$browser = $this->getMockBuilder('React\Http\Browser')->disableOriginalConstructor()->getMock();
692+
$browser->expects($this->once())->method('withRejectErrorResponse')->willReturnSelf();
693+
$browser->expects($this->once())->method('requestStreaming')->willReturn($deferred->promise());
694+
695+
$es = new EventSource('http://example.com', $browser);
696+
697+
$stream = new ThroughStream();
698+
$response = new Response(200, array('Content-Type' => 'text/event-stream'), new ReadableBodyStream($stream));
699+
$deferred->resolve($response);
700+
701+
$messages = [];
702+
$es->on('message', function ($m) use (&$messages) {
703+
$messages[] = $m;
704+
});
705+
706+
$stream->write("data: LF CR\n\rdata: CRLF LF\r\n\ndata: CRLF CR\r\n\rdata: LF CRLF\n\r\ndata: CR CRLF\r\r\n");
707+
708+
$expected = ['LF CR', 'CRLF LF', 'CRLF CR', 'LF CRLF', 'CR CRLF'];
709+
$this->assertCount(count($expected), $messages);
710+
foreach ($messages as $i => $message) {
711+
$this->assertInstanceOf('Clue\React\EventSource\MessageEvent', $message);
712+
$this->assertEquals($expected[$i], $message->data);
713+
}
714+
}
602715
}

tests/MessageEventTest.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,38 @@ public function testParseDataOverTwoLinesWillBeCombined()
2121
$this->assertEquals("hello\nworld", $message->data);
2222
}
2323

24+
public function testParseDataOverTwoLinesWithCarrigeReturnsWillBeCombinedWithNewline()
25+
{
26+
$message = MessageEvent::parse("data: hello\rdata: world");
27+
28+
$this->assertEquals("hello\nworld", $message->data);
29+
}
30+
31+
public function testParseDataOverTwoLinesWithCarrigeReturnsAndNewlinesWillBeCombinedWithNewline()
32+
{
33+
$message = MessageEvent::parse("data: hello\r\ndata: world");
34+
35+
$this->assertEquals("hello\nworld", $message->data);
36+
}
37+
2438
public function testParseDataWithTrailingNewlineOverTwoLines()
2539
{
2640
$message = MessageEvent::parse("data: hello\ndata:");
2741

2842
$this->assertEquals("hello\n", $message->data);
2943
}
44+
45+
public function testParseDataWithCarrigeReturnOverTwoLines()
46+
{
47+
$message = MessageEvent::parse("data: hello\rdata:");
48+
49+
$this->assertEquals("hello\n", $message->data);
50+
}
51+
52+
public function testParseDataWithCarrigeReturnAndNewlineOverTwoLines()
53+
{
54+
$message = MessageEvent::parse("data: hello\r\ndata:");
55+
56+
$this->assertEquals("hello\n", $message->data);
57+
}
3058
}

0 commit comments

Comments
 (0)