
🐞 fix(router): buffer NATS ChanSubscribe channel to prevent message drops #2758

Open
Sanchez89 wants to merge 1 commit into wundergraph:main from Sanchez89:fix/nats-edfs-unbuffered-channel
Conversation

Sanchez89 commented Apr 13, 2026

Description

The EDFS NATS adapter in router/pkg/pubsub/nats/adapter.go creates an unbuffered channel (make(chan *nats.Msg)) for ChanSubscribe. The NATS client performs a non-blocking send on this channel:

select {
case sub.mch <- m:
default:
    goto slowConsumer // message silently dropped
}

If the adapter goroutine is busy processing a previous message (metrics recording, updater.Update(), filter evaluation, SSE flush), the unbuffered channel has zero capacity to absorb incoming messages. Every message that arrives while the goroutine is occupied is silently dropped as a slow consumer event.

This is particularly impactful when using @edfs__natsSubscribe on wildcard subjects (e.g. some.topic.>) that receive high message volumes. In our testing, we observed ~60% message loss — 8 NATS messages published, only 3-4 delivered via SSE.
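This failure mode can be reproduced in isolation. The following standalone Go sketch (not the router code; the message type, timings, and counts are stand-ins) mimics the NATS client's non-blocking send against a busy consumer on an unbuffered channel:

```go
package main

import (
	"fmt"
	"time"
)

// countDrops simulates the adapter's situation: a single goroutine consumes
// from an unbuffered channel while the sender performs non-blocking sends,
// as the NATS client does for ChanSubscribe.
func countDrops() int {
	msgChan := make(chan int) // unbuffered, like make(chan *nats.Msg) before this fix

	go func() {
		for range msgChan {
			// Simulate per-message work: metrics, updater.Update(), SSE flush.
			time.Sleep(10 * time.Millisecond)
		}
	}()
	time.Sleep(time.Millisecond) // let the consumer start

	dropped := 0
	for i := 0; i < 8; i++ {
		select {
		case msgChan <- i: // accepted only if the consumer is ready right now
		default:
			dropped++ // the "slow consumer" path: the message is gone
		}
		time.Sleep(time.Millisecond) // messages arrive faster than they are processed
	}
	return dropped
}

func main() {
	fmt.Println("dropped:", countDrops(), "of 8")
}
```

With 10 ms of processing per message and 1 ms between arrivals, most of the 8 sends hit the default branch, which is the same mechanism behind the loss described above.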

Fix

Buffer the channel with capacity 1024:

// Before
msgChan := make(chan *nats.Msg)       // unbuffered — capacity 0

// After
msgChan := make(chan *nats.Msg, 1024) // buffered — absorbs processing latency

After this one-line change, we observed 0% message loss in the same test scenario.

For reference, the NATS client's own DefaultMaxChanLen is 64 * 1024 (65,536). A buffer of 1024 is conservative but sufficient to absorb burst processing latency.
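A buffered channel changes the picture: the non-blocking send succeeds as long as the buffer has free slots, even if the consumer is momentarily busy. A minimal deterministic sketch (illustrative types, not the router code):

```go
package main

import "fmt"

func main() {
	msgChan := make(chan int, 1024) // buffered, as in this patch

	dropped := 0
	// No consumer is running at all, so the buffer alone must absorb the burst.
	for i := 0; i < 8; i++ {
		select {
		case msgChan <- i: // succeeds while the buffer has capacity
		default:
			dropped++ // only reached once the buffer is full
		}
	}
	fmt.Println("dropped:", dropped, "queued:", len(msgChan))
}
// prints: dropped: 0 queued: 8
```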

Future Improvement Suggestion

Consider making the buffer size configurable via the events provider configuration:

events:
  providers:
    nats:
      - id: my-nats
        url: nats://localhost:4222
        channel_buffer_size: 1024
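If such an option were added, the adapter could read it with a fallback to the current default. The sketch below is hypothetical: NatsEventSource and ChannelBufferSize are illustrative names, not Cosmo's actual config types.

```go
package main

import "fmt"

// NatsEventSource is a hypothetical stand-in for the provider config struct;
// the real types live in the router config / provider_builder.go.
type NatsEventSource struct {
	ID                string
	URL               string
	ChannelBufferSize int // 0 means "not set, use the default"
}

const defaultChannelBufferSize = 1024

// bufferSize returns the configured capacity, falling back to the default.
func bufferSize(cfg NatsEventSource) int {
	if cfg.ChannelBufferSize > 0 {
		return cfg.ChannelBufferSize
	}
	return defaultChannelBufferSize
}

func main() {
	cfg := NatsEventSource{ID: "my-nats", URL: "nats://localhost:4222"}
	// In the adapter this would become: make(chan *nats.Msg, bufferSize(cfg))
	fmt.Println(bufferSize(cfg)) // prints: 1024
}
```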

The JetStream path in the same adapter (FetchNoWait(300) polling at line 136) does not suffer from this issue as it doesn't rely on channel-based delivery.

Checklist

  • Followed the coding standards of the project
  • Read the Contributors Guide
  • Tests or benchmarks added/updated
  • Documentation updated

Summary by CodeRabbit

  • Bug Fixes
    • Improved message handling reliability in the NATS pub/sub adapter to prevent potential message delivery delays under high-load conditions.

The EDFS NATS adapter uses an unbuffered channel (`make(chan *nats.Msg)`)
for ChanSubscribe. The NATS client performs a non-blocking send on this
channel — if the adapter goroutine is busy processing a previous message
(metrics, filter evaluation, SSE flush), incoming messages are silently
dropped as slow consumer events.

This is particularly impactful on high-throughput wildcard subscriptions
where the channel frequently has no receiver ready, causing significant
message loss (~60% observed in production-like conditions).

Buffer the channel with capacity 1024 to absorb bursts while the
goroutine processes messages sequentially. For reference, the NATS
client's own DefaultMaxChanLen is 64*1024 (65,536).
coderabbitai Bot commented Apr 13, 2026

Walkthrough

In ProviderAdapter.Subscribe for non-JetStream subscriptions, the internal NATS message channel was modified from an unbuffered channel to a buffered channel with capacity 1024, changing how messages queue during subscription handling.

Changes

  • NATS Channel Buffering (router/pkg/pubsub/nats/adapter.go): the internal message channel in ProviderAdapter.Subscribe changed from unbuffered (make(chan *nats.Msg)) to buffered with capacity 1024 (make(chan *nats.Msg, 1024)) for non-JetStream subscription handling.


coderabbitai Bot left a comment


🧹 Nitpick comments (1)
router/pkg/pubsub/nats/adapter.go (1)

168-168: Make the channel buffer size configurable rather than hardcoded.

Line 168 fixes the immediate drop issue, but a fixed 1024 can still hit slow-consumer drops under different traffic profiles. Please expose this as provider config (with 1024 as default) so operators can tune it without redeploying code; this matches the operational guidance already logged in router/pkg/pubsub/nats/provider_builder.go.


📥 Commits

Reviewing files that changed from the base of the PR and between bb537ba and 6580928.

📒 Files selected for processing (1)
  • router/pkg/pubsub/nats/adapter.go

@jensneuse
Member

Hey, you might or might not be right about the NATS client's non-blocking send behaviour, as I haven't checked, but let's assume you are. Sending on an unbuffered channel through a select with a default branch means we discard the message if there's no immediate read. A buffered channel doesn't solve that problem, it just moves it: messages are still discarded once the buffer is full. So if the goal is to not lose messages, which I think is a good goal, this change is not the solution. That said, this codebase will never allow such structural changes without proving them through tests. If you still want to fix this, please solve the core issue and prove it through tests.
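That point is easy to demonstrate: with the non-blocking send kept, a buffered channel only delays the drops until the buffer fills. An illustrative sketch (tiny capacity chosen for brevity, not router code):

```go
package main

import "fmt"

func main() {
	msgChan := make(chan int, 2) // small buffer to make the effect visible

	dropped := 0
	for i := 0; i < 5; i++ { // burst larger than the buffer, no consumer ready
		select {
		case msgChan <- i:
		default:
			dropped++ // once the buffer is full, messages are discarded again
		}
	}
	fmt.Println("dropped:", dropped)
}
// prints: dropped: 3
```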

To increase the probability that we can accept your implementation, I recommend that you create an appropriately sized RFC to get confirmation that your solution is right. Ideally you could propose multiple competing ideas, assuming that there's more than one way to solve this problem. If we can agree on a solution, the likelihood that we review and merge the code change increases dramatically.

With LLM coding it's now very cheap and easy to "fix a problem in the simplest possible way", but that doesn't mean it's the right way. Someone has to put in the mental work to think a problem through critically, and if we merge external contributions, maintaining the code becomes our responsibility and we have to live with all the consequences. We appreciate that you're trying to use Cosmo and are even interested in improving it, but we see it as our responsibility towards our customers to keep the bar high.

@github-actions

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions Bot added the Stale label Apr 30, 2026

3 participants