Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions md/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- [Design Overview](./design.md)
- [Protocol Reference](./protocol.md)
- [Protocol Versioning](./protocol-versions.md)

# Conductor (agent-client-protocol-conductor)

Expand Down
30 changes: 30 additions & 0 deletions md/protocol-versions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Protocol Versioning

The SDK normally exposes the stable ACP v1 schema types through `agent_client_protocol::schema::*`.

For experiments against the draft ACP v2 schema, enable the `unstable_protocol_v2` feature on
`agent-client-protocol`. With that feature enabled, `schema::*` resolves to the schema crate's
`v2` types by default, while the connection layer still speaks either v1 or v2 on the wire.

## Negotiation

Protocol version negotiation is driven by the normal `initialize` request and response:

- Before any `initialize` message is observed, non-initialize ACP messages are treated as v2 when the v2 feature is enabled.
- While an `initialize` request is in flight, the requested `protocolVersion` is used provisionally for wire conversion.
- The `initialize` request is encoded according to the requested `protocolVersion`.
- The `initialize` response records the negotiated wire version on the connection.
- Later known ACP requests, responses, and notifications are downgraded to v1 or left as v2 based on that negotiated version.

The conversion is internal to the SDK. Agent and client handlers continue to use the feature-selected
`schema::*` types. With `unstable_protocol_v2` enabled, that means user code handles v2 types even
when the remote side negotiated v1.

## Scope

The adapter converts known ACP payloads at the untyped JSON-RPC boundary using the schema crate's v2
conversion module. Custom JSON-RPC methods and extension methods are passed through unchanged.

The v2 feature is intentionally separate from the existing `unstable` umbrella because it changes the
SDK's default Rust type namespace. It should be enabled explicitly by experiments that are ready to
compile against the draft v2 types.
8 changes: 7 additions & 1 deletion src/agent-client-protocol-conductor/src/conductor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,13 @@ where
conductor_tx
.send(ConductorMessage::LeftToRight {
target_component_index: component_index + 1,
message: dispatch.map(|r, cx| (r.message, cx), |n| n.message),
message: dispatch.map(
|r, responder| {
let method = r.message.method().to_string();
(r.message, responder.wrap_method(method))
},
|n| n.message,
),
})
.await
.map_err(agent_client_protocol::util::internal_error)
Expand Down
6 changes: 6 additions & 0 deletions src/agent-client-protocol/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## [Unreleased]

### Added

- Add `unstable_protocol_v2` support that uses draft v2 schema types by default while negotiating v1 or v2 wire payloads internally.

## [0.11.1](https://github.com/agentclientprotocol/rust-sdk/compare/v0.11.0...v0.11.1) - 2026-04-21

