diff --git a/Cargo.lock b/Cargo.lock index b16200b..5f85f35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,31 @@ dependencies = [ "syn", ] +[[package]] +name = "agent-client-protocol-http" +version = "0.11.1" +dependencies = [ + "agent-client-protocol", + "agent-client-protocol-test", + "async-stream", + "axum", + "eventsource-stream", + "futures", + "jsonrpcmsg", + "reqwest 0.12.28", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-tungstenite", + "tokio-util", + "tower-http", + "tracing", + "tracing-subscriber", + "url", + "uuid", +] + [[package]] name = "agent-client-protocol-rmcp" version = "0.11.1" @@ -204,7 +229,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -215,7 +240,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -251,7 +276,7 @@ dependencies = [ "polling", "rustix", "slab", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -298,7 +323,7 @@ dependencies = [ "rustix", "signal-hook-registry", "slab", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -359,6 +384,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" dependencies = [ "axum-core", + "axum-macros", + "base64", "bytes", "form_urlencoded", "futures-util", @@ -377,8 +404,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -404,6 +433,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aa268c23bfbbd2c4363b9cd302a4f504fb2a9dfe7e3451d66f35dd392e20aca" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "base64" version = "0.22.1" @@ -416,6 +456,15 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "blocking" version = "1.6.2" @@ -452,9 +501,9 @@ checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" [[package]] name = "cc" -version = "1.2.62" +version = "1.2.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" +checksum = "d16d90359e986641506914ba71350897565610e87ce0ad9e6f28569db3dd5c6d" dependencies = [ "find-msvc-tools", "shlex", @@ -556,12 +605,31 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "darling" version = "0.23.0" @@ -596,6 +664,12 @@ dependencies = [ "syn", ] +[[package]] +name = "data-encoding" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8" + [[package]] name = "deranged" version = "0.5.8" @@ -629,6 +703,16 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -665,7 +749,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -689,6 +773,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "expect-test" version = "1.5.1" @@ -846,6 +941,43 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" +dependencies = [ + "cfg-if", + "js-sys", + "libc", + "wasi", + "wasm-bindgen", +] + +[[package]] +name = "getrandom" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" +dependencies = [ + "cfg-if", + "js-sys", + "libc", + "r-efi 5.3.0", + "wasip2", + "wasm-bindgen", +] + [[package]] name = "getrandom" version = "0.4.2" @@ -854,7 +986,7 @@ checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" dependencies = [ "cfg-if", "libc", - "r-efi", + "r-efi 6.0.0", "wasip2", "wasip3", ] @@ -964,6 +1096,22 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ca68d021ef39cf6463ab54c1d0f5daf03377b70561305bb89a8f83aab66e0f" +dependencies = [ + "http", + "hyper", + "hyper-util", + "rustls", + "tokio", + "tokio-rustls", + "tower-service", + "webpki-roots", +] + [[package]] name = "hyper-util" version = "0.1.20" @@ -1155,6 +1303,16 @@ version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" +[[package]] +name = "iri-string" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25e659a4bb38e810ebc252e53b5814ff908a8c58c2a9ce2fae1bbec24cbf4e20" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is-docker" version = "0.2.0" @@ -1253,6 +1411,12 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "matchers" version = "0.2.0" @@ -1280,6 +1444,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "mio" version = "1.2.0" @@ -1288,14 +1458,14 @@ checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] name = "nix" -version = "0.31.3" +version = "0.31.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf20d2fde8ff38632c426f1165ed7436270b44f199fc55284c38276f9db47c3d" +checksum = "5d6d0705320c1e6ba1d912b5e37cf18071b6c2e9b7fa8215a1e8a7651966f5d3" dependencies = [ "bitflags", "cfg-if", @@ -1303,13 +1473,23 @@ dependencies = [ "libc", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -1399,18 +1579,18 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" [[package]] name = "pin-project" -version = "1.1.12" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbf0d9e68100b3a7989b4901972f265cd542e560a3a8a724e1e20322f4d06ce9" +checksum = "f1749c7ed4bcaf4c3d0a3efc28538844fb29bcdd7d2b67b2be7e20ba861ff517" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.12" +version = "1.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a990e22f43e84855daf260dded30524ef4a9021cc7541c26540500a50b624389" +checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" dependencies = [ "proc-macro2", "quote", @@ -1445,7 +1625,7 @@ dependencies = [ "hermit-abi", "pin-project-lite", "rustix", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -1463,6 +1643,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "prettyplease" version = "0.2.37" @@ -1496,6 +1685,61 @@ dependencies = [ "windows", ] +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2", + "thiserror", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +dependencies = [ + "bytes", + "getrandom 0.3.4", + "lru-slab", + "rand", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.60.2", +] + [[package]] name = "quote" version = "1.0.45" @@ -1505,12 +1749,47 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + [[package]] name = "r-efi" version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" +[[package]] +name = "rand" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" +dependencies = [ + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76afc826de14238e6e8c374ddcc1fa19e374fd8dd986b0d2af0d02377261d83c" +dependencies = [ + "getrandom 0.3.4", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -1569,6 +1848,47 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tokio-util", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams 0.4.2", + "web-sys", + "webpki-roots", +] + [[package]] name = "reqwest" version = "0.13.3" @@ -1599,10 +1919,24 @@ dependencies = [ "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", + "wasm-streams 0.5.0", "web-sys", ] +[[package]] +name = "ring" +version = "0.17.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" +dependencies = [ + "cc", + "cfg-if", + "getrandom 0.2.17", + "libc", + "untrusted", + "windows-sys 0.52.0", +] + [[package]] name = "rmcp" version = "1.6.0" @@ -1617,7 +1951,7 @@ dependencies = [ "pastey", "pin-project-lite", "process-wrap", - "reqwest", + "reqwest 0.13.3", "rmcp-macros", "schemars 1.2.1", "serde", @@ -1668,7 +2002,42 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.61.2", +] + +[[package]] +name = "rustls" +version = "0.23.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" +dependencies = [ + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pki-types" +version = "1.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9" +dependencies = [ + "web-time", + "zeroize", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", ] [[package]] @@ -1842,6 +2211,17 @@ dependencies = [ "syn", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -1892,7 +2272,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -1950,6 +2330,12 @@ dependencies = [ "syn", ] +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" version = "2.0.117" @@ -2068,9 +2454,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.52.3" +version = "1.52.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fc7f01b389ac15039e4dc9531aa973a135d7a4135281b12d7c1bc79fd57fffe" +checksum = "110a78583f19d5cdb2c5ccf321d1290344e71313c6c37d43520d386027d18386" dependencies = [ "bytes", "libc", @@ -2080,7 +2466,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -2094,6 +2480,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.18" @@ -2105,6 +2501,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f72a05e828585856dacd553fba484c242c46e391fb0e58917c942ee9202915c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.18" @@ -2137,20 +2545,20 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.10" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68d6fdd9f81c2819c9a8b0e0cd91660e7746a8e6ea2ba7c6b2b057985f6bcb51" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ "bitflags", "bytes", "futures-util", "http", "http-body", + "iri-string", "pin-project-lite", "tower", "tower-layer", "tower-service", - "url", ] [[package]] @@ -2246,6 +2654,28 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c01152af293afb9c7c2a57e4b559c5620b421f6d133261c60dd2d0cdb38e6b8" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", +] + +[[package]] +name = "typenum" +version = "1.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de" + [[package]] name = "unicode-ident" version = "1.0.24" @@ -2264,6 +2694,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" version = "2.5.8" @@ -2294,7 +2730,7 @@ version = "1.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" dependencies = [ - "getrandom", + "getrandom 0.4.2", "js-sys", "wasm-bindgen", ] @@ -2305,6 +2741,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "vte" version = "0.14.1" @@ -2424,6 +2866,19 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasm-streams" version = "0.5.0" @@ -2459,6 +2914,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki-roots" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52f5ee44c96cf55f1b349600768e3ece3a8f26010c05265ab73f945bb1a2eb9d" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "windows" version = "0.62.2" @@ -2560,6 +3034,24 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets 0.53.5", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -2569,6 +3061,39 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", + "windows_i686_gnullvm 0.52.6", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm 0.53.1", + "windows_aarch64_msvc 0.53.1", + "windows_i686_gnu 0.53.1", + "windows_i686_gnullvm 0.53.1", + "windows_i686_msvc 0.53.1", + "windows_x86_64_gnu 0.53.1", + "windows_x86_64_gnullvm 0.53.1", + "windows_x86_64_msvc 0.53.1", +] + [[package]] name = "windows-threading" version = "0.2.1" @@ -2578,6 +3103,102 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "wit-bindgen" version = "0.51.0" @@ -2701,6 +3322,26 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zerocopy" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eed437bf9d6692032087e337407a86f04cd8d6a16a37199ed57949d415bd68e9" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e3cd084b1788766f53af483dd21f93881ff30d7320490ec3ef7526d203bad4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zerofrom" version = "0.1.7" @@ -2722,6 +3363,12 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zeroize" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" + [[package]] name = "zerotrie" version = "0.2.4" diff --git a/Cargo.toml b/Cargo.toml index 3bc03a6..7c558f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "src/agent-client-protocol-conductor", "src/agent-client-protocol-cookbook", "src/agent-client-protocol-derive", + "src/agent-client-protocol-http", "src/agent-client-protocol-rmcp", "src/agent-client-protocol-test", "src/agent-client-protocol-trace-viewer", @@ -23,6 +24,7 @@ homepage = "https://github.com/agentclientprotocol/rust-sdk" agent-client-protocol = { path = "src/agent-client-protocol", version = "0.11.1" } agent-client-protocol-conductor = { path = "src/agent-client-protocol-conductor", version = "0.11.1" } agent-client-protocol-derive = { path = "src/agent-client-protocol-derive", version = "0.11.0" } +agent-client-protocol-http = { path = "src/agent-client-protocol-http", version = "0.11.1" } agent-client-protocol-rmcp = { path = "src/agent-client-protocol-rmcp", version = "0.11.1" } agent-client-protocol-test = { path = "src/agent-client-protocol-test" } agent-client-protocol-trace-viewer = { path = "src/agent-client-protocol-trace-viewer", version = "0.11.0" } @@ -57,6 +59,9 @@ clap = { version = "4.5", features = ["derive"] } # HTTP axum = "0.8" +reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "json"] } +eventsource-stream = "0.2" +url = "2.5" async-process = "2" async-stream = "0.3.6" blocking = "1" diff --git a/md/SUMMARY.md b/md/SUMMARY.md index 7ee0349..b749914 100644 --- a/md/SUMMARY.md +++ b/md/SUMMARY.md @@ -7,6 +7,10 @@ - [Design Overview](./design.md) - [Protocol Reference](./protocol.md) +# Transports + +- [HTTP / WebSocket Transport](./http-transport.md) + # Conductor (agent-client-protocol-conductor) - [Conductor Design](./conductor.md) diff --git a/md/http-transport.md b/md/http-transport.md new file mode 100644 index 0000000..80e9cb3 --- /dev/null +++ b/md/http-transport.md @@ -0,0 +1,36 @@ +# HTTP / WebSocket Transport + +`agent-client-protocol-http` exposes ACP agents over one `/acp` endpoint. + +- `POST /acp` with `initialize` creates a connection and returns `Acp-Connection-Id`. +- Later `POST /acp` requests include `Acp-Connection-Id`; session-scoped requests also include `Acp-Session-Id` or `params.sessionId`. +- `GET /acp` with `Accept: text/event-stream` streams agent messages over SSE. +- `GET /acp` with a WebSocket upgrade uses text frames for JSON-RPC messages. +- `DELETE /acp` tears down the connection. + +## Server + +```rust +use agent_client_protocol_http::AcpHttpServer; + +let app = AcpHttpServer::new(|| my_agent()).into_router(); +let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?; +axum::serve(listener, app).await?; +``` + +## Client + +```rust +use agent_client_protocol_http::HttpClient; + +let transport = HttpClient::new("http://127.0.0.1:8080")?; +my_client().connect_to(transport).await?; +``` + +The same `HttpClient` also speaks WebSocket — pass a `ws://` or `wss://` URL +and it will open a single bidirectional connection instead of using POST + SSE: + +```rust +let transport = HttpClient::new("ws://127.0.0.1:8080")?; +my_client().connect_to(transport).await?; +``` diff --git a/md/introduction.md b/md/introduction.md index 220bdc5..dcced41 100644 --- a/md/introduction.md +++ b/md/introduction.md @@ -22,6 +22,7 @@ The `agent-client-protocol` crate includes a [`concepts`](https://docs.rs/agent- src/ ├── agent-client-protocol/ # Core protocol SDK ├── agent-client-protocol-tokio/ # Tokio utilities (process spawning) +├── agent-client-protocol-http/ # HTTP/SSE/WebSocket transport ├── agent-client-protocol-rmcp/ # Integration with rmcp crate ├── agent-client-protocol-cookbook/ # Usage patterns (rendered as rustdoc) ├── agent-client-protocol-derive/ # Proc macros @@ -37,11 +38,13 @@ src/ graph TD acp[agent-client-protocol
Core SDK] tokio[agent-client-protocol-tokio
Process spawning] + http[agent-client-protocol-http
HTTP/SSE/WebSocket transport] rmcp[agent-client-protocol-rmcp
rmcp integration] conductor[agent-client-protocol-conductor
Proxy orchestration] cookbook[agent-client-protocol-cookbook
Usage patterns] tokio --> acp + http --> acp rmcp --> acp conductor --> acp conductor --> tokio diff --git a/src/agent-client-protocol-http/CHANGELOG.md b/src/agent-client-protocol-http/CHANGELOG.md new file mode 100644 index 0000000..865bde2 --- /dev/null +++ b/src/agent-client-protocol-http/CHANGELOG.md @@ -0,0 +1,6 @@ +# Changelog + +## [Unreleased] + +### Added +- HTTP/SSE/WebSocket transport for ACP agents diff --git a/src/agent-client-protocol-http/Cargo.toml b/src/agent-client-protocol-http/Cargo.toml new file mode 100644 index 0000000..1990127 --- /dev/null +++ b/src/agent-client-protocol-http/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "agent-client-protocol-http" +version = "0.11.1" +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +description = "HTTP and WebSocket transport for the Agent Client Protocol (ACP)" +keywords = ["acp", "agent", "protocol", "http", "websocket"] +categories = ["development-tools", "web-programming::http-server"] + +[dependencies] +agent-client-protocol.workspace = true +jsonrpcmsg.workspace = true + +futures.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio = { workspace = true, features = ["sync", "rt", "macros", "time"] } +tokio-util.workspace = true +tracing.workspace = true +thiserror.workspace = true + +# Server +axum = { workspace = true, features = ["ws", "macros"] } +tower-http = { version = "0.6", features = ["cors"] } +async-stream = { workspace = true } +uuid = { workspace = true } + +# Client +reqwest = { workspace = true, features = ["stream"] } +eventsource-stream = { workspace = true } +url = { workspace = true } +tokio-tungstenite = "0.29" + +[dev-dependencies] +agent-client-protocol = { workspace = true } +agent-client-protocol-test.workspace = true +tokio = { workspace = true, features = ["full"] } +tracing-subscriber.workspace = true + +[lints] +workspace = true diff --git a/src/agent-client-protocol-http/README.md b/src/agent-client-protocol-http/README.md new file mode 100644 index 0000000..496e29e --- /dev/null +++ b/src/agent-client-protocol-http/README.md @@ -0,0 +1,8 @@ +# agent-client-protocol-http + +HTTP/WebSocket transport for ACP agents. + +- **Server**: `AcpHttpServer` exposes agents over HTTP + SSE with optional WebSocket upgrade +- **Client**: `HttpClient` connects to remote agents over HTTP + SSE + +See the [documentation](https://docs.rs/agent-client-protocol-http) for usage examples. diff --git a/src/agent-client-protocol-http/src/client.rs b/src/agent-client-protocol-http/src/client.rs new file mode 100644 index 0000000..2be1b30 --- /dev/null +++ b/src/agent-client-protocol-http/src/client.rs @@ -0,0 +1,408 @@ +use std::{collections::HashSet, sync::Arc}; + +use agent_client_protocol::{Agent, Channel, Client, ConnectTo, Error as AcpError, jsonrpcmsg}; +use futures::{SinkExt, StreamExt, future::BoxFuture}; +use jsonrpcmsg::Message; +use thiserror::Error; +use tokio::sync::Mutex; +use tokio_tungstenite::tungstenite::Message as WsMessage; +use tracing::{debug, error, trace, warn}; + +use crate::protocol::{ + HEADER_CONNECTION_ID, HEADER_SESSION_ID, is_initialize_request, method_requires_session_header, + session_id_from_params, +}; + +#[derive(Debug, Error)] +pub enum HttpClientError { + #[error("invalid URL: {0}")] + InvalidUrl(#[from] url::ParseError), + #[error("failed to build HTTP client: {0}")] + Reqwest(#[from] reqwest::Error), +} + +pub struct HttpClient { + endpoint: url::Url, + http: reqwest::Client, +} + +impl std::fmt::Debug for HttpClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HttpClient") + .field("endpoint", &self.endpoint.as_str()) + .finish_non_exhaustive() + } +} + +impl HttpClient { + pub fn new(base_url: impl AsRef) -> Result { + Self::with_client(base_url, reqwest::Client::new()) + } + + pub fn with_client( + base_url: impl AsRef, + http: reqwest::Client, + ) -> Result { + let mut endpoint = url::Url::parse(base_url.as_ref())?; + let path = endpoint.path().trim_end_matches('/').to_string(); + let path = if path.is_empty() { + "/acp".to_string() + } else if path.ends_with("/acp") { + path + } else { + format!("{path}/acp") + }; + endpoint.set_path(&path); + Ok(Self { endpoint, http }) + } + + fn is_websocket(&self) -> bool { + matches!(self.endpoint.scheme(), "ws" | "wss") + } +} + +impl ConnectTo for HttpClient { + async fn connect_to(self, client: impl ConnectTo) -> Result<(), AcpError> { + let (channel, transport) = ConnectTo::::into_channel_and_future(self); + match futures::future::select( + std::pin::pin!(client.connect_to(channel)), + std::pin::pin!(transport), + ) + .await + { + futures::future::Either::Left((result, _)) + | futures::future::Either::Right((result, _)) => result, + } + } + + fn into_channel_and_future(self) -> (Channel, BoxFuture<'static, Result<(), AcpError>>) { + let (caller, transport) = Channel::duplex(); + (caller, Box::pin(run(self, transport))) + } +} + +async fn run(client: HttpClient, channel: Channel) -> Result<(), AcpError> { + if client.is_websocket() { + return run_ws(client, channel).await; + } + let HttpClient { endpoint, http } = client; + let Channel { + rx: mut outgoing, + tx: incoming, + } = channel; + let (open_session_tx, mut open_session_rx) = futures::channel::mpsc::unbounded(); + let state = Arc::new(ClientState { + endpoint, + http, + connection_id: Mutex::new(None), + open_session_streams: Mutex::new(HashSet::new()), + incoming, + open_session_tx, + }); + let mut sse_tasks = Vec::new(); + + loop { + let msg = tokio::select! { + msg = outgoing.next() => match msg { + Some(Ok(msg)) => msg, + Some(Err(e)) => { + error!("upstream channel produced error: {e}"); + return Err(e); + } + None => break, + }, + Some(session_id) = open_session_rx.next() => { + spawn_sse(state.clone(), Some(session_id), &mut sse_tasks); + continue; + } + }; + + if state.connection_id.lock().await.is_none() { + if !is_initialize_request(&msg) { + return Err(AcpError::invalid_request() + .data("ACP HTTP transport: first message must be `initialize`")); + } + if let Err(e) = state.initialize(msg).await { + error!("initialize failed: {e}"); + return Err(AcpError::internal_error().data(format!("initialize: {e}"))); + } + spawn_sse(state.clone(), None, &mut sse_tasks); + continue; + } + + if let Message::Request(req) = &msg { + if let Some(session_id) = req.params.as_ref().and_then(session_id_from_params) { + if state + .open_session_streams + .lock() + .await + .insert(session_id.clone()) + { + spawn_sse(state.clone(), Some(session_id), &mut sse_tasks); + } + } + } + + if let Err(e) = state.post(msg).await { + error!("POST failed: {e}"); + return Err(AcpError::internal_error().data(format!("POST: {e}"))); + } + } + + state.delete().await; + for task in sse_tasks { + task.abort(); + } + Ok(()) +} + +fn spawn_sse( + state: Arc, + session_id: Option, + tasks: &mut Vec>, +) { + tasks.push(tokio::spawn(async move { + let label = session_id.clone(); + if let Err(e) = state.sse(session_id).await { + warn!(session_id = ?label, "SSE stream ended: {e}"); + } + })); +} + +struct ClientState { + endpoint: url::Url, + http: reqwest::Client, + connection_id: Mutex>, + open_session_streams: Mutex>, + incoming: futures::channel::mpsc::UnboundedSender>, + open_session_tx: futures::channel::mpsc::UnboundedSender, +} + +impl ClientState { + async fn initialize(&self, msg: Message) -> Result<(), String> { + let response = self + .http + .post(self.endpoint.clone()) + .header("Content-Type", "application/json") + .header("Accept", "application/json") + .json(&msg) + .send() + .await + .map_err(|e| e.to_string())?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(format!("HTTP {status}: {body}")); + } + + let connection_id = response + .headers() + .get(HEADER_CONNECTION_ID) + .and_then(|v| v.to_str().ok()) + .map(String::from) + .ok_or_else(|| format!("server did not return {HEADER_CONNECTION_ID} header"))?; + let message = response + .json::() + .await + .map_err(|e| e.to_string())?; + + *self.connection_id.lock().await = Some(connection_id); + self.deliver(message); + Ok(()) + } + + async fn post(&self, msg: Message) -> Result<(), String> { + let session_id = match &msg { + Message::Request(req) => { + let session_id = req.params.as_ref().and_then(session_id_from_params); + if method_requires_session_header(&req.method) && session_id.is_none() { + return Err(format!( + "method `{}` requires sessionId in params", + req.method + )); + } + session_id + } + Message::Response(_) => None, + }; + let connection_id = self + .connection_id + .lock() + .await + .clone() + .ok_or_else(|| "POST attempted before initialize".to_string())?; + let mut request = self + .http + .post(self.endpoint.clone()) + .header("Accept", "application/json") + .header(HEADER_CONNECTION_ID, connection_id) + .json(&msg); + if let Some(session_id) = session_id { + request = request.header(HEADER_SESSION_ID, session_id); + } + + let response = request.send().await.map_err(|e| e.to_string())?; + if response.status().as_u16() != 202 && !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(format!("HTTP {status}: {body}")); + } + Ok(()) + } + + async fn delete(&self) { + let Some(connection_id) = self.connection_id.lock().await.clone() else { + return; + }; + if let Err(e) = self + .http + .delete(self.endpoint.clone()) + .header(HEADER_CONNECTION_ID, connection_id) + .send() + .await + { + debug!("DELETE failed (ignored): {e}"); + } + } + + async fn sse(&self, session_id: Option) -> Result<(), String> { + let connection_id = self + .connection_id + .lock() + .await + .clone() + .ok_or_else(|| "SSE attempted before initialize".to_string())?; + let mut request = self + .http + .get(self.endpoint.clone()) + .header("Accept", "text/event-stream") + .header(HEADER_CONNECTION_ID, connection_id); + if let Some(session_id) = &session_id { + request = request.header(HEADER_SESSION_ID, session_id); + } + + let response = request.send().await.map_err(|e| e.to_string())?; + if !response.status().is_success() { + return Err(format!("HTTP {}", response.status())); + } + trace!(session_id = ?session_id, "SSE stream open"); + + let mut events = eventsource_stream::EventStream::new(response.bytes_stream()); + while let Some(event) = events.next().await { + let payload = event.map_err(|e| e.to_string())?.data; + if payload.is_empty() { + continue; + } + match serde_json::from_str::(&payload) { + Ok(msg) => { + if let Message::Response(response) = &msg { + if let Some(session_id) = response + .result + .as_ref() + .and_then(|r| r.get("sessionId")) + .and_then(|v| v.as_str()) + .map(String::from) + { + if self + .open_session_streams + .lock() + .await + .insert(session_id.clone()) + { + drop(self.open_session_tx.unbounded_send(session_id)); + } + } + } + self.deliver(msg); + } + Err(e) => warn!("SSE: malformed JSON-RPC payload: {e}"), + } + } + Ok(()) + } + + fn deliver(&self, msg: Message) { + if self.incoming.unbounded_send(Ok(msg)).is_err() { + debug!("upstream channel closed; dropping inbound message"); + } + } +} + +async fn run_ws(client: HttpClient, channel: Channel) -> Result<(), AcpError> { + let HttpClient { endpoint, .. } = client; + let Channel { + rx: mut outgoing, + tx: incoming, + } = channel; + + let (ws_stream, response) = tokio_tungstenite::connect_async(endpoint.as_str()) + .await + .map_err(|e| { + AcpError::internal_error().data(format!("WebSocket connect failed: {e}")) + })?; + trace!( + status = %response.status(), + "WebSocket connection established" + ); + let (mut ws_tx, mut ws_rx) = ws_stream.split(); + + loop { + tokio::select! { + msg = outgoing.next() => match msg { + Some(Ok(msg)) => { + let text = match serde_json::to_string(&msg) { + Ok(t) => t, + Err(e) => { + error!("failed to serialize outbound message: {e}"); + return Err(AcpError::internal_error() + .data(format!("serialize: {e}"))); + } + }; + if let Err(e) = ws_tx.send(WsMessage::Text(text.into())).await { + error!("WebSocket send failed: {e}"); + return Err(AcpError::internal_error() + .data(format!("ws send: {e}"))); + } + } + Some(Err(e)) => { + error!("upstream channel produced error: {e}"); + return Err(e); + } + None => break, + }, + frame = ws_rx.next() => match frame { + Some(Ok(WsMessage::Text(text))) => { + match serde_json::from_str::(text.as_str()) { + Ok(parsed) => { + if incoming.unbounded_send(Ok(parsed)).is_err() { + debug!("upstream channel closed; stopping WS reader"); + break; + } + } + Err(e) => warn!("WS: malformed JSON-RPC payload: {e}"), + } + } + Some(Ok(WsMessage::Binary(_))) => { + warn!("ignoring binary WebSocket frame (ACP uses text)"); + } + Some(Ok( + WsMessage::Ping(_) | WsMessage::Pong(_) | WsMessage::Frame(_), + )) => {} + Some(Ok(WsMessage::Close(frame))) => { + debug!("server closed WebSocket: {frame:?}"); + break; + } + Some(Err(e)) => { + error!("WebSocket receive error: {e}"); + return Err(AcpError::internal_error() + .data(format!("ws recv: {e}"))); + } + None => break, + }, + } + } + + drop(ws_tx.send(WsMessage::Close(None)).await); + Ok(()) +} diff --git a/src/agent-client-protocol-http/src/connection.rs b/src/agent-client-protocol-http/src/connection.rs new file mode 100644 index 0000000..2714a4f --- /dev/null +++ b/src/agent-client-protocol-http/src/connection.rs @@ -0,0 +1,304 @@ +use std::{ + collections::{HashMap, VecDeque}, + sync::Arc, +}; + +use agent_client_protocol::{Channel, jsonrpcmsg}; +use futures::{SinkExt, StreamExt}; +use jsonrpcmsg::{Id, Message}; +use tokio::sync::{Mutex, RwLock, broadcast, mpsc}; +use tracing::{error, trace}; + +use crate::protocol::session_id_from_params; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +enum IdKey { + String(String), + Number(u64), +} + +impl IdKey { + fn new(id: &Id) -> Option { + match id { + Id::String(s) => Some(Self::String(s.clone())), + Id::Number(n) => Some(Self::Number(*n)), + Id::Null => None, + } + } +} + +#[derive(Clone, Debug)] +pub(crate) enum ResponseRoute { + Connection, + Session(String), +} + +struct OutboundStream { + tx: broadcast::Sender, + replay: Mutex>>, +} + +impl OutboundStream { + fn new() -> Self { + let (tx, _) = broadcast::channel(1024); + Self { + tx, + replay: Mutex::new(Some(VecDeque::new())), + } + } + + async fn push(&self, msg: String) { + let mut replay = self.replay.lock().await; + if let Some(replay) = replay.as_mut() { + if replay.len() == 1024 { + replay.pop_front(); + } + replay.push_back(msg); + } else { + drop(replay); + drop(self.tx.send(msg)); + } + } + + async fn subscribe(&self) -> (Vec, broadcast::Receiver) { + let mut replay = self.replay.lock().await; + let receiver = self.tx.subscribe(); + (replay.take().map(Vec::from).unwrap_or_default(), receiver) + } +} + +pub(crate) struct Connection { + inbound_tx: mpsc::UnboundedSender>, + outbound_rx: Mutex>>, + agent_handle: tokio::task::JoinHandle<()>, + router_handle: Mutex>>, + connection_stream: Arc, + session_streams: RwLock>>, + all_outbound: Arc, + pending_routes: Mutex>, +} + +impl Connection { + pub(crate) fn send_to_agent(&self, msg: Message) -> Result<(), &'static str> { + self.inbound_tx + .send(Ok(msg)) + .map_err(|_| "agent channel closed") + } + + pub(crate) async fn record_pending_route(&self, id: Id, route: ResponseRoute) { + if let Some(key) = IdKey::new(&id) { + self.pending_routes.lock().await.insert(key, route); + } + } + + pub(crate) async fn ensure_session(&self, session_id: &str) { + self.session_stream(session_id).await; + } + + pub(crate) async fn subscribe_connection_stream( + &self, + ) -> (Vec, broadcast::Receiver) { + self.connection_stream.subscribe().await + } + + pub(crate) async fn subscribe_session_stream( + &self, + session_id: &str, + ) -> (Vec, broadcast::Receiver) { + self.session_stream(session_id).await.subscribe().await + } + + pub(crate) async fn subscribe_all_outbound( + &self, + ) -> (Vec, broadcast::Receiver) { + self.all_outbound.subscribe().await + } + + async fn session_stream(&self, session_id: &str) -> Arc { + if let Some(stream) = self.session_streams.read().await.get(session_id) { + return stream.clone(); + } + + self.session_streams + .write() + .await + .entry(session_id.to_string()) + .or_insert_with(|| Arc::new(OutboundStream::new())) + .clone() + } + + pub(crate) async fn start_router(self: &Arc) { + let Some(mut rx) = self.outbound_rx.lock().await.take() else { + return; + }; + + let connection = self.clone(); + *self.router_handle.lock().await = Some(tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + connection.route_outbound(msg).await; + } + })); + } + + async fn route_outbound(&self, msg: Message) { + let serialized = match serde_json::to_string(&msg) { + Ok(s) => s, + Err(e) => { + error!("failed to serialize outbound JSON-RPC message: {e}"); + return; + } + }; + + self.all_outbound.push(serialized.clone()).await; + + let route = match &msg { + Message::Request(req) => req + .params + .as_ref() + .and_then(session_id_from_params) + .map_or(ResponseRoute::Connection, ResponseRoute::Session), + Message::Response(resp) => { + let route = match resp.id.as_ref().and_then(IdKey::new) { + Some(key) => self.pending_routes.lock().await.remove(&key), + None => None, + }; + route.unwrap_or(ResponseRoute::Connection) + } + }; + + match route { + ResponseRoute::Connection => { + trace!(target = "connection", "→ connection-scoped stream"); + self.connection_stream.push(serialized).await; + } + ResponseRoute::Session(sid) => { + trace!(target = %sid, "→ session-scoped stream"); + self.session_stream(&sid).await.push(serialized).await; + } + } + } + + pub(crate) async fn recv_initial(&self) -> Option { + let mut guard = self.outbound_rx.lock().await; + let rx = guard.as_mut()?; + serde_json::to_string(&rx.recv().await?).ok() + } + + pub(crate) async fn shutdown(&self) { + self.agent_handle.abort(); + if let Some(h) = self.router_handle.lock().await.take() { + h.abort(); + } + } +} + +pub(crate) struct ConnectionRegistry { + factory: Arc, + connections: RwLock>>, +} + +pub(crate) trait AgentFactory: Send + Sync + 'static { + fn spawn_agent( + &self, + ) -> ( + Channel, + futures::future::BoxFuture<'static, agent_client_protocol::Result<()>>, + ); +} + +impl AgentFactory for F +where + F: Fn() -> C + Send + Sync + 'static, + C: agent_client_protocol::ConnectTo, +{ + fn spawn_agent( + &self, + ) -> ( + Channel, + futures::future::BoxFuture<'static, agent_client_protocol::Result<()>>, + ) { + self().into_channel_and_future() + } +} + +impl ConnectionRegistry { + pub(crate) fn new(factory: Arc) -> Self { + Self { + factory, + connections: RwLock::new(HashMap::new()), + } + } + + pub(crate) async fn create_connection(&self) -> (String, Arc) { + let (mut channel, agent_future) = self.factory.spawn_agent(); + let (inbound_tx, mut inbound_rx) = + mpsc::unbounded_channel::>(); + let (outbound_tx, outbound_rx) = mpsc::unbounded_channel::(); + + let pump = async move { + let inbound = async { + while let Some(msg) = inbound_rx.recv().await { + if channel.tx.send(msg).await.is_err() { + break; + } + } + drop(channel.tx.close().await); + }; + let outbound = async { + while let Some(msg) = channel.rx.next().await { + match msg { + Ok(m) => { + if outbound_tx.send(m).is_err() { + break; + } + } + Err(e) => { + error!("agent emitted error: {e}"); + break; + } + } + } + }; + futures::join!(inbound, outbound); + }; + + let connection_id = uuid::Uuid::new_v4().to_string(); + let conn_id_for_task = connection_id.clone(); + let agent_handle = tokio::spawn(async move { + let agent = async move { + if let Err(e) = agent_future.await { + error!(connection_id = %conn_id_for_task, "ACP agent task error: {e}"); + } + }; + futures::pin_mut!(agent); + futures::pin_mut!(pump); + futures::future::select(agent, pump).await; + }); + + let connection = Arc::new(Connection { + inbound_tx, + outbound_rx: Mutex::new(Some(outbound_rx)), + agent_handle, + router_handle: Mutex::new(None), + connection_stream: Arc::new(OutboundStream::new()), + session_streams: RwLock::new(HashMap::new()), + all_outbound: Arc::new(OutboundStream::new()), + pending_routes: Mutex::new(HashMap::new()), + }); + + self.connections + .write() + .await + .insert(connection_id.clone(), connection.clone()); + + (connection_id, connection) + } + + pub(crate) async fn get(&self, connection_id: &str) -> Option> { + self.connections.read().await.get(connection_id).cloned() + } + + pub(crate) async fn remove(&self, connection_id: &str) -> Option> { + self.connections.write().await.remove(connection_id) + } +} diff --git a/src/agent-client-protocol-http/src/http_server.rs b/src/agent-client-protocol-http/src/http_server.rs new file mode 100644 index 0000000..6b01577 --- /dev/null +++ b/src/agent-client-protocol-http/src/http_server.rs @@ -0,0 +1,225 @@ +use std::{convert::Infallible, sync::Arc, time::Duration}; + +use axum::{ + body::Body, + extract::State, + http::{HeaderMap, HeaderValue, Request, StatusCode, header}, + response::{IntoResponse, Response, Sse, sse::Event}, +}; +use jsonrpcmsg::Message; +use tokio::sync::broadcast; +use tracing::{debug, error, info, trace}; + +use crate::{ + connection::{ConnectionRegistry, ResponseRoute}, + protocol::{ + EVENT_STREAM_MIME_TYPE, HEADER_CONNECTION_ID, HEADER_SESSION_ID, JSON_MIME_TYPE, + is_initialize_request, method_requires_session_header, session_id_from_params, + }, +}; + +pub(crate) async fn handle_post( + State(registry): State>, + request: Request, +) -> Response { + if !request + .headers() + .get(header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) + .is_some_and(|ct| ct.starts_with(JSON_MIME_TYPE)) + { + return ( + StatusCode::UNSUPPORTED_MEDIA_TYPE, + "Content-Type must be application/json", + ) + .into_response(); + } + + let connection_id = header_value(request.headers(), HEADER_CONNECTION_ID); + let session_id = header_value(request.headers(), HEADER_SESSION_ID); + let body = match axum::body::to_bytes(request.into_body(), usize::MAX).await { + Ok(body) => body, + Err(e) => { + error!("Failed to read request body: {e}"); + return StatusCode::BAD_REQUEST.into_response(); + } + }; + + if matches!(body.first(), Some(&b'[')) { + return StatusCode::NOT_IMPLEMENTED.into_response(); + } + + let message = match serde_json::from_slice::(&body) { + Ok(message) => message, + Err(e) => { + return (StatusCode::BAD_REQUEST, format!("Invalid JSON-RPC: {e}")).into_response(); + } + }; + + if is_initialize_request(&message) { + let (connection_id, connection) = registry.create_connection().await; + if connection.send_to_agent(message).is_err() { + registry.remove(&connection_id).await; + connection.shutdown().await; + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); + } + + let Some(init_response) = connection.recv_initial().await else { + registry.remove(&connection_id).await; + connection.shutdown().await; + return ( + StatusCode::INTERNAL_SERVER_ERROR, + "agent closed before initialize response", + ) + .into_response(); + }; + + connection.start_router().await; + info!(connection_id = %connection_id, "Initialize complete"); + return with_connection_header( + ( + StatusCode::OK, + [(header::CONTENT_TYPE, JSON_MIME_TYPE)], + init_response, + ) + .into_response(), + &connection_id, + ); + } + + let Some(connection_id) = connection_id else { + return (StatusCode::BAD_REQUEST, "Acp-Connection-Id header required").into_response(); + }; + let Some(connection) = registry.get(&connection_id).await else { + return StatusCode::NOT_FOUND.into_response(); + }; + + let route = match &message { + Message::Request(req) => match session_id + .or_else(|| req.params.as_ref().and_then(session_id_from_params)) + { + Some(session_id) => Some(ResponseRoute::Session(session_id)), + None if method_requires_session_header(&req.method) => { + return (StatusCode::BAD_REQUEST, "Acp-Session-Id header required").into_response(); + } + None => Some(ResponseRoute::Connection), + }, + Message::Response(_) => None, + }; + + if let Some(ResponseRoute::Session(session_id)) = &route { + connection.ensure_session(session_id).await; + } + if let (Message::Request(req), Some(route), Some(id)) = (&message, route, message_id(&message)) + { + connection.record_pending_route(id, route).await; + trace!(connection_id = %connection_id, method = %req.method, "POST → agent"); + } else { + trace!(connection_id = %connection_id, ?message, "POST → agent"); + } + + if connection.send_to_agent(message).is_err() { + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); + } + StatusCode::ACCEPTED.into_response() +} + +pub(crate) async fn handle_get( + registry: Arc, + request: Request, +) -> Response { + if !request + .headers() + .get(header::ACCEPT) + .and_then(|v| v.to_str().ok()) + .is_some_and(|accept| accept.contains(EVENT_STREAM_MIME_TYPE)) + { + return ( + StatusCode::NOT_ACCEPTABLE, + "client must accept text/event-stream", + ) + .into_response(); + } + + let Some(connection_id) = header_value(request.headers(), HEADER_CONNECTION_ID) else { + return (StatusCode::BAD_REQUEST, "Acp-Connection-Id header required").into_response(); + }; + let Some(connection) = registry.get(&connection_id).await else { + return StatusCode::NOT_FOUND.into_response(); + }; + + let session_id = header_value(request.headers(), HEADER_SESSION_ID); + let (replay, mut receiver) = match session_id.as_deref() { + Some(session_id) => connection.subscribe_session_stream(session_id).await, + None => connection.subscribe_connection_stream().await, + }; + let stream = async_stream::stream! { + for msg in replay { + trace!(payload = %msg, "SSE → client (replay)"); + yield Ok::<_, Infallible>(Event::default().data(msg)); + } + loop { + match receiver.recv().await { + Ok(msg) => { + trace!(payload = %msg, "SSE → client"); + yield Ok(Event::default().data(msg)); + } + Err(broadcast::error::RecvError::Lagged(n)) => debug!("SSE subscriber lagged {n} messages"), + Err(broadcast::error::RecvError::Closed) => break, + } + } + }; + + let mut response = with_connection_header( + Sse::new(stream) + .keep_alive( + axum::response::sse::KeepAlive::new() + .interval(Duration::from_secs(15)) + .text(""), + ) + .into_response(), + &connection_id, + ); + if let Some(session_id) = session_id { + if let Ok(value) = HeaderValue::from_str(&session_id) { + response.headers_mut().insert(HEADER_SESSION_ID, value); + } + } + response +} + +pub(crate) async fn handle_delete( + State(registry): State>, + request: Request, +) -> Response { + let Some(connection_id) = header_value(request.headers(), HEADER_CONNECTION_ID) else { + return (StatusCode::BAD_REQUEST, "Acp-Connection-Id header required").into_response(); + }; + let Some(connection) = registry.remove(&connection_id).await else { + return StatusCode::NOT_FOUND.into_response(); + }; + connection.shutdown().await; + info!(connection_id = %connection_id, "Connection terminated via DELETE"); + StatusCode::ACCEPTED.into_response() +} + +fn header_value(headers: &HeaderMap, name: &str) -> Option { + headers + .get(name) + .and_then(|v| v.to_str().ok()) + .map(String::from) +} + +fn message_id(message: &Message) -> Option { + match message { + Message::Request(req) => req.id.clone(), + Message::Response(_) => None, + } +} + +fn with_connection_header(mut response: Response, connection_id: &str) -> Response { + if let Ok(value) = HeaderValue::from_str(connection_id) { + response.headers_mut().insert(HEADER_CONNECTION_ID, value); + } + response +} diff --git a/src/agent-client-protocol-http/src/lib.rs b/src/agent-client-protocol-http/src/lib.rs new file mode 100644 index 0000000..11cef13 --- /dev/null +++ b/src/agent-client-protocol-http/src/lib.rs @@ -0,0 +1,9 @@ +mod client; +mod connection; +mod http_server; +mod protocol; +mod server; +mod websocket_server; + +pub use client::{HttpClient, HttpClientError}; +pub use server::{AcpHttpServer, ServerOptions}; diff --git a/src/agent-client-protocol-http/src/protocol.rs b/src/agent-client-protocol-http/src/protocol.rs new file mode 100644 index 0000000..77d7865 --- /dev/null +++ b/src/agent-client-protocol-http/src/protocol.rs @@ -0,0 +1,31 @@ +use jsonrpcmsg::{Message, Params}; + +pub(crate) const HEADER_CONNECTION_ID: &str = "Acp-Connection-Id"; +pub(crate) const HEADER_SESSION_ID: &str = "Acp-Session-Id"; +pub(crate) const EVENT_STREAM_MIME_TYPE: &str = "text/event-stream"; +pub(crate) const JSON_MIME_TYPE: &str = "application/json"; + +pub(crate) fn method_requires_session_header(method: &str) -> bool { + matches!( + method, + "session/prompt" + | "session/cancel" + | "session/load" + | "session/set_mode" + | "session/set_model" + ) +} + +pub(crate) fn is_initialize_request(msg: &Message) -> bool { + matches!(msg, Message::Request(req) if req.method == "initialize" && req.id.is_some()) +} + +pub(crate) fn session_id_from_params(params: &Params) -> Option { + match params { + Params::Object(map) => map + .get("sessionId") + .and_then(|v| v.as_str()) + .map(String::from), + Params::Array(_) => None, + } +} diff --git a/src/agent-client-protocol-http/src/server.rs b/src/agent-client-protocol-http/src/server.rs new file mode 100644 index 0000000..1afc92c --- /dev/null +++ b/src/agent-client-protocol-http/src/server.rs @@ -0,0 +1,124 @@ +use std::sync::Arc; + +use agent_client_protocol::{Client, ConnectTo}; +use axum::{ + Router, + extract::WebSocketUpgrade, + extract::ws::rejection::WebSocketUpgradeRejection, + http::{HeaderName, Method, header}, + response::Response, + routing::{delete, get, post}, +}; +use tower_http::cors::{Any, CorsLayer}; + +use crate::connection::ConnectionRegistry; + +#[derive(Debug, Clone)] +pub struct ServerOptions { + pub path: String, + pub permissive_cors: bool, + pub health_endpoint: bool, +} + +impl Default for ServerOptions { + fn default() -> Self { + Self { + path: "/acp".to_string(), + permissive_cors: true, + health_endpoint: true, + } + } +} + +pub struct AcpHttpServer { + registry: Arc, + options: ServerOptions, +} + +impl std::fmt::Debug for AcpHttpServer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AcpHttpServer") + .field("options", &self.options) + .finish_non_exhaustive() + } +} + +impl AcpHttpServer { + pub fn new(factory: F) -> Self + where + F: Fn() -> C + Send + Sync + 'static, + C: ConnectTo, + { + Self { + registry: Arc::new(ConnectionRegistry::new(Arc::new(factory))), + options: ServerOptions::default(), + } + } + + #[must_use] + pub fn with_options(mut self, options: ServerOptions) -> Self { + self.options = options; + self + } + + pub fn into_router(self) -> Router { + let registry = self.registry.clone(); + let path = self.options.path.clone(); + + let mut router = Router::new() + .route( + &path, + post(crate::http_server::handle_post).with_state(registry.clone()), + ) + .route(&path, get(handle_get).with_state(registry.clone())) + .route( + &path, + delete(crate::http_server::handle_delete).with_state(registry), + ); + + if self.options.health_endpoint { + router = router.route("/health", get(health)); + } + + if self.options.permissive_cors { + router = router.layer(default_cors()); + } + + router + } +} + +async fn health() -> &'static str { + "ok" +} + +fn default_cors() -> CorsLayer { + CorsLayer::new() + .allow_origin(Any) + .allow_methods([Method::GET, Method::POST, Method::DELETE, Method::OPTIONS]) + .allow_headers([ + header::CONTENT_TYPE, + header::ACCEPT, + HeaderName::from_static("acp-connection-id"), + HeaderName::from_static("acp-session-id"), + header::SEC_WEBSOCKET_VERSION, + header::SEC_WEBSOCKET_KEY, + header::CONNECTION, + header::UPGRADE, + ]) + .expose_headers([ + HeaderName::from_static("acp-connection-id"), + HeaderName::from_static("acp-session-id"), + ]) +} + +async fn handle_get( + ws_upgrade: Result, + axum::extract::State(registry): axum::extract::State>, + request: axum::http::Request, +) -> Response { + match ws_upgrade { + Ok(ws) => crate::websocket_server::handle_ws_upgrade(registry, ws).await, + Err(_) => crate::http_server::handle_get(registry, request).await, + } +} diff --git a/src/agent-client-protocol-http/src/websocket_server.rs b/src/agent-client-protocol-http/src/websocket_server.rs new file mode 100644 index 0000000..b7e6932 --- /dev/null +++ b/src/agent-client-protocol-http/src/websocket_server.rs @@ -0,0 +1,133 @@ +use std::sync::Arc; + +use axum::{ + extract::ws::{Message as WsMessage, WebSocket, WebSocketUpgrade}, + http::HeaderValue, + response::Response, +}; +use futures::{SinkExt, StreamExt}; +use jsonrpcmsg::Message; +use tracing::{debug, error, info, trace, warn}; + +use crate::{ + connection::{ConnectionRegistry, ResponseRoute}, + protocol::{HEADER_CONNECTION_ID, session_id_from_params}, +}; + +pub(crate) async fn handle_ws_upgrade( + registry: Arc, + ws: WebSocketUpgrade, +) -> Response { + let (connection_id, connection) = registry.create_connection().await; + + connection.start_router().await; + + let conn_id_for_handler = connection_id.clone(); + let registry_for_handler = registry.clone(); + let mut response = ws.on_upgrade(move |socket| async move { + run_ws( + socket, + registry_for_handler, + conn_id_for_handler, + connection, + ) + .await; + }); + + if let Ok(v) = HeaderValue::from_str(&connection_id) { + response.headers_mut().insert(HEADER_CONNECTION_ID, v); + } + info!(connection_id = %connection_id, "WebSocket connection created"); + response +} + +async fn run_ws( + socket: WebSocket, + registry: Arc, + connection_id: String, + connection: Arc, +) { + let (mut ws_tx, mut ws_rx) = socket.split(); + let (replay, mut outbound_rx) = connection.subscribe_all_outbound().await; + + debug!(connection_id = %connection_id, "Starting WebSocket message loop"); + + for text in replay { + trace!(connection_id = %connection_id, payload = %text, "Agent → Client (replay): {} bytes", text.len()); + if ws_tx.send(WsMessage::Text(text.into())).await.is_err() { + error!(connection_id = %connection_id, "WebSocket send failed during replay"); + if let Some(conn) = registry.remove(&connection_id).await { + conn.shutdown().await; + } + return; + } + } + + loop { + tokio::select! { + msg_result = ws_rx.next() => { + match msg_result { + Some(Ok(WsMessage::Text(text))) => { + let text_str = text.to_string(); + trace!(connection_id = %connection_id, payload = %text_str, "Client → Agent: {} bytes", text_str.len()); + match serde_json::from_str::(&text_str) { + Ok(parsed) => { + if let Message::Request(req) = &parsed { + if let (Some(id), Some(params)) = (req.id.clone(), req.params.as_ref()) { + if let Some(sid) = session_id_from_params(params) { + connection.ensure_session(&sid).await; + connection + .record_pending_route(id, ResponseRoute::Session(sid)) + .await; + } + } + } + if connection.send_to_agent(parsed).is_err() { + error!(connection_id = %connection_id, "Agent channel closed"); + break; + } + } + Err(e) => { + warn!(connection_id = %connection_id, "Ignoring malformed JSON-RPC frame: {e}"); + } + } + } + Some(Ok(WsMessage::Close(frame))) => { + debug!(connection_id = %connection_id, "Client closed connection: {:?}", frame); + break; + } + Some(Ok(WsMessage::Ping(_) | WsMessage::Pong(_))) => {} + Some(Ok(WsMessage::Binary(_))) => { + warn!(connection_id = %connection_id, "Ignoring binary message (ACP uses text)"); + } + Some(Err(e)) => { + error!(connection_id = %connection_id, "WebSocket error: {e}"); + break; + } + None => break, + } + } + + recv = outbound_rx.recv() => { + match recv { + Ok(text) => { + trace!(connection_id = %connection_id, payload = %text, "Agent → Client: {} bytes", text.len()); + if ws_tx.send(WsMessage::Text(text.into())).await.is_err() { + error!(connection_id = %connection_id, "WebSocket send failed"); + break; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + warn!(connection_id = %connection_id, "WebSocket lagged {n} messages"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + } + } + + debug!(connection_id = %connection_id, "Cleaning up WebSocket connection"); + if let Some(conn) = registry.remove(&connection_id).await { + conn.shutdown().await; + } +}