88use Interop \Queue \ConnectionFactory ;
99use Interop \Queue \Context ;
1010use Stomp \Network \Connection ;
11+ use Stomp \Network \Observer \HeartbeatEmitter ;
12+ use Stomp \Network \Observer \ServerAliveObserver ;
1113
1214class StompConnectionFactory implements ConnectionFactory
1315{
@@ -88,11 +90,22 @@ private function establishConnection(): BufferedStompClient
8890 $ scheme = (true === $ config ['ssl_on ' ]) ? 'ssl ' : 'tcp ' ;
8991 $ uri = $ scheme .':// ' .$ config ['host ' ].': ' .$ config ['port ' ];
9092 $ connection = new Connection ($ uri , $ config ['connection_timeout ' ]);
93+ $ connection ->setWriteTimeout ($ config ['write_timeout ' ]);
94+ $ connection ->setReadTimeout ($ config ['read_timeout ' ]);
95+
96+ if ($ config ['send_heartbeat ' ]) {
97+ $ connection ->getObservers ()->addObserver (new HeartbeatEmitter ($ connection ));
98+ }
99+
100+ if ($ config ['receive_heartbeat ' ]) {
101+ $ connection ->getObservers ()->addObserver (new ServerAliveObserver ());
102+ }
91103
92104 $ this ->stomp = new BufferedStompClient ($ connection , $ config ['buffer_size ' ]);
93105 $ this ->stomp ->setLogin ($ config ['login ' ], $ config ['password ' ]);
94106 $ this ->stomp ->setVhostname ($ config ['vhost ' ]);
95107 $ this ->stomp ->setSync ($ config ['sync ' ]);
108+ $ this ->stomp ->setHeartbeat ($ config ['send_heartbeat ' ], $ config ['receive_heartbeat ' ]);
96109
97110 $ this ->stomp ->connect ();
98111 }
@@ -128,6 +141,10 @@ private function parseDsn(string $dsn): array
128141 'sync ' => $ dsn ->getBool ('sync ' ),
129142 'lazy ' => $ dsn ->getBool ('lazy ' ),
130143 'ssl_on ' => $ dsn ->getBool ('ssl_on ' ),
144+ 'write_timeout ' => $ dsn ->getDecimal ('write_timeout ' ),
145+ 'read_timeout ' => $ dsn ->getDecimal ('read_timeout ' ),
146+ 'send_heartbeat ' => $ dsn ->getDecimal ('send_heartbeat ' ),
147+ 'receive_heartbeat ' => $ dsn ->getDecimal ('receive_heartbeat ' ),
131148 ]), function ($ value ) { return null !== $ value ; });
132149 }
133150
@@ -145,6 +162,10 @@ private function defaultConfig(): array
145162 'sync ' => false ,
146163 'lazy ' => true ,
147164 'ssl_on ' => false ,
165+ 'write_timeout ' => 3 ,
166+ 'read_timeout ' => 60 ,
167+ 'send_heartbeat ' => 0 ,
168+ 'receive_heartbeat ' => 0 ,
148169 ];
149170 }
150171}
0 commit comments