🐞 fix(router): buffer NATS ChanSubscribe channel to prevent message drops #2758
Sanchez89 wants to merge 1 commit into wundergraph:main from
Conversation
The EDFS NATS adapter uses an unbuffered channel (`make(chan *nats.Msg)`) for ChanSubscribe. The NATS client performs a non-blocking send on this channel — if the adapter goroutine is busy processing a previous message (metrics, filter evaluation, SSE flush), incoming messages are silently dropped as slow consumer events. This is particularly impactful on high-throughput wildcard subscriptions where the channel frequently has no receiver ready, causing significant message loss (~60% observed in production-like conditions). Buffer the channel with capacity 1024 to absorb bursts while the goroutine processes messages sequentially. For reference, the NATS client's own DefaultMaxChanLen is 64*1024 (65,536).
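The drop mechanism is easy to reproduce in plain Go. The sketch below imitates the non-blocking send the NATS client is described as performing (a `select` with a `default` branch); it is a simplified model for illustration, not the actual nats.go source:

```go
package main

import "fmt"

// trySend models the non-blocking send described above: deliver if the
// channel can accept the message right now, otherwise drop it (the
// "slow consumer" case).
func trySend(ch chan int, msg int) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)  // zero capacity: with no waiting receiver, every send drops
	buffered := make(chan int, 4) // absorbs a burst of up to 4 messages

	droppedUnbuffered, droppedBuffered := 0, 0
	for i := 0; i < 8; i++ {
		if !trySend(unbuffered, i) {
			droppedUnbuffered++
		}
		if !trySend(buffered, i) {
			droppedBuffered++
		}
	}
	// With no receiver running, the unbuffered channel drops all 8 sends,
	// while the buffered one drops only the 4 that overflow its capacity.
	fmt.Println(droppedUnbuffered, droppedBuffered) // prints: 8 4
}
```

Note that a buffer only absorbs bursts; once it is full, the same `default` branch fires again, which is the limitation raised later in the review thread.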
🧹 Nitpick comments (1)
router/pkg/pubsub/nats/adapter.go (1)
168-168: Make the channel buffer size configurable rather than hardcoded.

Line 168 fixes the immediate drop issue, but a fixed 1024 can still hit slow-consumer drops under different traffic profiles. Please expose this as provider config (with 1024 as default) so operators can tune it without redeploying code; this matches the operational guidance already logged in `router/pkg/pubsub/nats/provider_builder.go`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router/pkg/pubsub/nats/adapter.go` at line 168, the channel buffer size for msgChan is hardcoded to 1024, causing potential slow-consumer drops; add a configurable buffer size option to the NATS provider config (use the existing provider_builder.go config structure), expose a field like NATSMessageBufferSize with a default of 1024, and initialize msgChan using that value (e.g., `msgChan := make(chan *nats.Msg, cfg.NATSMessageBufferSize)` inside the adapter where msgChan is created) so operators can tune it without redeploying.
📒 Files selected for processing (1)
router/pkg/pubsub/nats/adapter.go
Hey, you might or might not be right about the NATS client behaviour with regard to the non-blocking send, as I haven't checked, but let's assume you're right. Sending on an unbuffered channel with a default case means we're discarding if there's no immediate read. With a buffered channel you're not solving the problem, you've moved it somewhere else, because it will still discard messages when the buffer is full. So if the goal is to not lose messages, which I think is a good goal, the change you made is not the solution.

That said, this codebase will never allow such structural changes without proving them through tests. If you still want to fix this, please solve the core issue and prove it through tests. To increase the probability that we can accept your implementation, I recommend that you create an appropriately sized RFC to get confirmation that your solution is right. Ideally you could propose multiple competing ideas, assuming that there's more than one way to solve this problem. If we can agree on a solution, the likelihood that we review and merge the code change increases dramatically.

With LLM coding it's now very cheap and easy to "fix a problem in the simplest possible way", but that doesn't mean it's the right way. Someone has to put in the mental work to think a problem through critically, and if we merge external contributions, maintaining the code becomes our responsibility and we have to live with all the consequences. We appreciate that you're trying to use Cosmo and are even interested in improving it, but we see it as our responsibility towards our customers to keep the bar high.
This PR was marked stale due to lack of activity. It will be closed in 14 days.
Description
The EDFS NATS adapter in `router/pkg/pubsub/nats/adapter.go` creates an unbuffered channel (`make(chan *nats.Msg)`) for `ChanSubscribe`. The NATS client performs a non-blocking send on this channel.

If the adapter goroutine is busy processing a previous message (metrics recording, `updater.Update()`, filter evaluation, SSE flush), the unbuffered channel has zero capacity to absorb incoming messages. Every message that arrives while the goroutine is occupied is silently dropped as a slow consumer event.

This is particularly impactful when using `@edfs__natsSubscribe` on wildcard subjects (e.g. `some.topic.>`) that receive high message volumes. In our testing, we observed ~60% message loss: 8 NATS messages published, only 3-4 delivered via SSE.

Fix
Buffer the channel with capacity 1024:
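The change itself is the single line below. Since this excerpt cannot import the real `github.com/nats-io/nats.go` package, a local `Msg` type stands in for `*nats.Msg` (an assumption made purely so the sketch compiles standalone):

```go
package main

import "fmt"

// Msg stands in for *nats.Msg so this sketch compiles without the
// github.com/nats-io/nats.go dependency.
type Msg struct{ Subject string }

func main() {
	// Before the fix the adapter used an unbuffered channel:
	//   msgChan := make(chan *Msg)
	// After the fix, 1024 slots absorb bursts while the consumer
	// goroutine is busy with metrics, filtering, or an SSE flush.
	msgChan := make(chan *Msg, 1024)
	fmt.Println(cap(msgChan)) // prints: 1024
}
```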
After this one-line change, we observed 0% message loss in the same test scenario.
For reference, the NATS client's own `DefaultMaxChanLen` is `64 * 1024` (65,536). A buffer of 1024 is conservative but sufficient to absorb burst processing latency.

Future Improvement Suggestion
Consider making the buffer size configurable via the events provider configuration:
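One way this could look on the Go side; the struct and field names (`NATSProviderConfig`, `MessageBufferSize`) are hypothetical illustrations, not the actual Cosmo config schema:

```go
package main

import "fmt"

// NATSProviderConfig is a hypothetical slice of the events provider
// configuration; only the proposed new field is shown.
type NATSProviderConfig struct {
	MessageBufferSize int `yaml:"message_buffer_size"`
}

// bufferSize falls back to the proposed default of 1024 when the
// operator has not set an explicit value.
func bufferSize(cfg NATSProviderConfig) int {
	if cfg.MessageBufferSize <= 0 {
		return 1024
	}
	return cfg.MessageBufferSize
}

func main() {
	fmt.Println(bufferSize(NATSProviderConfig{}))                        // default: 1024
	fmt.Println(bufferSize(NATSProviderConfig{MessageBufferSize: 4096})) // tuned: 4096
}
```

The adapter would then create the channel as `make(chan *nats.Msg, bufferSize(cfg))`, letting operators tune the buffer without redeploying code.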
The JetStream path in the same adapter (`FetchNoWait(300)` polling at line 136) does not suffer from this issue, as it doesn't rely on channel-based delivery.

Checklist