From 84bbbb24e2b5b89ddceeb41f593b88abca0dfda3 Mon Sep 17 00:00:00 2001 From: yeoleobun Date: Thu, 7 May 2026 17:05:13 +0800 Subject: [PATCH 1/2] fix: separate local and remote transport addresses --- src/transport/channel.rs | 4 ++++ src/transport/connection.rs | 18 +++++++++++++++ src/transport/stream.rs | 1 + src/transport/tcp.rs | 4 ++++ src/transport/tcp_listener.rs | 5 +---- src/transport/tests/test_udp.rs | 1 + src/transport/tls.rs | 16 +++++++++----- src/transport/transport_layer.rs | 38 +++++++++++++++----------------- src/transport/udp.rs | 25 ++++++++++++++++----- src/transport/websocket.rs | 19 +++++++++++++++- 10 files changed, 96 insertions(+), 35 deletions(-) diff --git a/src/transport/channel.rs b/src/transport/channel.rs index 2bfb84e9..90a9f0df 100644 --- a/src/transport/channel.rs +++ b/src/transport/channel.rs @@ -90,6 +90,10 @@ impl ChannelConnection { &self.inner.addr } + pub fn get_remote_addr(&self) -> Option<&SipAddr> { + Some(&self.inner.addr) + } + pub async fn serve_loop(&self, sender: TransportSender) -> Result<()> { let mut incoming = match self.inner.clone().incoming.lock().take() { Some(incoming) => incoming, diff --git a/src/transport/connection.rs b/src/transport/connection.rs index af82b321..2988cce3 100644 --- a/src/transport/connection.rs +++ b/src/transport/connection.rs @@ -209,6 +209,24 @@ impl SipConnection { SipConnection::WebSocketListener(transport) => transport.get_addr(), } } + + pub fn get_remote_addr(&self) -> Option<&SipAddr> { + match self { + SipConnection::Channel(transport) => transport.get_remote_addr(), + SipConnection::Udp(transport) => transport.get_remote_addr(), + SipConnection::Tcp(transport) => Some(transport.get_remote_addr()), + SipConnection::TcpListener(_) => None, + #[cfg(feature = "rustls")] + SipConnection::Tls(transport) => Some(transport.get_remote_addr()), + #[cfg(feature = "rustls")] + SipConnection::TlsListener(_) => None, + #[cfg(feature = "websocket")] + SipConnection::WebSocket(transport) => Some(transport.get_remote_addr()), + #[cfg(feature = "websocket")] + SipConnection::WebSocketListener(_) => None, + } + } + pub async fn send(&self, msg: SipMessage, destination: Option<&SipAddr>) -> Result<()> { match self { SipConnection::Channel(transport) => transport.send(msg).await, diff --git a/src/transport/stream.rs b/src/transport/stream.rs index deef1a8c..fdbeaea2 100644 --- a/src/transport/stream.rs +++ b/src/transport/stream.rs @@ -253,6 +253,7 @@ where #[async_trait::async_trait] pub trait StreamConnection: Send + Sync + 'static { fn get_addr(&self) -> &SipAddr; + fn get_remote_addr(&self) -> &SipAddr; async fn send_message(&self, msg: SipMessage) -> Result<()>; async fn send_raw(&self, data: &[u8]) -> Result<()>; async fn serve_loop(&self, sender: TransportSender) -> Result<()>; diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs index 566d35d5..e74dbb6e 100644 --- a/src/transport/tcp.rs +++ b/src/transport/tcp.rs @@ -96,6 +96,10 @@ impl TcpConnection { #[async_trait::async_trait] impl StreamConnection for TcpConnection { fn get_addr(&self) -> &SipAddr { + &self.inner.local_addr + } + + fn get_remote_addr(&self) -> &SipAddr { &self.inner.remote_addr } diff --git a/src/transport/tcp_listener.rs b/src/transport/tcp_listener.rs index 00f04983..92deb09e 100644 --- a/src/transport/tcp_listener.rs +++ b/src/transport/tcp_listener.rs @@ -50,10 +50,7 @@ impl TcpListenerConnection { socket.bind(&local.into())?; socket.listen(128)?; let listener = TcpListener::from_std(socket.into())?; - let listener_local_addr = SipAddr { - r#type: Some(crate::sip::transport::Transport::Tcp), - addr: listener.local_addr()?.into(), - }; + let listener_local_addr = self.get_addr().clone(); tokio::spawn(async move { loop { let (stream, remote_addr) = match listener.accept().await { diff --git a/src/transport/tests/test_udp.rs b/src/transport/tests/test_udp.rs index 039b3264..0f844121 100644 --- a/src/transport/tests/test_udp.rs +++ b/src/transport/tests/test_udp.rs @@ -73,6 +73,7 @@ async fn test_udp_recv_sip_message() -> Result<()> { assert!(msg.is_request()); assert_eq!(from, peer_alice.get_addr().to_owned()); assert_eq!(connection.get_addr(), peer_bob.get_addr()); + assert_eq!(connection.get_remote_addr(), Some(peer_alice.get_addr())); } _ => { assert!(false, "unexpected event"); diff --git a/src/transport/tls.rs b/src/transport/tls.rs index a76940c6..3b2643a2 100644 --- a/src/transport/tls.rs +++ b/src/transport/tls.rs @@ -316,6 +316,7 @@ impl TlsListenerConnection { let listener = TcpListener::from_std(socket.into())?; let (acceptor, resolver) = Self::create_acceptor(&self.inner.config).await?; *self.inner.cert_resolver.lock() = Some(resolver); + let listener_local_addr = self.get_addr().clone(); tokio::spawn(async move { loop { @@ -333,6 +334,7 @@ impl TlsListenerConnection { let acceptor_clone = acceptor.clone(); let transport_layer_inner_ref = transport_layer_inner.clone(); + let local_addr = listener_local_addr.clone(); tokio::spawn(async move { // Perform TLS handshake @@ -352,6 +354,7 @@ impl TlsListenerConnection { // Create TLS connection let tls_connection = match TlsConnection::from_server_stream( tls_stream, + local_addr, remote_sip_addr.clone(), Some(transport_layer_inner_ref.cancel_token.child_token()), ) @@ -600,14 +603,10 @@ impl TlsConnection { // Create TLS connection from existing server TLS stream pub async fn from_server_stream( stream: TlsServerStream, + local_addr: SipAddr, remote_addr: SipAddr, cancel_token: Option, ) -> Result { - let local_addr = SipAddr { - r#type: Some(crate::sip::transport::Transport::Tls), - addr: stream.get_ref().0.local_addr()?.into(), - }; - // Split stream into read and write halves let (read_half, write_half) = tokio::io::split(stream); @@ -640,6 +639,13 @@ impl TlsConnection { #[async_trait::async_trait] impl StreamConnection for TlsConnection { fn get_addr(&self) -> &SipAddr { + match &self.inner { + TlsConnectionInner::Client(inner) => &inner.local_addr, + TlsConnectionInner::Server(inner) => &inner.local_addr, + } + } + + fn get_remote_addr(&self) -> &SipAddr { match &self.inner { TlsConnectionInner::Client(inner) => &inner.remote_addr, TlsConnectionInner::Server(inner) => &inner.remote_addr, diff --git a/src/transport/transport_layer.rs b/src/transport/transport_layer.rs index 935a5898..406febe0 100644 --- a/src/transport/transport_layer.rs +++ b/src/transport/transport_layer.rs @@ -188,19 +188,9 @@ impl TransportLayer { .iter() .map(|t| t.get_addr().to_owned()) .collect(); - // Also include local addresses from TCP/TLS client connections. - // For connection-oriented transports, get_addr() returns the remote address - // (used for lookup), but Via/Contact headers need the local address. let connections = self.inner.connections.read(); for conn in connections.values() { - match conn { - SipConnection::Tcp(tcp) => { - addrs.push(tcp.inner.local_addr.clone()); - } - // TLS inner is private — skip for now - SipConnection::Tls(_) => {} - _ => {} - } + addrs.push(conn.get_addr().clone()); } addrs } @@ -259,10 +249,14 @@ impl TransportLayerInner { } pub(super) fn add_connection(&self, connection: SipConnection) { - let mut connections = self.connections.write(); - connections.insert(connection.get_addr().to_owned(), connection.clone()); - drop(connections); - self.serve_connection(connection); + if let Some(remote_addr) = connection.get_remote_addr().cloned() { + let mut connections = self.connections.write(); + connections.insert(remote_addr, connection.clone()); + drop(connections); + self.serve_connection(connection); + } else { + warn!(addr = %connection.get_addr(), "connection has no remote address"); + } } pub(super) fn del_connection(&self, addr: &SipAddr) { @@ -392,14 +386,18 @@ impl TransportLayerInner { pub fn serve_connection(&self, transport: SipConnection) { let sub_token = self.cancel_token.child_token(); let sender_clone = self.transport_tx.clone(); - info!(addr=%transport.get_addr(), "serve_connection: starting serve_loop"); + let remote_addr = transport + .get_remote_addr() + .map(|addr| addr.to_string()) + .unwrap_or_else(|| "-".to_string()); + info!(addr=%transport.get_addr(), remote=%remote_addr, "serve_connection: starting serve_loop"); tokio::spawn(async move { match sender_clone.send(TransportEvent::New(transport.clone())) { Ok(()) => { - info!(addr=%transport.get_addr(), "serve_connection: New event sent"); + info!(addr=%transport.get_addr(), remote=%remote_addr, "serve_connection: New event sent"); } Err(e) => { - warn!(addr=%transport.get_addr(), error = ?e, "Error sending new connection event"); + warn!(addr=%transport.get_addr(), remote=%remote_addr, error = ?e, "Error sending new connection event"); return; } } @@ -409,11 +407,11 @@ impl TransportLayerInner { transport.serve_loop(sender_clone.clone()).await } => { if let Err(e) = result { - warn!(addr=%transport.get_addr(), error = %e, "serve_loop error"); + warn!(addr=%transport.get_addr(), remote=%remote_addr, error = %e, "serve_loop error"); } } } - info!(addr=%transport.get_addr(), "transport serve_loop exited"); + info!(addr=%transport.get_addr(), remote=%remote_addr, "transport serve_loop exited"); transport.close().await.ok(); sender_clone.send(TransportEvent::Closed(transport)).ok(); }); diff --git a/src/transport/udp.rs b/src/transport/udp.rs index 5e2ff5b1..74e3b92e 100644 --- a/src/transport/udp.rs +++ b/src/transport/udp.rs @@ -21,6 +21,7 @@ pub struct UdpInner { #[derive(Clone)] pub struct UdpConnection { pub external: Option, + remote: Option, cancel_token: Option, inner: Arc, } @@ -36,6 +37,7 @@ impl UdpConnection { r#type: Some(crate::sip::transport::Transport::Udp), addr: SipConnection::resolve_bind_address(addr).into(), }), + remote: None, inner: Arc::new(inner), cancel_token, } @@ -72,6 +74,7 @@ impl UdpConnection { r#type: Some(crate::sip::transport::Transport::Udp), addr: addr.into(), }), + remote: None, inner: Arc::new(UdpInner { addr, conn }), cancel_token, }; @@ -181,13 +184,20 @@ impl UdpConnection { debug!(len, src=%addr, dest=%self.get_addr(), raw_message=undecoded, "udp received"); + let from = SipAddr { + r#type: Some(crate::sip::transport::Transport::Udp), + addr: addr.into(), + }; + sender.send(TransportEvent::Incoming( msg, - SipConnection::Udp(self.clone()), - SipAddr { - r#type: Some(crate::sip::transport::Transport::Udp), - addr: addr.into(), - }, + SipConnection::Udp(Self { + external: self.external.clone(), + remote: Some(from.clone()), + cancel_token: self.cancel_token.clone(), + inner: self.inner.clone(), + }), + from, ))?; } } @@ -244,6 +254,11 @@ impl UdpConnection { &self.inner.addr } } + + pub fn get_remote_addr(&self) -> Option<&SipAddr> { + self.remote.as_ref() + } + pub fn cancel_token(&self) -> Option { self.cancel_token.clone() } diff --git a/src/transport/websocket.rs b/src/transport/websocket.rs index c87cd95d..bede028f 100644 --- a/src/transport/websocket.rs +++ b/src/transport/websocket.rs @@ -93,6 +93,7 @@ impl WebSocketListenerConnection { } else { crate::sip::transport::Transport::Ws }; + let listener_local_addr = self.get_addr().clone(); debug!(local = %self.inner.local_addr, "Starting WebSocket listener"); tokio::spawn(async move { @@ -116,6 +117,7 @@ impl WebSocketListenerConnection { addr: remote_addr.into(), }; let transport_layer_inner_ref = transport_layer_inner.clone(); + let local_addr = listener_local_addr.clone(); tokio::spawn(async move { // Wrap the TCP stream in MaybeTlsStream let maybe_tls_stream = MaybeTlsStream::Plain(stream); @@ -152,6 +154,7 @@ impl WebSocketListenerConnection { let (ws_sink, ws_read) = ws_stream.split(); let connection = WebSocketConnection { inner: Arc::new(WebSocketInner { + local_addr, remote_addr, ws_sink: Mutex::new(ws_sink), ws_read: Mutex::new(Some(ws_read)), @@ -195,6 +198,7 @@ impl fmt::Debug for WebSocketListenerConnection { } pub struct WebSocketInner { + pub local_addr: SipAddr, pub remote_addr: SipAddr, pub ws_sink: Mutex, pub ws_read: Mutex>, @@ -230,10 +234,15 @@ impl WebSocketConnection { .insert("sec-websocket-protocol", "sip".parse().unwrap()); let (ws_stream, _) = connect_async(request).await?; + let local_addr = SipAddr { + r#type: Some(remote.r#type.unwrap_or(crate::sip::transport::Transport::Ws)), + addr: ws_stream.get_ref().get_ref().local_addr()?.into(), + }; let (ws_sink, ws_stream) = ws_stream.split(); let connection = WebSocketConnection { inner: Arc::new(WebSocketInner { + local_addr, remote_addr: remote.clone(), ws_sink: Mutex::new(ws_sink), ws_read: Mutex::new(Some(ws_stream)), @@ -257,6 +266,10 @@ impl WebSocketConnection { #[async_trait::async_trait] impl StreamConnection for WebSocketConnection { fn get_addr(&self) -> &SipAddr { + &self.inner.local_addr + } + + fn get_remote_addr(&self) -> &SipAddr { &self.inner.remote_addr } @@ -372,7 +385,11 @@ impl fmt::Display for WebSocketConnection { Some(crate::sip::transport::Transport::Wss) => "WSS", _ => "WS", }; - write!(f, "{} {}", transport, self.inner.remote_addr) + write!( + f, + "{} {} -> {}", + transport, self.inner.local_addr, self.inner.remote_addr + ) } } From b38fd9624b0e286cf1ab7cf122ca4713bb299258 Mon Sep 17 00:00:00 2001 From: yeoleobun Date: Thu, 7 May 2026 17:17:37 +0800 Subject: [PATCH 2/2] fix: fmt checking --- src/transport/websocket.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/transport/websocket.rs b/src/transport/websocket.rs index bede028f..ab13c5fd 100644 --- a/src/transport/websocket.rs +++ b/src/transport/websocket.rs @@ -235,7 +235,11 @@ impl WebSocketConnection { let (ws_stream, _) = connect_async(request).await?; let local_addr = SipAddr { - r#type: Some(remote.r#type.unwrap_or(crate::sip::transport::Transport::Ws)), + r#type: Some( + remote + .r#type + .unwrap_or(crate::sip::transport::Transport::Ws), + ), addr: ws_stream.get_ref().get_ref().local_addr()?.into(), }; let (ws_sink, ws_stream) = ws_stream.split();