Skip to content

Commit 1d05de7

Browse files
committed
add additional configuration
1 parent 34a0f6a commit 1d05de7

3 files changed

Lines changed: 89 additions & 0 deletions

File tree

pkg/stomp/StompConnectionFactory.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
use Interop\Queue\ConnectionFactory;
99
use Interop\Queue\Context;
1010
use Stomp\Network\Connection;
11+
use Stomp\Network\Observer\HeartbeatEmitter;
12+
use Stomp\Network\Observer\ServerAliveObserver;
1113

1214
class StompConnectionFactory implements ConnectionFactory
1315
{
@@ -88,11 +90,22 @@ private function establishConnection(): BufferedStompClient
8890
$scheme = (true === $config['ssl_on']) ? 'ssl' : 'tcp';
8991
$uri = $scheme.'://'.$config['host'].':'.$config['port'];
9092
$connection = new Connection($uri, $config['connection_timeout']);
93+
$connection->setWriteTimeout($config['write_timeout']);
94+
$connection->setReadTimeout($config['read_timeout']);
95+
96+
if ($config['send_heartbeat']) {
97+
$connection->getObservers()->addObserver(new HeartbeatEmitter($connection));
98+
}
99+
100+
if ($config['receive_heartbeat']) {
101+
$connection->getObservers()->addObserver(new ServerAliveObserver());
102+
}
91103

92104
$this->stomp = new BufferedStompClient($connection, $config['buffer_size']);
93105
$this->stomp->setLogin($config['login'], $config['password']);
94106
$this->stomp->setVhostname($config['vhost']);
95107
$this->stomp->setSync($config['sync']);
108+
$this->stomp->setHeartbeat($config['send_heartbeat'], $config['receive_heartbeat']);
96109

97110
$this->stomp->connect();
98111
}
@@ -128,6 +141,10 @@ private function parseDsn(string $dsn): array
128141
'sync' => $dsn->getBool('sync'),
129142
'lazy' => $dsn->getBool('lazy'),
130143
'ssl_on' => $dsn->getBool('ssl_on'),
144+
'write_timeout' => $dsn->getDecimal('write_timeout'),
145+
'read_timeout' => $dsn->getDecimal('read_timeout'),
146+
'send_heartbeat' => $dsn->getDecimal('send_heartbeat'),
147+
'receive_heartbeat' => $dsn->getDecimal('receive_heartbeat'),
131148
]), function ($value) { return null !== $value; });
132149
}
133150

@@ -145,6 +162,10 @@ private function defaultConfig(): array
145162
'sync' => false,
146163
'lazy' => true,
147164
'ssl_on' => false,
165+
'write_timeout' => 3,
166+
'read_timeout' => 60,
167+
'send_heartbeat' => 0,
168+
'receive_heartbeat' => 0,
148169
];
149170
}
150171
}

pkg/stomp/Tests/StompConnectionFactoryConfigTest.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ public static function provideConfigs()
6666
'sync' => false,
6767
'lazy' => true,
6868
'ssl_on' => false,
69+
'write_timeout' => 3,
70+
'read_timeout' => 60,
71+
'send_heartbeat' => 0,
72+
'receive_heartbeat' => 0,
6973
],
7074
];
7175

@@ -83,6 +87,10 @@ public static function provideConfigs()
8387
'sync' => false,
8488
'lazy' => true,
8589
'ssl_on' => false,
90+
'write_timeout' => 3,
91+
'read_timeout' => 60,
92+
'send_heartbeat' => 0,
93+
'receive_heartbeat' => 0,
8694
],
8795
];
8896

@@ -100,6 +108,10 @@ public static function provideConfigs()
100108
'sync' => false,
101109
'lazy' => true,
102110
'ssl_on' => false,
111+
'write_timeout' => 3,
112+
'read_timeout' => 60,
113+
'send_heartbeat' => 0,
114+
'receive_heartbeat' => 0,
103115
],
104116
];
105117

@@ -118,6 +130,10 @@ public static function provideConfigs()
118130
'lazy' => false,
119131
'foo' => 'bar',
120132
'ssl_on' => false,
133+
'write_timeout' => 3,
134+
'read_timeout' => 60,
135+
'send_heartbeat' => 0,
136+
'receive_heartbeat' => 0,
121137
],
122138
];
123139

