Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,30 @@ will be silently dropped and `0n` returned. The local
`maxDatagramFrameSize` transport parameter (default: `1200` bytes) controls
what this endpoint advertises to the peer as its own maximum.

### `session.servername`

<!-- YAML
added: REPLACEME
-->

* Type: {string|undefined}

The SNI (Server Name Indication) host name associated with the session, or
`undefined` if none was set. On a client this is the `servername` requested via
[`quic.connect()`][]. On a server it is the name sent by the peer.

### `session.alpnProtocol`

<!-- YAML
added: REPLACEME
-->

* Type: {string|undefined}

The negotiated ALPN protocol. On a server this is available synchronously as
soon as the session is surfaced to the `onsession` callback; on a client it is
`undefined` until the handshake completes.

### `session.certificate`

<!-- YAML
Expand Down Expand Up @@ -3563,7 +3587,11 @@ added: v23.8.0
* `this` {quic.QuicEndpoint}
* `session` {quic.QuicSession}

The callback function that is invoked when a new session is initiated by a remote peer.
The callback function that is invoked when a new server session is initiated by
a remote peer. It is called once the peer's TLS `ClientHello` has been
processed, so the negotiated TLS parameters are immediately available when
the callback runs. Sessions whose handshake is rejected before this point are
never surfaced.

### Callback: `OnStreamCallback`

