Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/transport/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ impl ChannelConnection {
&self.inner.addr
}

pub fn get_remote_addr(&self) -> Option<&SipAddr> {
Some(&self.inner.addr)
}

pub async fn serve_loop(&self, sender: TransportSender) -> Result<()> {
let mut incoming = match self.inner.clone().incoming.lock().take() {
Some(incoming) => incoming,
Expand Down
18 changes: 18 additions & 0 deletions src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,24 @@ impl SipConnection {
SipConnection::WebSocketListener(transport) => transport.get_addr(),
}
}

pub fn get_remote_addr(&self) -> Option<&SipAddr> {
match self {
SipConnection::Channel(transport) => transport.get_remote_addr(),
SipConnection::Udp(transport) => transport.get_remote_addr(),
SipConnection::Tcp(transport) => Some(transport.get_remote_addr()),
SipConnection::TcpListener(_) => None,
#[cfg(feature = "rustls")]
SipConnection::Tls(transport) => Some(transport.get_remote_addr()),
#[cfg(feature = "rustls")]
SipConnection::TlsListener(_) => None,
#[cfg(feature = "websocket")]
SipConnection::WebSocket(transport) => Some(transport.get_remote_addr()),
#[cfg(feature = "websocket")]
SipConnection::WebSocketListener(_) => None,
}
}

pub async fn send(&self, msg: SipMessage, destination: Option<&SipAddr>) -> Result<()> {
match self {
SipConnection::Channel(transport) => transport.send(msg).await,
Expand Down
1 change: 1 addition & 0 deletions src/transport/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ where
#[async_trait::async_trait]
pub trait StreamConnection: Send + Sync + 'static {
fn get_addr(&self) -> &SipAddr;
fn get_remote_addr(&self) -> &SipAddr;
async fn send_message(&self, msg: SipMessage) -> Result<()>;
async fn send_raw(&self, data: &[u8]) -> Result<()>;
async fn serve_loop(&self, sender: TransportSender) -> Result<()>;
Expand Down
4 changes: 4 additions & 0 deletions src/transport/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ impl TcpConnection {
#[async_trait::async_trait]
impl StreamConnection for TcpConnection {
fn get_addr(&self) -> &SipAddr {
&self.inner.local_addr
}

fn get_remote_addr(&self) -> &SipAddr {
&self.inner.remote_addr
}

Expand Down
5 changes: 1 addition & 4 deletions src/transport/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ impl TcpListenerConnection {
socket.bind(&local.into())?;
socket.listen(128)?;
let listener = TcpListener::from_std(socket.into())?;
let listener_local_addr = SipAddr {
r#type: Some(crate::sip::transport::Transport::Tcp),
addr: listener.local_addr()?.into(),
};
let listener_local_addr = self.get_addr().clone();
tokio::spawn(async move {
loop {
let (stream, remote_addr) = match listener.accept().await {
Expand Down
1 change: 1 addition & 0 deletions src/transport/tests/test_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async fn test_udp_recv_sip_message() -> Result<()> {
assert!(msg.is_request());
assert_eq!(from, peer_alice.get_addr().to_owned());
assert_eq!(connection.get_addr(), peer_bob.get_addr());
assert_eq!(connection.get_remote_addr(), Some(peer_alice.get_addr()));
}
_ => {
assert!(false, "unexpected event");
Expand Down
16 changes: 11 additions & 5 deletions src/transport/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ impl TlsListenerConnection {
let listener = TcpListener::from_std(socket.into())?;
let (acceptor, resolver) = Self::create_acceptor(&self.inner.config).await?;
*self.inner.cert_resolver.lock() = Some(resolver);
let listener_local_addr = self.get_addr().clone();

tokio::spawn(async move {
loop {
Expand All @@ -333,6 +334,7 @@ impl TlsListenerConnection {

let acceptor_clone = acceptor.clone();
let transport_layer_inner_ref = transport_layer_inner.clone();
let local_addr = listener_local_addr.clone();

tokio::spawn(async move {
// Perform TLS handshake
Expand All @@ -352,6 +354,7 @@ impl TlsListenerConnection {
// Create TLS connection
let tls_connection = match TlsConnection::from_server_stream(
tls_stream,
local_addr,
remote_sip_addr.clone(),
Some(transport_layer_inner_ref.cancel_token.child_token()),
)
Expand Down Expand Up @@ -600,14 +603,10 @@ impl TlsConnection {
// Create TLS connection from existing server TLS stream
pub async fn from_server_stream(
stream: TlsServerStream,
local_addr: SipAddr,
remote_addr: SipAddr,
cancel_token: Option<CancellationToken>,
) -> Result<Self> {
let local_addr = SipAddr {
r#type: Some(crate::sip::transport::Transport::Tls),
addr: stream.get_ref().0.local_addr()?.into(),
};

// Split stream into read and write halves
let (read_half, write_half) = tokio::io::split(stream);

Expand Down Expand Up @@ -640,6 +639,13 @@ impl TlsConnection {
#[async_trait::async_trait]
impl StreamConnection for TlsConnection {
fn get_addr(&self) -> &SipAddr {
match &self.inner {
TlsConnectionInner::Client(inner) => &inner.local_addr,
TlsConnectionInner::Server(inner) => &inner.local_addr,
}
}

fn get_remote_addr(&self) -> &SipAddr {
match &self.inner {
TlsConnectionInner::Client(inner) => &inner.remote_addr,
TlsConnectionInner::Server(inner) => &inner.remote_addr,
Expand Down
38 changes: 18 additions & 20 deletions src/transport/transport_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,9 @@ impl TransportLayer {
.iter()
.map(|t| t.get_addr().to_owned())
.collect();
// Also include local addresses from TCP/TLS client connections.
// For connection-oriented transports, get_addr() returns the remote address
// (used for lookup), but Via/Contact headers need the local address.
let connections = self.inner.connections.read();
for conn in connections.values() {
match conn {
SipConnection::Tcp(tcp) => {
addrs.push(tcp.inner.local_addr.clone());
}
// TLS inner is private — skip for now
SipConnection::Tls(_) => {}
_ => {}
}
addrs.push(conn.get_addr().clone());
}
addrs
}
Expand Down Expand Up @@ -259,10 +249,14 @@ impl TransportLayerInner {
}

pub(super) fn add_connection(&self, connection: SipConnection) {
let mut connections = self.connections.write();
connections.insert(connection.get_addr().to_owned(), connection.clone());
drop(connections);
self.serve_connection(connection);
if let Some(remote_addr) = connection.get_remote_addr().cloned() {
let mut connections = self.connections.write();
connections.insert(remote_addr, connection.clone());
drop(connections);
self.serve_connection(connection);
} else {
warn!(addr = %connection.get_addr(), "connection has no remote address");
}
}

pub(super) fn del_connection(&self, addr: &SipAddr) {
Expand Down Expand Up @@ -392,14 +386,18 @@ impl TransportLayerInner {
pub fn serve_connection(&self, transport: SipConnection) {
let sub_token = self.cancel_token.child_token();
let sender_clone = self.transport_tx.clone();
info!(addr=%transport.get_addr(), "serve_connection: starting serve_loop");
let remote_addr = transport
.get_remote_addr()
.map(|addr| addr.to_string())
.unwrap_or_else(|| "-".to_string());
info!(addr=%transport.get_addr(), remote=%remote_addr, "serve_connection: starting serve_loop");
tokio::spawn(async move {
match sender_clone.send(TransportEvent::New(transport.clone())) {
Ok(()) => {
info!(addr=%transport.get_addr(), "serve_connection: New event sent");
info!(addr=%transport.get_addr(), remote=%remote_addr, "serve_connection: New event sent");
}
Err(e) => {
warn!(addr=%transport.get_addr(), error = ?e, "Error sending new connection event");
warn!(addr=%transport.get_addr(), remote=%remote_addr, error = ?e, "Error sending new connection event");
return;
}
}
Expand All @@ -409,11 +407,11 @@ impl TransportLayerInner {
transport.serve_loop(sender_clone.clone()).await
} => {
if let Err(e) = result {
warn!(addr=%transport.get_addr(), error = %e, "serve_loop error");
warn!(addr=%transport.get_addr(), remote=%remote_addr, error = %e, "serve_loop error");
}
}
}
info!(addr=%transport.get_addr(), "transport serve_loop exited");
info!(addr=%transport.get_addr(), remote=%remote_addr, "transport serve_loop exited");
transport.close().await.ok();
sender_clone.send(TransportEvent::Closed(transport)).ok();
});
Expand Down
25 changes: 20 additions & 5 deletions src/transport/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct UdpInner {
#[derive(Clone)]
pub struct UdpConnection {
pub external: Option<SipAddr>,
remote: Option<SipAddr>,
cancel_token: Option<CancellationToken>,
inner: Arc<UdpInner>,
}
Expand All @@ -36,6 +37,7 @@ impl UdpConnection {
r#type: Some(crate::sip::transport::Transport::Udp),
addr: SipConnection::resolve_bind_address(addr).into(),
}),
remote: None,
inner: Arc::new(inner),
cancel_token,
}
Expand Down Expand Up @@ -72,6 +74,7 @@ impl UdpConnection {
r#type: Some(crate::sip::transport::Transport::Udp),
addr: addr.into(),
}),
remote: None,
inner: Arc::new(UdpInner { addr, conn }),
cancel_token,
};
Expand Down Expand Up @@ -181,13 +184,20 @@ impl UdpConnection {

debug!(len, src=%addr, dest=%self.get_addr(), raw_message=undecoded, "udp received");

let from = SipAddr {
r#type: Some(crate::sip::transport::Transport::Udp),
addr: addr.into(),
};

sender.send(TransportEvent::Incoming(
msg,
SipConnection::Udp(self.clone()),
SipAddr {
r#type: Some(crate::sip::transport::Transport::Udp),
addr: addr.into(),
},
SipConnection::Udp(Self {
external: self.external.clone(),
remote: Some(from.clone()),
cancel_token: self.cancel_token.clone(),
inner: self.inner.clone(),
}),
from,
))?;
}
}
Expand Down Expand Up @@ -244,6 +254,11 @@ impl UdpConnection {
&self.inner.addr
}
}

pub fn get_remote_addr(&self) -> Option<&SipAddr> {
self.remote.as_ref()
}

pub fn cancel_token(&self) -> Option<CancellationToken> {
self.cancel_token.clone()
}
Expand Down
23 changes: 22 additions & 1 deletion src/transport/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl WebSocketListenerConnection {
} else {
crate::sip::transport::Transport::Ws
};
let listener_local_addr = self.get_addr().clone();

debug!(local = %self.inner.local_addr, "Starting WebSocket listener");
tokio::spawn(async move {
Expand All @@ -116,6 +117,7 @@ impl WebSocketListenerConnection {
addr: remote_addr.into(),
};
let transport_layer_inner_ref = transport_layer_inner.clone();
let local_addr = listener_local_addr.clone();
tokio::spawn(async move {
// Wrap the TCP stream in MaybeTlsStream
let maybe_tls_stream = MaybeTlsStream::Plain(stream);
Expand Down Expand Up @@ -152,6 +154,7 @@ impl WebSocketListenerConnection {
let (ws_sink, ws_read) = ws_stream.split();
let connection = WebSocketConnection {
inner: Arc::new(WebSocketInner {
local_addr,
remote_addr,
ws_sink: Mutex::new(ws_sink),
ws_read: Mutex::new(Some(ws_read)),
Expand Down Expand Up @@ -195,6 +198,7 @@ impl fmt::Debug for WebSocketListenerConnection {
}

pub struct WebSocketInner {
pub local_addr: SipAddr,
pub remote_addr: SipAddr,
pub ws_sink: Mutex<WsSink>,
pub ws_read: Mutex<Option<WsRead>>,
Expand Down Expand Up @@ -230,10 +234,19 @@ impl WebSocketConnection {
.insert("sec-websocket-protocol", "sip".parse().unwrap());

let (ws_stream, _) = connect_async(request).await?;
let local_addr = SipAddr {
r#type: Some(
remote
.r#type
.unwrap_or(crate::sip::transport::Transport::Ws),
),
addr: ws_stream.get_ref().get_ref().local_addr()?.into(),
};
let (ws_sink, ws_stream) = ws_stream.split();

let connection = WebSocketConnection {
inner: Arc::new(WebSocketInner {
local_addr,
remote_addr: remote.clone(),
ws_sink: Mutex::new(ws_sink),
ws_read: Mutex::new(Some(ws_stream)),
Expand All @@ -257,6 +270,10 @@ impl WebSocketConnection {
#[async_trait::async_trait]
impl StreamConnection for WebSocketConnection {
fn get_addr(&self) -> &SipAddr {
&self.inner.local_addr
}

fn get_remote_addr(&self) -> &SipAddr {
&self.inner.remote_addr
}

Expand Down Expand Up @@ -372,7 +389,11 @@ impl fmt::Display for WebSocketConnection {
Some(crate::sip::transport::Transport::Wss) => "WSS",
_ => "WS",
};
write!(f, "{} {}", transport, self.inner.remote_addr)
write!(
f,
"{} {} -> {}",
transport, self.inner.local_addr, self.inner.remote_addr
)
}
}

Expand Down
Loading