@@ -136,6 +152,10 @@ public static function provideConfigs()
136152
'lazy' => false,
137153
'foo' => 'bar',
138154
'ssl_on' => false,
155+
'write_timeout' => 3,
156+
'read_timeout' => 60,
157+
'send_heartbeat' => 0,
158+
'receive_heartbeat' => 0,
139159
],
140160
];
141161

@@ -154,6 +174,10 @@ public static function provideConfigs()
154174
'lazy' => false,
155175
'foo' => 'bar',
156176
'ssl_on' => false,
177+
'write_timeout' => 3,
178+
'read_timeout' => 60,
179+
'send_heartbeat' => 0,
180+
'receive_heartbeat' => 0,
157181
],
158182
];
159183

@@ -173,6 +197,10 @@ public static function provideConfigs()
173197
'foo' => 'bar',
174198
'ssl_on' => false,
175199
'baz' => 'bazVal',
200+
'write_timeout' => 3,
201+
'read_timeout' => 60,
202+
'send_heartbeat' => 0,
203+
'receive_heartbeat' => 0,
176204
],
177205
];
178206

@@ -190,6 +218,10 @@ public static function provideConfigs()
190218
'sync' => false,
191219
'lazy' => true,
192220
'ssl_on' => false,
221+
'write_timeout' => 3,
222+
'read_timeout' => 60,
223+
'send_heartbeat' => 0,
224+
'receive_heartbeat' => 0,
193225
],
194226
];
195227

@@ -208,6 +240,10 @@ public static function provideConfigs()
208240
'lazy' => true,
209241
'foo' => 'bar',
210242
'ssl_on' => false,
243+
'write_timeout' => 3,
244+
'read_timeout' => 60,
245+
'send_heartbeat' => 0,
246+
'receive_heartbeat' => 0,
211247
],
212248
];
213249
}

pkg/stomp/Tests/StompConnectionFactoryTest.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
use Enqueue\Stomp\StompContext;
77
use Enqueue\Test\ClassExtensionTrait;
88
use Interop\Queue\ConnectionFactory;
9+
use Stomp\Network\Observer\Exception\HeartbeatException;
10+
use Stomp\Network\Observer\HeartbeatEmitter;
11+
use Stomp\Network\Observer\ServerAliveObserver;
912

1013
class StompConnectionFactoryTest extends \PHPUnit\Framework\TestCase
1114
{
@@ -52,4 +55,33 @@ public function testShouldCreateActiveMQContext()
5255
$this->assertAttributeEquals(null, 'stomp', $context);
5356
$this->assertAttributeEquals(false, 'useExchangePrefix', $context);
5457
}
58+
59+
public function testShouldNotCreateConnectionWithSendHeartbeat()
60+
{
61+
$factory = new StompConnectionFactory(['send_heartbeat' => 2000, 'host' => 'activemq']);
62+
$this->expectException(HeartbeatException::class);
63+
$factory->createContext()->getStomp();
64+
}
65+
66+
public function testShouldCreateConnectionWithSendHeartbeat()
67+
{
68+
$factory = new StompConnectionFactory(['send_heartbeat' => 2000, 'host' => 'activemq', 'read_timeout' => 1]);
69+
$context = $factory->createContext();
70+
71+
$observers = $context->getStomp()->getConnection()->getObservers()->getObservers();
72+
$this->assertAttributeEquals([2000, 0], 'heartbeat', $context->getStomp());
73+
$this->assertCount(1, $observers);
74+
$this->assertInstanceOf(HeartbeatEmitter::class, $observers[0]);
75+
}
76+
77+
public function testShouldCreateConnectionWithReceiveHeartbeat()
78+
{
79+
$factory = new StompConnectionFactory(['receive_heartbeat' => 2000, 'host' => 'activemq']);
80+
$context = $factory->createContext();
81+
82+
$observers = $context->getStomp()->getConnection()->getObservers()->getObservers();
83+
$this->assertAttributeEquals([0, 2000], 'heartbeat', $context->getStomp());
84+
$this->assertCount(1, $observers);
85+
$this->assertInstanceOf(ServerAliveObserver::class, $observers[0]);
86+
}
5587
}

0 commit comments

Comments
 (0)