Expand Down
24 changes: 23 additions & 1 deletion lib/internal/quic/quic.js
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,9 @@ setCallbacks({
this[kOwner][kFinishClose](context, status);
},
/**
* Called when the QuicEndpoint C++ handle receives a new server-side session
* Called when a new server session is surfaced. The emit happens once the
* session's ClientHello has been processed, so its servername/protocol
* getters are already readable.
* @param {object} session The QuicSession C++ handle
*/
onSessionNew(session) {
Expand Down Expand Up @@ -2877,6 +2879,26 @@ class QuicSession {
}
}

/**
* The SNI servername, or undefined when none was sent.
* @type {string|undefined}
*/
get servername() {
assertIsQuicSession(this);
if (this.destroyed) return undefined;
return this.#handle.getServername();
}

/**
* The negotiated ALPN protocol.
* @type {string|undefined}
*/
get alpnProtocol() {
assertIsQuicSession(this);
if (this.destroyed) return undefined;
return this.#handle.getAlpnProtocol();
}

/** @type {OnDatagramCallback} */
get ondatagram() {
assertIsQuicSession(this);
Expand Down
30 changes: 18 additions & 12 deletions src/quic/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -814,8 +814,8 @@ void Endpoint::AddSession(const CID& cid, BaseObjectPtr<Session> session) {
// For server sessions, associate the client's original DCID (ocid) so
// that 0-RTT packets arriving in a separate UDP datagram can be routed
// to this session. This must happen after the session is added (so
// FindSession can resolve the mapping) but before EmitNewSession (which
// runs JS and may yield to libuv, allowing the 0-RTT packet to arrive).
// FindSession can resolve the mapping) and before any JS runs (which
// may yield to libuv, allowing the 0-RTT packet to arrive).
if (session->is_server() && session->config().ocid) {
AssociateCID(session->config().ocid, session->config().scid);
}
Expand All @@ -825,22 +825,22 @@ void Endpoint::AddSession(const CID& cid, BaseObjectPtr<Session> session) {
if (session->is_server() && session->config().retry_scid) {
AssociateCID(session->config().retry_scid, session->config().scid);
}
// Increment the primary session count and ref the handle BEFORE
// EmitNewSession. EmitNewSession calls into JS, which may close/destroy
// the session synchronously. The session's ~Impl calls RemoveSession
// which decrements the count. If we increment after EmitNewSession,
// RemoveSession would see count=0 and the count would be permanently
// off by one.
// Increment the primary session count and ref the handle BEFORE any
// JS can run for this session (the deferred EmitNewSession, or packet
// processing callbacks). JS may close/destroy the session
// synchronously; the session's ~Impl calls RemoveSession which
// decrements the count. If we incremented after, RemoveSession would
// see count=0 and the count would be permanently off by one.
if (primary_session_count_++ == 0) {
idle_timer_.Stop();
udp_.Ref();
}
if (session->is_server()) {
STAT_INCREMENT(Stats, server_sessions);
// We only emit the new session event for server sessions.
EmitNewSession(session);
// It is important to note that the session may be closed/destroyed
// when it is emitted here.
// Note that we don't emit new sessions here - that's deferred until the
// ClientHello has been processed (see Session::ReadPacket), so the
// session is exposed to JS only once its SNI/ALPN are known and invalid
// handshakes never surface.
} else {
STAT_INCREMENT(Stats, client_sessions);
}
Expand Down Expand Up @@ -1980,6 +1980,12 @@ void Endpoint::EmitNewSession(const BaseObjectPtr<Session>& session) {
// the call to MakeCallback. If that's the case, the session object still
// exists but it is in a destroyed state. Care should be taken accessing
// session after this point.

// Deliver any stream events that were held until the stream was setup,
// e.g. 0-RTT streams from the first flight.
if (!session->is_destroyed()) {
session->ReplayDeferredEmits();
}
}

void Endpoint::EmitClose(CloseContext context, int status) {
Expand Down
112 changes: 110 additions & 2 deletions src/quic/session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ uint64_t MaxDatagramPayload(uint64_t max_frame_size) {
#define SESSION_JS_METHODS(V) \
V(Destroy, destroy, SIDE_EFFECT) \
V(GetRemoteAddress, getRemoteAddress, NO_SIDE_EFFECT) \
V(GetServername, getServername, NO_SIDE_EFFECT) \
V(GetAlpnProtocol, getAlpnProtocol, NO_SIDE_EFFECT) \
V(GetLocalAddress, getLocalAddress, NO_SIDE_EFFECT) \
V(GetCertificate, getCertificate, NO_SIDE_EFFECT) \
V(GetEphemeralKeyInfo, getEphemeralKey, NO_SIDE_EFFECT) \
Expand Down Expand Up @@ -781,6 +783,8 @@ struct Session::Impl final : public MemoryRetainer {
SocketAddress remote_address_;
std::unique_ptr<Application> application_;
StreamsMap streams_;
// Emits deferred until after session setup is completed
std::vector<std::function<void()>> deferred_emits_;
TimerWrapHandle timer_;
size_t send_scope_depth_ = 0;
QuicError last_error_;
Expand Down Expand Up @@ -1001,6 +1005,36 @@ struct Session::Impl final : public MemoryRetainer {
session->Destroy();
}

// The SNI servername from the TLS handshake; empty (-> undefined) only if
// none was sent.
JS_METHOD(GetServername) {
auto env = Environment::GetCurrent(args);
Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.This());
if (session->is_destroyed()) return;
auto sn = session->tls_session().servername();
if (sn.empty()) return;
Local<Value> ret;
if (ToV8Value(env->context(), sn).ToLocal(&ret)) {
args.GetReturnValue().Set(ret);
}
}

// The negotiated ALPN protocol. Undefined only for clients before the
// handshake is completed, as ALPN is mandatory for QUIC.
JS_METHOD(GetAlpnProtocol) {
auto env = Environment::GetCurrent(args);
Session* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args.This());
if (session->is_destroyed()) return;
auto proto = session->tls_session().protocol();
if (proto.empty()) return;
Local<Value> ret;
if (ToV8Value(env->context(), proto).ToLocal(&ret)) {
args.GetReturnValue().Set(ret);
}
}

JS_METHOD(GetRemoteAddress) {
auto env = Environment::GetCurrent(args);
Session* session;
Expand Down Expand Up @@ -2190,6 +2224,12 @@ const Session::Options& Session::options() const {
void Session::EmitQlog(uint32_t flags, std::string_view data) {
if (!env()->can_call_into_js()) return;

if (!is_destroyed() && must_defer_emits()) {
QueueDeferredEmit(
[this, flags, held = std::string(data)]() { EmitQlog(flags, held); });
return;
}

bool fin = (flags & NGTCP2_QLOG_WRITE_FLAG_FIN) != 0;

// Fun fact... ngtcp2 does not emit the final qlog statement until the
Expand Down Expand Up @@ -2312,6 +2352,16 @@ bool Session::ReadPacket(const uint8_t* data,
// Process deferred operations that couldn't run inside callback
// scopes (e.g., HTTP/3 GOAWAY handling that calls into JS).
application().PostReceive();
// Surface a server session to JS once its ClientHello has been
// processed (OnSelectAlpn fired: SNI + ALPN are known and reliable).
// Held first-flight events - including 0-RTT request streams - replay
// at emit. The !wrapped guard makes this fire exactly once, on
// whichever packet completes the ClientHello (so a multi-datagram
// ClientHello is handled correctly).
if (is_server() && hello_processed_ && !impl_->state()->wrapped &&
!is_destroyed()) {
endpoint().EmitNewSession(BaseObjectPtr<Session>(this));
}
}
return true;
}
Expand Down Expand Up @@ -2975,6 +3025,30 @@ void Session::set_wrapped() {
impl_->state()->wrapped = 1;
}

bool Session::must_defer_emits() const {
// Server sessions are surfaced to JS (via the deferred new-session emit)
// only after the ClientHello has been processed and wrapped; anything
// emitted before then has no JS wrapper to receive it and must be held
// for replay.
return is_server() && !impl_->state()->wrapped;
}

void Session::QueueDeferredEmit(std::function<void()> fn) {
impl_->deferred_emits_.emplace_back(std::move(fn));
}

void Session::ReplayDeferredEmits() {
if (is_destroyed()) return;
DCHECK(impl_->state()->wrapped);
// Runs synchronously immediately after the new-session callback
// returns (still within first-flight processing).
auto emits = std::move(impl_->deferred_emits_);
for (auto& emit : emits) {
if (is_destroyed()) return;
emit();
}
}

void Session::set_priority_supported(bool on) {
DCHECK(!is_destroyed());
impl_->state()->priority_supported = on ? 1 : 0;
Expand Down Expand Up @@ -3284,6 +3358,10 @@ bool Session::HandshakeCompleted() {

Debug(this, "Session handshake completed");
impl_->state()->handshake_completed = 1;
// This implies fully completing a handshake without setting hello_processed
// (set during ALPN negotiation). Should be impossible unless ALPN flow is
// changed drastically, but good to check as it'd lose sessions.
DCHECK(!is_server() || hello_processed_);

STAT_RECORD_TIMESTAMP(Stats, handshake_completed_at);
SetStreamOpenAllowed();
Expand Down Expand Up @@ -3482,6 +3560,7 @@ void Session::set_max_datagram_size(uint16_t size) {

void Session::EmitGoaway(stream_id last_stream_id) {
if (is_destroyed()) return;
if (DeferEmit([this, last_stream_id] { EmitGoaway(last_stream_id); })) return;
if (!env()->can_call_into_js()) return;

CallbackScope<Session> cb_scope(this);
Expand All @@ -3496,6 +3575,14 @@ void Session::EmitGoaway(stream_id last_stream_id) {

void Session::EmitDatagram(Store&& datagram, DatagramReceivedFlags flag) {
DCHECK(!is_destroyed());

if (must_defer_emits()) {
QueueDeferredEmit([this, datagram = std::move(datagram), flag]() mutable {
EmitDatagram(std::move(datagram), flag);
});
return;
}

if (!env()->can_call_into_js()) return;

CallbackScope<Session> cbv_scope(this);
Expand All @@ -3511,6 +3598,8 @@ void Session::EmitDatagram(Store&& datagram, DatagramReceivedFlags flag) {
void Session::EmitDatagramStatus(datagram_id id, quic::DatagramStatus status) {
DCHECK(!is_destroyed());

if (DeferEmit([this, id, status] { EmitDatagramStatus(id, status); })) return;

if (!env()->can_call_into_js()) return;

CallbackScope<Session> cb_scope(this);
Expand Down Expand Up @@ -3672,6 +3761,7 @@ void Session::EmitSessionTicket(Store&& ticket) {

void Session::EmitApplication() {
if (is_destroyed()) return;
if (DeferEmit([this] { EmitApplication(); })) return;
if (!env()->can_call_into_js()) return;

if (!has_application()) {
Expand Down Expand Up @@ -3742,6 +3832,10 @@ void Session::EmitNewToken(const uint8_t* token, size_t len) {
void Session::EmitStream(const BaseObjectWeakPtr<Stream>& stream) {
DCHECK(!is_destroyed());

if (DeferEmit([this, stream] { EmitStream(stream); })) return;

if (!stream) return;

if (!env()->can_call_into_js()) return;
CallbackScope<Session> cb_scope(this);

Expand Down Expand Up @@ -3797,6 +3891,14 @@ void Session::EmitVersionNegotiation(const ngtcp2_pkt_hd& hd,

void Session::EmitOrigins(std::vector<std::string>&& origins) {
DCHECK(!is_destroyed());

if (must_defer_emits()) {
QueueDeferredEmit([this, origins = std::move(origins)]() mutable {
EmitOrigins(std::move(origins));
});
return;
}

if (!HasListenerFlag(impl_->state()->listener_flags,
SessionListenerFlags::ORIGIN))
return;
Expand All @@ -3822,11 +3924,17 @@ void Session::EmitOrigins(std::vector<std::string>&& origins) {

void Session::EmitKeylog(const char* line) {
DCHECK(!is_destroyed());

if (must_defer_emits()) {
QueueDeferredEmit(
[this, str = std::string(line)]() { EmitKeylog(str.c_str()); });
return;
}

if (!env()->can_call_into_js()) return;

auto str = std::string(line);
Local<Value> argv[] = {Undefined(env()->isolate())};
if (!ToV8Value(env()->context(), str).ToLocal(&argv[0])) {
if (!ToV8Value(env()->context(), std::string(line)).ToLocal(&argv[0])) {
Debug(this, "Failed to convert keylog line to V8 string");
return;
}
Expand Down
23 changes: 23 additions & 0 deletions src/quic/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -536,11 +536,32 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source {
// before the handshake completes.
void PopulateEarlyTransportParamsState();

void set_hello_processed() { hello_processed_ = true; }

// It's a terrible name but "wrapped" here means that the Session has been
// passed out to JavaScript and should be "wrapped" by whatever handler is
// defined there to manage it.
void set_wrapped();

// True while JS emits must be held for later replay, before the handshake
// is complete and the server session event has been emitted.
bool must_defer_emits() const;

// Replays, in order, any emits held while must_defer_emits() was true.
// Called synchronously right after the new-session emit.
void ReplayDeferredEmits();

// Queues fn to be replayed by ReplayDeferredEmits(). Out-of-line so the
// header does not need the full Impl definition.
void QueueDeferredEmit(std::function<void()> fn);

template <typename F>
bool DeferEmit(F&& fn) {
if (!must_defer_emits()) return false;
QueueDeferredEmit(std::forward<F>(fn));
return true;
}

enum class CloseMethod : uint8_t {
// Immediate close with a roundtrip through JavaScript, causing all
// currently opened streams to be closed. An attempt will be made to
Expand Down Expand Up @@ -656,6 +677,8 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source {
};
Flags flags_;

bool hello_processed_ = false;

QuicConnectionPointer connection_;
std::unique_ptr<TLSSession> tls_session_;
friend struct NgTcp2CallbackScope;
Expand Down
Loading
Loading