Skip to content

Commit 4affdc0

Browse files
Defer body.close until final chunk is written. (#53)
1 parent 5b4ab72 commit 4affdc0

4 files changed

Lines changed: 150 additions & 49 deletions

File tree

lib/protocol/http1/connection.rb

Lines changed: 56 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -614,32 +614,34 @@ def write_fixed_length_body(body, length, head)
614614

615615
if head
616616
@stream.flush
617+
else
618+
@stream.flush unless body.ready?
617619

618-
body.close
620+
chunk_length = 0
621+
# Use a manual read loop (not body.each) so that body.close runs after the response is fully written and flushed. This ensures completion callbacks (e.g. rack.response_finished) don't delay the client.
622+
while chunk = body.read
623+
chunk_length += chunk.bytesize
624+
625+
if chunk_length > length
626+
raise ContentLengthError, "Trying to write #{chunk_length} bytes, but content length was #{length} bytes!"
627+
end
628+
629+
@stream.write(chunk)
630+
@stream.flush unless body.ready?
631+
end
619632

620-
return
621-
end
622-
623-
@stream.flush unless body.ready?
624-
625-
chunk_length = 0
626-
body.each do |chunk|
627-
chunk_length += chunk.bytesize
633+
@stream.flush
628634

629-
if chunk_length > length
630-
raise ContentLengthError, "Trying to write #{chunk_length} bytes, but content length was #{length} bytes!"
635+
if chunk_length != length
636+
raise ContentLengthError, "Wrote #{chunk_length} bytes, but content length was #{length} bytes!"
631637
end
632-
633-
@stream.write(chunk)
634-
@stream.flush unless body.ready?
635-
end
636-
637-
@stream.flush
638-
639-
if chunk_length != length
640-
raise ContentLengthError, "Wrote #{chunk_length} bytes, but content length was #{length} bytes!"
641638
end
639+
rescue => error
640+
raise
642641
ensure
642+
# Close the body after the response is fully flushed, so that completion callbacks run after the client has received the response:
643+
body.close(error)
644+
643645
self.send_end_stream!
644646
end
645647

@@ -657,34 +659,36 @@ def write_chunked_body(body, head, trailer = nil)
657659

658660
if head
659661
@stream.flush
662+
else
663+
@stream.flush unless body.ready?
660664

661-
body.close
662-
663-
return
664-
end
665-
666-
@stream.flush unless body.ready?
667-
668-
body.each do |chunk|
669-
next if chunk.size == 0
665+
# Use a manual read loop (not body.each) so that body.close runs after the terminal chunk is written. With body.each, the ensure { close } fires before the terminal "0\r\n\r\n" is sent, delaying the client.
666+
while chunk = body.read
667+
next if chunk.size == 0
668+
669+
@stream.write("#{chunk.bytesize.to_s(16).upcase}\r\n")
670+
@stream.write(chunk)
671+
@stream.write(CRLF)
672+
673+
@stream.flush unless body.ready?
674+
end
670675

671-
@stream.write("#{chunk.bytesize.to_s(16).upcase}\r\n")
672-
@stream.write(chunk)
673-
@stream.write(CRLF)
676+
if trailer&.any?
677+
@stream.write("0\r\n")
678+
write_headers(trailer)
679+
@stream.write("\r\n")
680+
else
681+
@stream.write("0\r\n\r\n")
682+
end
674683

675-
@stream.flush unless body.ready?
676-
end
677-
678-
if trailer&.any?
679-
@stream.write("0\r\n")
680-
write_headers(trailer)
681-
@stream.write("\r\n")
682-
else
683-
@stream.write("0\r\n\r\n")
684+
@stream.flush
684685
end
685-
686-
@stream.flush
686+
rescue => error
687+
raise
687688
ensure
689+
# Close the body after the complete chunked response (including terminal chunk) is flushed, so that completion callbacks don't block the client from seeing the response as complete:
690+
body.close(error)
691+
688692
self.send_end_stream!
689693
end
690694

@@ -697,12 +701,11 @@ def write_body_and_close(body, head)
697701
@persistent = false
698702

699703
@stream.write("\r\n")
700-
@stream.flush unless body.ready?
701704

702-
if head
703-
body.close
704-
else
705-
body.each do |chunk|
705+
unless head
706+
@stream.flush unless body.ready?
707+
708+
while chunk = body.read
706709
@stream.write(chunk)
707710

708711
@stream.flush unless body.ready?
@@ -711,7 +714,12 @@ def write_body_and_close(body, head)
711714

712715
@stream.flush
713716
@stream.close_write
717+
rescue => error
718+
raise
714719
ensure
720+
# Close the body after the stream is fully written and half-closed, so that completion callbacks run after the client has received the full response:
721+
body.close(error)
722+
715723
self.send_end_stream!
716724
end
717725

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Defer `body.close` in `write_chunked_body`, `write_fixed_length_body`, and `write_body_and_close` until after the response is fully written and flushed. Previously, `body.each` called `close` in its `ensure` block before the terminal chunk (chunked encoding) or final flush was written, causing `rack.response_finished` callbacks to delay the client-visible response completion.
6+
37
## v0.37.0
48

59
- `Protocol::HTTP1::BadRequest` now includes `Protocol::HTTP::BadRequest` for better interoperability and handling of bad request errors across different HTTP protocol implementations.
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2026, by Samuel Williams.
5+
6+
require "protocol/http1/connection"
7+
require "protocol/http/body/buffered"
8+
require "protocol/http/body/wrapper"
9+
10+
require "connection_context"
11+
12+
# A body wrapper whose close callback reads the client side of the socket to verify
13+
# that the response framing is already complete at close time.
14+
class StreamCheckingBody < Protocol::HTTP::Body::Wrapper
15+
attr_reader :client_data_at_close
16+
17+
def initialize(body, client_socket)
18+
super(body)
19+
@client_socket = client_socket
20+
@client_data_at_close = nil
21+
end
22+
23+
def close(error = nil)
24+
# Non-blocking read of everything available on the client socket right now.
25+
# If the response was fully flushed before close, all framing bytes are here.
26+
@client_data_at_close = +""
27+
28+
loop do
29+
@client_data_at_close << @client_socket.read_nonblock(65536)
30+
rescue IO::WaitReadable, EOFError
31+
break
32+
end
33+
34+
super
35+
end
36+
end
37+
38+
describe Protocol::HTTP1::Connection do
39+
include_context ConnectionContext
40+
41+
before do
42+
server.open!
43+
client.open!
44+
end
45+
46+
with "#write_body_and_close" do
47+
it "flushes all data to the stream before closing body" do
48+
body = StreamCheckingBody.new(
49+
Protocol::HTTP::Body::Buffered.new(["Hello", " ", "World"]),
50+
sockets.first,
51+
)
52+
53+
server.write_body_and_close(body, false)
54+
55+
expect(body.client_data_at_close).not.to be_nil
56+
expect(body.client_data_at_close).to be(:include?, "Hello")
57+
expect(body.client_data_at_close).to be(:include?, "World")
58+
end
59+
end
60+
61+
with "#write_chunked_body" do
62+
it "writes terminal chunk before closing body" do
63+
body = StreamCheckingBody.new(
64+
Protocol::HTTP::Body::Buffered.new(["Hello", " ", "World"]),
65+
sockets.first,
66+
)
67+
68+
server.write_chunked_body(body, false)
69+
70+
expect(body.client_data_at_close).not.to be_nil
71+
# Terminal chunk should already be on the wire:
72+
expect(body.client_data_at_close).to be(:include?, "\r\n0\r\n\r\n")
73+
end
74+
end
75+
76+
with "#write_fixed_length_body" do
77+
it "writes all data before closing body" do
78+
body = StreamCheckingBody.new(
79+
Protocol::HTTP::Body::Buffered.new(["Hello", " ", "World"]),
80+
sockets.first,
81+
)
82+
83+
server.write_fixed_length_body(body, 11, false)
84+
85+
expect(body.client_data_at_close).not.to be_nil
86+
expect(body.client_data_at_close).to be(:include?, "Hello World")
87+
end
88+
end
89+
end

test/protocol/http1/hijack.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
server.open!
3333

3434
expect(body).to receive(:ready?).and_return(false)
35-
expect(body).to receive(:each).and_return(nil)
35+
expect(body).to receive(:read).and_return(nil)
3636
server.write_response(response_version, 101, {"upgrade" => "websocket"})
3737
server.write_body(response_version, body)
3838

0 commit comments

Comments
 (0)