Add concurrency_limit middleware #447

Open
zdenek-crha wants to merge 1 commit into tower-rs:main from zdenek-crha:concurrency_limit_middleware
Conversation

@zdenek-crha

Motivation

The tower::limit::ConcurrencyLimit middleware does not work properly when used with services that return a response with a streaming body. The middleware counts a request as in flight only from the call to the service until the response future resolves.

But a response with a streaming body can't be considered finished until the body has been consumed.

Solution

Add a concurrency_limit module with a ConcurrencyLimit middleware implementation that holds on to the semaphore permit until the response and its body have been consumed.

It uses the original middleware from the tower crate and the tower-http::metrics::InFlightRequests middleware as inspiration.
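
To make the difference concrete, here is a minimal std-only sketch of the idea, not the code in this PR. The `Semaphore`, `Permit`, `PermitBody`, and `call` names are hypothetical, the semaphore is a toy non-blocking one, and an iterator stands in for a streaming body. The point is only that the permit travels inside the body, so it is released when the body is consumed or dropped rather than when the response is produced.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Toy counting semaphore (hypothetical; the real middleware would use an async one).
#[derive(Clone)]
struct Semaphore {
    available: Arc<AtomicUsize>,
}

/// RAII permit: gives its slot back to the semaphore when dropped.
struct Permit {
    available: Arc<AtomicUsize>,
}

impl Semaphore {
    fn new(limit: usize) -> Self {
        Self { available: Arc::new(AtomicUsize::new(limit)) }
    }

    /// Non-blocking acquire; `None` means the limit has been reached.
    fn try_acquire(&self) -> Option<Permit> {
        self.available
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| n.checked_sub(1))
            .ok()
            .map(|_| Permit { available: Arc::clone(&self.available) })
    }

    fn available(&self) -> usize {
        self.available.load(Ordering::Acquire)
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.available.fetch_add(1, Ordering::AcqRel);
    }
}

/// Stand-in for a streaming response body that owns the permit.
struct PermitBody {
    chunks: std::vec::IntoIter<&'static str>,
    _permit: Permit,
}

impl Iterator for PermitBody {
    type Item = &'static str;

    fn next(&mut self) -> Option<Self::Item> {
        self.chunks.next()
    }
}

/// Hypothetical handler: acquires a permit and moves it into the body.
fn call(sem: &Semaphore) -> Option<PermitBody> {
    let permit = sem.try_acquire()?;
    Some(PermitBody { chunks: vec!["a", "b"].into_iter(), _permit: permit })
}

fn main() {
    let sem = Semaphore::new(1);
    let body = call(&sem).expect("first request gets the permit");

    // The response exists, but its body has not been consumed yet,
    // so the slot is still taken.
    assert_eq!(sem.available(), 0);
    assert!(call(&sem).is_none());

    // Consuming the body drops it, which drops the permit.
    let collected: Vec<_> = body.collect();
    assert_eq!(collected, ["a", "b"]);
    assert_eq!(sem.available(), 1);
}
```

A real implementation would wait for a permit instead of rejecting the call; the rejection here just keeps the sketch synchronous.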

Questions/Considerations

As far as module organization goes, it would be nicer to add this as tower-http::limit::concurrency, similar to the module layout in the tower crate. But the tower-http::limit module is now fully dedicated to the RequestBody middleware, and moving it into a sub-module would probably be a breaking change.

I considered leaving the RequestBody middleware where it is and just adding a tower-http::limit::concurrency sub-module, but it felt confusing. Adding another top-level module felt like the cleanest alternative I had.

@zdenek-crha zdenek-crha force-pushed the concurrency_limit_middleware branch from 3d91fa0 to 1524515 Compare December 10, 2023 12:13

@jlizen jlizen left a comment

Member

@zdenek-crha this is great! Sorry it took so long to get a review.

I'm on board with the approach but had some small tweaks.

Let me know if you no longer have time to work on this, no problem, I don't mind adding a revision on top.

@@ -0,0 +1,312 @@
//! Limit the max number of concurrently processed requests.

Member

(totally optional): might be nice to mention that this works well with the new OnEarlyDrop layer in case the request is waiting for a while and the client gives up.

Comment thread tower-http/src/lib.rs
#[cfg(feature = "follow-redirect")]
pub mod follow_redirect;

#[cfg(feature = "limit")]

Member

Is there a reason we couldn't locate this as a submodule within limit and just re-export its types? We also could reorganize the existing code using the #[path] annotation without breaking changes.

Author

I can do the work, no problem, but I would appreciate guidance on which shape to steer the code toward. My approach would probably be:

  • move docs and private modules from limit.rs to public limit/body_limit.rs
  • keep current re-exports in limit.rs for backward compatibility
  • have same re-exports in limit/body_limit.rs to be consistent with to-be-added module

Then, in a second commit:

  • add limit/concurrency_limit.rs
  • mimic the limit/body_limit.rs structure with private modules and re-exports for consistency

I like limit/body.rs and limit/concurrency.rs better because they are shorter and don't duplicate the 'limit' word in path. But having two, nested body modules is not nice, so would probably go with the longer variant.

With the original re-exports remaining in limit.rs, there should be no breaking change (but I may be missing something).

I have not used the #[path] attribute before. I've read the docs, but I'm not sure how it could be used here. (but I'm willing to learn :-D)
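
For illustration, the re-export plan above could look roughly like the following sketch. It is collapsed into inline modules so that it compiles as a single file, and the types are stubs that only borrow plausible names; none of it is the actual tower-http code. With separate files, the same tree could presumably be wired up with something like `#[path = "limit/body_limit.rs"] pub mod body_limit;`, but that is an assumption about what the reviewer has in mind.

```rust
// Hypothetical sketch: stub types stand in for the real middleware.
pub mod limit {
    /// Existing request-body limit code, moved into a public submodule.
    pub mod body_limit {
        #[derive(Debug, Clone, Copy, PartialEq)]
        pub struct RequestBodyLimitLayer {
            limit: usize,
        }

        impl RequestBodyLimitLayer {
            pub fn new(limit: usize) -> Self {
                Self { limit }
            }

            pub fn limit(&self) -> usize {
                self.limit
            }
        }
    }

    /// New module that would be added in the second commit.
    pub mod concurrency_limit {
        #[derive(Debug, Clone, Copy, PartialEq)]
        pub struct ConcurrencyLimitLayer {
            max: usize,
        }

        impl ConcurrencyLimitLayer {
            pub fn new(max: usize) -> Self {
                Self { max }
            }

            pub fn max(&self) -> usize {
                self.max
            }
        }
    }

    // Re-export at the old location so existing
    // `limit::RequestBodyLimitLayer` imports keep compiling.
    pub use self::body_limit::RequestBodyLimitLayer;
}

fn main() {
    // Old path, as used by existing consumers.
    let old_path = limit::RequestBodyLimitLayer::new(1024);

    // The new path names the very same type, so the value can be
    // assigned without any conversion.
    let new_path: limit::body_limit::RequestBodyLimitLayer = old_path;
    assert_eq!(new_path.limit(), 1024);

    let concurrency = limit::concurrency_limit::ConcurrencyLimitLayer::new(8);
    assert_eq!(concurrency.max(), 8);
}
```

Because `pub use` re-exports the same item rather than a copy, both paths resolve to one type, which is what keeps the move invisible to consumers.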

Member

Seems fine to me.

move docs and private modules from limit.rs to public limit/body_limit.rs

Specifically we should maintain the same visibility of types from a consumer perspective.

I like limit/body.rs and limit/concurrency.rs better because they are shorter and don't duplicate the 'limit' word in path. But having two, nested body modules is not nice, so would probably go with the longer variant.

No strong opinion here.

//!
//! The `tower::limit::concurrency::ConcurrencyLimit` service uses a different definition of
//! 'request processing'. It starts when request is received by `tower::Service::call`, and ends
//! immediatelly after response is produced.

Member

Suggested change
- //! immediatelly after response is produced.
+ //! immediately after response is produced.

//! In some cases it may not work properly with [`http::Response`], as it does not account for
//! process of consuming response body.
//!
//! When stream is used as response body, the process of consumig it (ie streaming to called) may

Member

Suggested change
- //! When stream is used as response body, the process of consumig it (ie streaming to called) may
+ //! When stream is used as response body, the process of consuming it (ie streaming to caller) may

//! process of consuming response body.
//!
//! When stream is used as response body, the process of consumig it (ie streaming to called) may
//! take longer and use more resources than just producing the response itself. And often it the

Member

Suggested change
- //! take longer and use more resources than just producing the response itself. And often it the
+ //! take longer and use more resources than just producing the response itself. And, often, it is the

//!
//! When stream is used as response body, the process of consumig it (ie streaming to called) may
//! take longer and use more resources than just producing the response itself. And often it the
//! number of streams we are processing concurrently we want to limit.

Member

Suggested change
- //! number of streams we are processing concurrently we want to limit.
+ //! number of streams we are processing concurrently that we want to limit.

}
}

/// Middleware that limits max number fo concurrent in-flight requests.

Member

Suggested change
- /// Middleware that limits max number fo concurrent in-flight requests.
+ /// Middleware that limits max number of concurrent in-flight requests.

}

impl SharedConcurrencyLimitLayer {
/// Create new [`ConcurrencyLimitLayer`] with shared semaphore

Member

Suggested change
- /// Create new [`ConcurrencyLimitLayer`] with shared semaphore
+ /// Create new [`SharedConcurrencyLimitLayer`] with a shared semaphore.

}
}

#[cfg(test)]

Member

We should have tests for:

  • ConcurrencyLimitLayer (on its own), probably including probing the unique semaphore management per cloned layer
  • Probe the actual concurrency limiting behavior using, e.g., 2 requests but a limit of 1
  • Request that returns an error, validate that the permit is still released
  • Body drop without consumption
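
As a rough outline of what the last three cases check, here is a std-only sketch against a toy, single-threaded limiter. `Limiter`, `Permit`, `Body`, `CallError`, and `call` are all hypothetical names, and the limiter rejects instead of waiting; real tests would drive the actual service and its async semaphore.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Toy single-threaded limiter (hypothetical stand-in for the middleware's semaphore).
#[derive(Clone)]
struct Limiter {
    free: Rc<Cell<usize>>,
}

/// RAII permit: frees its slot when dropped.
struct Permit {
    free: Rc<Cell<usize>>,
}

impl Limiter {
    fn new(limit: usize) -> Self {
        Self { free: Rc::new(Cell::new(limit)) }
    }

    fn try_acquire(&self) -> Option<Permit> {
        let n = self.free.get();
        if n == 0 {
            return None;
        }
        self.free.set(n - 1);
        Some(Permit { free: Rc::clone(&self.free) })
    }

    fn free(&self) -> usize {
        self.free.get()
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.free.set(self.free.get() + 1);
    }
}

/// Response body that keeps the permit alive until it is dropped.
struct Body {
    _permit: Permit,
}

#[derive(Debug)]
enum CallError {
    Overloaded,
    Inner,
}

/// Hypothetical service call; `fail` simulates an error from the inner service.
fn call(limiter: &Limiter, fail: bool) -> Result<Body, CallError> {
    let permit = limiter.try_acquire().ok_or(CallError::Overloaded)?;
    if fail {
        // `permit` is dropped on this early return, releasing the slot.
        return Err(CallError::Inner);
    }
    Ok(Body { _permit: permit })
}

fn main() {
    let limiter = Limiter::new(1);

    // Two requests with a limit of 1: the second is not admitted.
    let first = call(&limiter, false).expect("first request is admitted");
    assert!(matches!(call(&limiter, false), Err(CallError::Overloaded)));

    // Body dropped without being consumed: the permit is released.
    drop(first);
    assert_eq!(limiter.free(), 1);

    // Request that returns an error: the permit is still released.
    assert!(matches!(call(&limiter, true), Err(CallError::Inner)));
    assert_eq!(limiter.free(), 1);
}
```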

@zdenek-crha

zdenek-crha commented May 22, 2026

Author

@jlizen Wow, I had completely forgotten I authored this one :-)

I will be able to find some time to push it forward next week.

I don't use GitHub PRs much. Do you prefer another commit with fixes on top, or folding the fixes into the existing commit and force-pushing?

Since the PR is quite old, maybe even rebase and re-do it on current master?

@jlizen

jlizen commented May 22, 2026

Member

Any of them is fine with me. If it were me, I would probably start fresh from main, and manually pull the changes back over, then force push on top. But whatever feels least annoying.
