Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/brpc/event_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
#include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32
#include "bvar/latency_recorder.h" // bvar::LatencyRecorder
#include "bthread/bthread.h" // bthread_start_background
#include "bthread/task_group.h" // TaskGroup::address_meta
#include "brpc/event_dispatcher.h"

DECLARE_int32(task_group_ntags);

namespace brpc {
DECLARE_int32(event_dispatcher_num);

DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
namespace brpc {
DEFINE_bool(event_dispatcher_edisp_unsched, false,
"Disable event dispatcher schedule");

Expand Down Expand Up @@ -66,6 +67,7 @@ void InitializeGlobalDispatchers() {
bthread_attr_t attr =
FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
g_edisp[i * FLAGS_event_dispatcher_num + j].set_priority_index(j);
CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr));
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/brpc/event_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ template <typename T> friend class IOEvent;
// Stop bthread of this dispatcher.
void Stop();

// Set the priority index of this dispatcher. The stored value defaults
// to -1; RunThis() copies a non-negative value into the TaskMeta of the
// bthread running this dispatcher before entering Run().
void set_priority_index(int idx) {
    _priority_index = idx;
}

// Suspend calling thread until bthread of this dispatcher stops.
void Join();

Expand Down Expand Up @@ -188,6 +190,8 @@ template <typename T> friend class IOEvent;

// Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
int _wakeup_fds[2];

int _priority_index{-1};
};

EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);
Expand Down
8 changes: 7 additions & 1 deletion src/brpc/event_dispatcher_epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,13 @@ int EventDispatcher::RemoveConsumer(int fd) {
}

void* EventDispatcher::RunThis(void* arg) {
((EventDispatcher*)arg)->Run();
EventDispatcher* ed = (EventDispatcher*)arg;
if (ed->_priority_index >= 0) {
bthread::TaskMeta* meta =
bthread::TaskGroup::address_meta(bthread_self());
meta->priority_index = ed->_priority_index;
}
ed->Run();
return NULL;
}

Expand Down
40 changes: 32 additions & 8 deletions src/bthread/task_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ DEFINE_string(cpu_set, "",
"Set of CPUs to which cores are bound. "
"for example, 0-3,5,7; default: disable");

DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");

namespace bthread {

DEFINE_bool(parking_lot_no_signal_when_no_waiter, false,
Expand Down Expand Up @@ -205,11 +207,28 @@ TaskControl::TaskControl()
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
, _enable_priority_queue(FLAGS_enable_bthread_priority_queue)
, _priority_queues(FLAGS_task_group_ntags)
, _pq_num_of_each_tag(FLAGS_event_dispatcher_num)
, _priority_queues(FLAGS_task_group_ntags * FLAGS_event_dispatcher_num)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If priority_queues only applies to EventDispatcher, then does that mean we only need one slot per EventDispatcher, and not a queue per EventDispatcher?

@yannan-wyn yannan-wyn Jun 8, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the per-ED WSQ may be better than an atomic slot here.

Currently, each per-ED WSQ fits the one-producer, N-consumer (1:N) model.
In addition, WSQ has a cheaper steal path when the queue is empty: WSQ::steal() only loads _top and _bottom and returns false:

 t = _top.load(acquire);
  b = _bottom.load(acquire);
  if (t >= b) {
      return false;
  }

whereas an atomic slot may need to do

  slot.exchange(INVALID_BTHREAD)

An atomic-slot implementation can be more intuitive, but if workers scan empty slots, it may cause more cache-line invalidations.

Alternatively, I could add comments documenting this trade-off. What do you think?
@chenBright

@yannan-wyn yannan-wyn Jun 8, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If priority_queues only applies to EventDispatcher, then does that mean we only need one slot per EventDispatcher, and not a queue per EventDispatcher?

I also ran an end-to-end (E2E) benchmark to show the difference:

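| Percentile | PQ OFF | per-ED WSQ | atomic slot |
|------------|--------|------------|-------------|
| avg        | 55μs   | 47μs       | 48μs        |
| p50        | 53μs   | 47μs       | 46μs        |
| p90        | 63μs   | 53μs       | 56μs        |
| p99        | 77μs   | 70μs       | 79μs        |
| p99.9      | 96μs   | 96μs       | 598μs       |
| p99.99     | 345μs  | 345μs      | 1687μs      |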

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And WSQ has a cheaper empty steal path. When the queue is empty, WSQ::steal() only loads _top and _bottom and returns false:

 t = _top.load(acquire);
  b = _bottom.load(acquire);
  if (t >= b) {
      return false;
  }

but atomic slot may

  slot.exchange(INVALID_BTHREAD)

The equivalent operation for slot should be slot.load(), right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps renaming priority_queues to event_dispatcher_priority_queues would be more appropriate.

, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
, _tagged_pl(FLAGS_task_group_ntags)
{}

int TaskControl::init_priority_queues() {
if (!_enable_priority_queue) {
return 0;
}
for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
for (int j = 0; j < _pq_num_of_each_tag; ++j) {
if (priority_queue(i, j).init(BTHREAD_MAX_CONCURRENCY) != 0) {
LOG(ERROR) << "Fail to init priority queue for tag=" << i
<< " ed=" << j;
return -1;
}
}
}
return 0;
}

int TaskControl::init(int concurrency) {
if (_concurrency != 0) {
LOG(ERROR) << "Already initialized";
Expand Down Expand Up @@ -238,10 +257,10 @@ int TaskControl::init(int concurrency) {
_tagged_worker_usage_second.push_back(new bvar::PerSecond<bvar::PassiveStatus<double>>(
"bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1));
_tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count", tag_str));
if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) {
LOG(ERROR) << "Fail to init _priority_q";
return -1;
}
}

if (init_priority_queues() != 0) {
return -1;
}

// Make sure TimerThread is ready.
Expand Down Expand Up @@ -445,7 +464,7 @@ TaskControl::~TaskControl() {
_switch_per_second.hide();
_signal_per_second.hide();
_status.hide();

stop_and_join();
}

Expand Down Expand Up @@ -528,8 +547,12 @@ int TaskControl::_destroy_group(TaskGroup* g) {
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
auto tag = tls_task_group->tag();

if (_priority_queues[tag].steal(tid)) {
return true;
if (_enable_priority_queue) {
for (int i = 0; i < _pq_num_of_each_tag; ++i) {
if (priority_queue(tag, i).steal(tid)) {
return true;
}
}
}

// 1: Acquiring fence is paired with releasing fence in _add_group to
Expand Down Expand Up @@ -689,4 +712,5 @@ std::vector<bthread_t> TaskControl::get_living_bthreads() {
return living_bthread_ids;
}


} // namespace bthread
13 changes: 11 additions & 2 deletions src/bthread/task_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,12 @@ friend bthread_t init_for_pthread_stack_trace();
std::string stack_trace(bthread_t tid);
#endif // BRPC_BTHREAD_TRACER

void push_priority_queue(bthread_tag_t tag, bthread_t tid) {
_priority_queues[tag].push(tid);
void push_priority_queue(bthread_tag_t tag, int priority_index, bthread_t tid) {
priority_queue(tag, priority_index).push(tid);
}

std::vector<bthread_t> get_living_bthreads();

private:
typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
typedef std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT> TaggedParkingLot;
Expand All @@ -123,6 +124,13 @@ friend bthread_t init_for_pthread_stack_trace();
// Tag parking slot
TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; }

// Returns the priority queue at position `index' within `tag'.
// Queues are stored flat in _priority_queues, with _pq_num_of_each_tag
// consecutive entries per tag. The subscript is not range-checked here,
// so callers must pass a valid tag and an index in
// [0, _pq_num_of_each_tag).
WorkStealingQueue<bthread_t>& priority_queue(bthread_tag_t tag, int index) {
    const auto slot = tag * _pq_num_of_each_tag + index;
    return _priority_queues[slot];
}

int init_priority_queues();

static void delete_task_group(void* arg);

static void* worker_thread(void* task_control);
Expand Down Expand Up @@ -164,6 +172,7 @@ friend bthread_t init_for_pthread_stack_trace();
std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;

bool _enable_priority_queue;
int _pq_num_of_each_tag;
std::vector<WorkStealingQueue<bthread_t>> _priority_queues;

size_t _pl_num_of_each_tag;
Expand Down
9 changes: 8 additions & 1 deletion src/bthread/task_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
m->tid = make_tid(*m->version_butex, slot);
m->priority_index = pg ? (*pg)->_cur_meta->priority_index : -1;
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
Expand Down Expand Up @@ -595,6 +596,7 @@ int TaskGroup::start_background(bthread_t* __restrict th,
m->cpuwide_start_ns = start_ns;
m->stat = EMPTY_STAT;
m->tid = make_tid(*m->version_butex, slot);
m->priority_index = _cur_meta->priority_index;
*th = m->tid;
if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) {
LOG(INFO) << "Started bthread " << m->tid;
Expand Down Expand Up @@ -674,6 +676,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) {
TaskGroup* g = *pg;
bthread_t next_tid = 0;
// Find next task to run, if none, switch to idle thread of the group.

#ifndef BTHREAD_FAIR_WSQ
// When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of
// WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9%
Expand Down Expand Up @@ -942,7 +945,11 @@ void TaskGroup::priority_to_run(void* args_in) {
tls_task_group->_control->_task_tracer.set_status(
TASK_STATUS_READY, args->meta);
#endif // BRPC_BTHREAD_TRACER
return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid);
if (args->meta->priority_index < 0) {
return tls_task_group->push_rq(args->meta->tid);
}
return tls_task_group->control()->push_priority_queue(
args->tag, args->meta->priority_index, args->meta->tid);
}

struct SleepArgs {
Expand Down
2 changes: 2 additions & 0 deletions src/bthread/task_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ struct TaskMeta {
// simplified if they can get tid from TaskMeta.
bthread_t tid{INVALID_BTHREAD};

int priority_index{-1};

// User function and argument
void* (*fn)(void*){NULL};
void* arg{NULL};
Expand Down
Loading
Loading