### Fixed
Expand Down
1 change: 1 addition & 0 deletions src/agent-client-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ unstable_session_additional_directories = ["agent-client-protocol-schema/unstabl
unstable_session_fork = ["agent-client-protocol-schema/unstable_session_fork"]
unstable_session_model = ["agent-client-protocol-schema/unstable_session_model"]
unstable_session_usage = ["agent-client-protocol-schema/unstable_session_usage"]
unstable_protocol_v2 = ["agent-client-protocol-schema/unstable_protocol_v2"]

[dependencies]
agent-client-protocol-schema.workspace = true
Expand Down
96 changes: 75 additions & 21 deletions src/agent-client-protocol/src/jsonrpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Core JSON-RPC server support.

use agent_client_protocol_schema::SessionId;
// Re-export jsonrpcmsg for use in public API
pub use jsonrpcmsg;

Expand Down Expand Up @@ -34,6 +33,7 @@ use crate::jsonrpc::task_actor::{Task, TaskTx};
use crate::mcp_server::McpServer;
use crate::role::HasPeer;
use crate::role::Role;
use crate::schema::{METHOD_SUCCESSOR_MESSAGE, SessionId};
use crate::util::json_cast;
use crate::{Agent, Client, ConnectTo, RoleId};

Expand Down Expand Up @@ -1178,11 +1178,15 @@ impl<
let (outgoing_tx, outgoing_rx) = mpsc::unbounded();
let (new_task_tx, new_task_rx) = mpsc::unbounded();
let (dynamic_handler_tx, dynamic_handler_rx) = mpsc::unbounded();
#[cfg(feature = "unstable_protocol_v2")]
let protocol_state = crate::schema::v2_compat::ProtocolState::default();
let connection = ConnectionTo::new(
me.counterpart(),
outgoing_tx,
new_task_tx,
dynamic_handler_tx,
#[cfg(feature = "unstable_protocol_v2")]
protocol_state.clone(),
);

// Convert transport into server - this returns a channel for us to use
Expand Down Expand Up @@ -1211,6 +1215,8 @@ impl<
outgoing_rx,
reply_tx.clone(),
transport_outgoing_tx,
#[cfg(feature = "unstable_protocol_v2")]
protocol_state.clone(),
),
// Protocol layer: jsonrpcmsg::Message → handler/reply routing
incoming_actor::incoming_protocol_actor(
Expand All @@ -1220,6 +1226,8 @@ impl<
dynamic_handler_rx,
reply_rx,
handler,
#[cfg(feature = "unstable_protocol_v2")]
protocol_state.clone(),
),
task_actor::task_actor(new_task_rx, &connection),
responder.run_with_connection_to(connection.clone()),
Expand Down Expand Up @@ -1341,6 +1349,8 @@ enum OutgoingMessage {
Response {
id: jsonrpcmsg::Id,

method: String,

response: Result<serde_json::Value, crate::Error>,
},

Expand Down Expand Up @@ -1424,6 +1434,8 @@ pub struct ConnectionTo<Counterpart: Role> {
message_tx: OutgoingMessageTx,
task_tx: TaskTx,
dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
#[cfg(feature = "unstable_protocol_v2")]
protocol_state: crate::schema::v2_compat::ProtocolState,
}

impl<Counterpart: Role> ConnectionTo<Counterpart> {
Expand All @@ -1432,12 +1444,16 @@ impl<Counterpart: Role> ConnectionTo<Counterpart> {
message_tx: mpsc::UnboundedSender<OutgoingMessage>,
task_tx: mpsc::UnboundedSender<Task>,
dynamic_handler_tx: mpsc::UnboundedSender<DynamicHandlerMessage<Counterpart>>,
#[cfg(feature = "unstable_protocol_v2")]
protocol_state: crate::schema::v2_compat::ProtocolState,
) -> Self {
Self {
counterpart,
message_tx,
task_tx,
dynamic_handler_tx,
#[cfg(feature = "unstable_protocol_v2")]
protocol_state,
}
}

Expand All @@ -1446,6 +1462,15 @@ impl<Counterpart: Role> ConnectionTo<Counterpart> {
self.counterpart.clone()
}

/// Return the protocol version negotiated by the initialize handshake.
///
/// This is `None` until an `initialize` response is sent or received.
#[cfg(feature = "unstable_protocol_v2")]
#[must_use]
pub fn negotiated_protocol_version(&self) -> Option<crate::schema::ProtocolVersion> {
self.protocol_state.negotiated_protocol_version()
}

/// Spawns a task that will run so long as the JSON-RPC connection is being served.
///
/// This is the primary mechanism for offloading expensive work from handler callbacks
Expand Down Expand Up @@ -1662,12 +1687,14 @@ impl<Counterpart: Role> ConnectionTo<Counterpart> {
let (response_tx, response_rx) = oneshot::channel();
let role_id = peer.role_id();
let remote_style = self.counterpart.remote_style(peer);
let mut response_method = method.clone();
match remote_style.transform_outgoing_message(request) {
Ok(untyped) => {
response_method = response_method_for_outgoing_request(&method, &untyped);
// Transform the message for the target role
let message = OutgoingMessage::Request {
id: id.clone(),
method: method.clone(),
method: response_method.clone(),
role_id,
untyped,
response_tx,
Expand Down Expand Up @@ -1709,8 +1736,13 @@ impl<Counterpart: Role> ConnectionTo<Counterpart> {
}
}

SentRequest::new(id, method.clone(), self.task_tx.clone(), response_rx)
.map(move |json| <Req::Response>::from_value(&method, json))
SentRequest::new(
id,
response_method.clone(),
self.task_tx.clone(),
response_rx,
)
.map(move |json| <Req::Response>::from_value(&response_method, json))
}

/// Send an outgoing notification to the default counterpart peer (no reply expected).
Expand Down Expand Up @@ -1811,6 +1843,16 @@ impl<Counterpart: Role> ConnectionTo<Counterpart> {
}
}

fn response_method_for_outgoing_request(method: &str, untyped: &UntypedMessage) -> String {
if untyped.method == METHOD_SUCCESSOR_MESSAGE
&& let Some(serde_json::Value::String(inner_method)) = untyped.params.get("method")
{
return inner_method.clone();
}

method.to_string()
}

#[derive(Clone, Debug)]
pub struct DynamicHandlerRegistration<R: Role> {
uuid: Uuid,
Expand Down Expand Up @@ -1888,7 +1930,7 @@ pub struct Responder<T: JsonRpcResponse = serde_json::Value> {
///
/// For incoming requests: serializes to JSON and sends over the wire.
/// For incoming responses: sends to the waiting oneshot channel.
send_fn: Box<dyn FnOnce(Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
send_fn: Box<dyn FnOnce(String, Result<T, crate::Error>) -> Result<(), crate::Error> + Send>,
}

impl<T: JsonRpcResponse> std::fmt::Debug for Responder<T> {
Expand All @@ -1910,15 +1952,18 @@ impl Responder<serde_json::Value> {
Self {
method,
id,
send_fn: Box::new(move |response: Result<serde_json::Value, crate::Error>| {
send_raw_message(
&message_tx,
OutgoingMessage::Response {
id: id_clone,
response,
},
)
}),
send_fn: Box::new(
move |method, response: Result<serde_json::Value, crate::Error>| {
send_raw_message(
&message_tx,
OutgoingMessage::Response {
id: id_clone,
method,
response,
},
)
},
),
}
}

Expand Down Expand Up @@ -1970,13 +2015,17 @@ impl<T: JsonRpcResponse> Responder<T> {
self,
wrap_fn: impl FnOnce(&str, Result<U, crate::Error>) -> Result<T, crate::Error> + Send + 'static,
) -> Responder<U> {
let method = self.method.clone();
let Self {
method,
id,
send_fn,
} = self;
Responder {
method: self.method,
id: self.id,
send_fn: Box::new(move |input: Result<U, crate::Error>| {
method,
id,
send_fn: Box::new(move |method, input: Result<U, crate::Error>| {
let t_value = wrap_fn(&method, input);
(self.send_fn)(t_value)
send_fn(method, t_value)
}),
}
}
Expand All @@ -1986,8 +2035,13 @@ impl<T: JsonRpcResponse> Responder<T> {
self,
response: Result<T, crate::Error>,
) -> Result<(), crate::Error> {
tracing::debug!(id = ?self.id, "respond called");
(self.send_fn)(response)
let Self {
method,
id,
send_fn,
} = self;
tracing::debug!(id = ?id, "respond called");
send_fn(method, response)
}

/// Respond to the JSON-RPC request with a value.
Expand Down
Loading
Loading