diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp index d4316beff2..15152fc8f7 100644 --- a/src/brpc/event_dispatcher.cpp +++ b/src/brpc/event_dispatcher.cpp @@ -23,13 +23,14 @@ #include "butil/third_party/murmurhash3/murmurhash3.h"// fmix32 #include "bvar/latency_recorder.h" // bvar::LatencyRecorder #include "bthread/bthread.h" // bthread_start_background +#include "bthread/task_group.h" // TaskGroup::address_meta #include "brpc/event_dispatcher.h" DECLARE_int32(task_group_ntags); -namespace brpc { +DECLARE_int32(event_dispatcher_num); -DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher"); +namespace brpc { DEFINE_bool(event_dispatcher_edisp_unsched, false, "Disable event dispatcher schedule"); @@ -66,6 +67,7 @@ void InitializeGlobalDispatchers() { bthread_attr_t attr = FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL; attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags; + g_edisp[i * FLAGS_event_dispatcher_num + j].set_priority_index(j); CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr)); } } diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h index 3fdc9f17b9..d95243ac2d 100644 --- a/src/brpc/event_dispatcher.h +++ b/src/brpc/event_dispatcher.h @@ -114,6 +114,8 @@ template friend class IOEvent; // Stop bthread of this dispatcher. void Stop(); + void set_priority_index(int idx) { _priority_index = idx; } + // Suspend calling thread until bthread of this dispatcher stops. void Join(); @@ -188,6 +190,8 @@ template friend class IOEvent; // Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit int _wakeup_fds[2]; + + int _priority_index{-1}; }; EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag); diff --git a/src/brpc/event_dispatcher_epoll.cpp b/src/brpc/event_dispatcher_epoll.cpp index 5a6c23b0e5..31f60aace3 100644 --- a/src/brpc/event_dispatcher_epoll.cpp +++ b/src/brpc/event_dispatcher_epoll.cpp @@ -190,7 +190,13 @@ int EventDispatcher::RemoveConsumer(int fd) { } void* EventDispatcher::RunThis(void* arg) { - ((EventDispatcher*)arg)->Run(); + EventDispatcher* ed = (EventDispatcher*)arg; + if (ed->_priority_index >= 0) { + bthread::TaskMeta* meta = + bthread::TaskGroup::address_meta(bthread_self()); + meta->priority_index = ed->_priority_index; + } + ed->Run(); return NULL; } diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index ba067e3976..347dbd24b4 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -51,6 +51,8 @@ DEFINE_string(cpu_set, "", "Set of CPUs to which cores are bound. " "for example, 0-3,5,7; default: disable"); +DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher"); + namespace bthread { DEFINE_bool(parking_lot_no_signal_when_no_waiter, false, @@ -205,11 +207,31 @@ TaskControl::TaskControl() , _status(print_rq_sizes_in_the_tc, this) , _nbthreads("bthread_count") , _enable_priority_queue(FLAGS_enable_bthread_priority_queue) - , _priority_queues(FLAGS_task_group_ntags) + , _ed_priority_queue_num_of_each_tag(FLAGS_event_dispatcher_num) + , _ed_priority_queues( + FLAGS_task_group_ntags * FLAGS_event_dispatcher_num) , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag) , _tagged_pl(FLAGS_task_group_ntags) {} +int TaskControl::init_ed_priority_queues() { + if (!_enable_priority_queue) { + return 0; + } + for (int i = 0; i < FLAGS_task_group_ntags; ++i) { + for (int j = 0; + j < _ed_priority_queue_num_of_each_tag; ++j) { + if (ed_priority_queue(i, j).init( + BTHREAD_MAX_CONCURRENCY) != 0) { + LOG(ERROR) << "Fail to init priority queue for tag=" << i + << " ed=" << j; + return -1; + } + } + } + return 0; +} + int TaskControl::init(int concurrency) { if (_concurrency != 0) { LOG(ERROR) << "Already initialized"; @@ -238,10 +260,10 @@ int TaskControl::init(int concurrency) { _tagged_worker_usage_second.push_back(new bvar::PerSecond>( "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i], 1)); _tagged_nbthreads.push_back(new bvar::Adder("bthread_count", tag_str)); - if (_priority_queues[i].init(BTHREAD_MAX_CONCURRENCY) != 0) { - LOG(ERROR) << "Fail to init _priority_q"; - return -1; - } + } + + if (init_ed_priority_queues() != 0) { + return -1; } // Make sure TimerThread is ready. @@ -445,7 +467,7 @@ TaskControl::~TaskControl() { _switch_per_second.hide(); _signal_per_second.hide(); _status.hide(); - + stop_and_join(); } @@ -528,8 +550,13 @@ int TaskControl::_destroy_group(TaskGroup* g) { bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) { auto tag = tls_task_group->tag(); - if (_priority_queues[tag].steal(tid)) { - return true; + if (_enable_priority_queue) { + for (int i = 0; + i < _ed_priority_queue_num_of_each_tag; ++i) { + if (ed_priority_queue(tag, i).steal(tid)) { + return true; + } + } } // 1: Acquiring fence is paired with releasing fence in _add_group to @@ -689,4 +716,5 @@ std::vector TaskControl::get_living_bthreads() { return living_bthread_ids; } + } // namespace bthread diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 4480daa677..93336199b7 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -101,11 +101,13 @@ friend bthread_t init_for_pthread_stack_trace(); std::string stack_trace(bthread_t tid); #endif // BRPC_BTHREAD_TRACER - void push_priority_queue(bthread_tag_t tag, bthread_t tid) { - _priority_queues[tag].push(tid); + void push_ed_priority_queue( + bthread_tag_t tag, int priority_index, bthread_t tid) { + ed_priority_queue(tag, priority_index).push(tid); } std::vector get_living_bthreads(); + private: typedef std::array TaggedGroups; typedef std::array TaggedParkingLot; @@ -123,6 +125,15 @@ friend bthread_t init_for_pthread_stack_trace(); // Tag parking slot TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _tagged_pl[tag]; } + // Priority queue for a specific ED within a tag + WorkStealingQueue& ed_priority_queue( + bthread_tag_t tag, int index) { + return _ed_priority_queues[ + tag * _ed_priority_queue_num_of_each_tag + index]; + } + + int init_ed_priority_queues(); + static void delete_task_group(void* arg); static void* worker_thread(void* task_control); @@ -164,7 +175,8 @@ friend bthread_t init_for_pthread_stack_trace(); std::vector*> _tagged_nbthreads; bool _enable_priority_queue; - std::vector> _priority_queues; + int _ed_priority_queue_num_of_each_tag; + std::vector> _ed_priority_queues; size_t _pl_num_of_each_tag; std::vector _tagged_pl; diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp index 4706b7f77e..c0804c9a63 100644 --- a/src/bthread/task_group.cpp +++ b/src/bthread/task_group.cpp @@ -526,6 +526,7 @@ int TaskGroup::start_foreground(TaskGroup** pg, m->cpuwide_start_ns = start_ns; m->stat = EMPTY_STAT; m->tid = make_tid(*m->version_butex, slot); + m->priority_index = pg ? (*pg)->_cur_meta->priority_index : -1; *th = m->tid; if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) { LOG(INFO) << "Started bthread " << m->tid; @@ -595,6 +596,7 @@ int TaskGroup::start_background(bthread_t* __restrict th, m->cpuwide_start_ns = start_ns; m->stat = EMPTY_STAT; m->tid = make_tid(*m->version_butex, slot); + m->priority_index = _cur_meta->priority_index; *th = m->tid; if (using_attr.flags & BTHREAD_LOG_START_AND_FINISH) { LOG(INFO) << "Started bthread " << m->tid; @@ -674,6 +676,7 @@ void TaskGroup::ending_sched(TaskGroup** pg) { TaskGroup* g = *pg; bthread_t next_tid = 0; // Find next task to run, if none, switch to idle thread of the group. + #ifndef BTHREAD_FAIR_WSQ // When BTHREAD_FAIR_WSQ is defined, profiling shows that cpu cost of // WSQ::steal() in example/multi_threaded_echo_c++ changes from 1.9% @@ -942,7 +945,11 @@ void TaskGroup::priority_to_run(void* args_in) { tls_task_group->_control->_task_tracer.set_status( TASK_STATUS_READY, args->meta); #endif // BRPC_BTHREAD_TRACER - return tls_task_group->control()->push_priority_queue(args->tag, args->meta->tid); + if (args->meta->priority_index < 0) { + return tls_task_group->push_rq(args->meta->tid); + } + return tls_task_group->control()->push_ed_priority_queue( + args->tag, args->meta->priority_index, args->meta->tid); } struct SleepArgs { diff --git a/src/bthread/task_meta.h b/src/bthread/task_meta.h index a2490b4553..074af430df 100644 --- a/src/bthread/task_meta.h +++ b/src/bthread/task_meta.h @@ -91,6 +91,8 @@ struct TaskMeta { // simplified if they can get tid from TaskMeta. bthread_t tid{INVALID_BTHREAD}; + int priority_index{-1}; + // User function and argument void* (*fn)(void*){NULL}; void* arg{NULL}; diff --git a/test/bthread_priority_queue_unittest.cpp b/test/bthread_priority_queue_unittest.cpp new file mode 100644 index 0000000000..d6c9e43a98 --- /dev/null +++ b/test/bthread_priority_queue_unittest.cpp @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include "bthread/bthread.h" +#include "bthread/task_group.h" + +namespace { + +// Counter incremented by priority bthreads to verify execution +std::atomic g_priority_count(0); +// Mutex + set for collecting executed tids to verify no loss +std::mutex g_tid_mutex; +std::set g_executed_ids; + +void reset_globals() { + g_priority_count.store(0); + std::lock_guard lk(g_tid_mutex); + g_executed_ids.clear(); +} + +struct TaskArg { + int id; +}; + +void* priority_task_fn(void* arg) { + TaskArg* ta = static_cast(arg); + g_priority_count.fetch_add(1, std::memory_order_relaxed); + { + std::lock_guard lk(g_tid_mutex); + g_executed_ids.insert(ta->id); + } + delete ta; + return NULL; +} + +void* normal_task_fn(void* /*arg*/) { + // Just a normal task that does nothing, used as a filler + bthread_usleep(1000); + return NULL; +} + +class PriorityQueueTest : public ::testing::Test { +protected: + static void SetUpTestSuite() { + google::SetCommandLineOption("enable_bthread_priority_queue", "true"); + google::SetCommandLineOption("event_dispatcher_num", "4"); + } + void SetUp() override { + reset_globals(); + } +}; + +// Test 1: End-to-end priority task submission and execution. +// Multiple producers submit priority tasks, verify all tasks are executed. +TEST_F(PriorityQueueTest, e2e_priority_tasks_all_executed) { + const int N = 200; + + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector tids(N); + for (int i = 0; i < N; ++i) { + TaskArg* arg = new TaskArg{i}; + ASSERT_EQ(0, bthread_start_background(&tids[i], &attr, + priority_task_fn, arg)); + } + + for (int i = 0; i < N; ++i) { + bthread_join(tids[i], NULL); + } + + ASSERT_EQ(N, g_priority_count.load()); + std::lock_guard lk(g_tid_mutex); + ASSERT_EQ((size_t)N, g_executed_ids.size()); + for (int i = 0; i < N; ++i) { + ASSERT_TRUE(g_executed_ids.count(i)) << "Missing task id=" << i; + } +} + +// Test 2: Mix of priority and normal tasks, all complete correctly. +TEST_F(PriorityQueueTest, mixed_priority_and_normal_tasks) { + const int N_PRIORITY = 100; + const int N_NORMAL = 100; + + bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL; + priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector tids; + tids.reserve(N_PRIORITY + N_NORMAL); + + for (int i = 0; i < N_PRIORITY + N_NORMAL; ++i) { + bthread_t tid; + if (i % 2 == 0 && (i / 2) < N_PRIORITY) { + TaskArg* arg = new TaskArg{i / 2}; + ASSERT_EQ(0, bthread_start_background(&tid, &priority_attr, + priority_task_fn, arg)); + } else { + ASSERT_EQ(0, bthread_start_background(&tid, NULL, + normal_task_fn, NULL)); + } + tids.push_back(tid); + } + + for (auto tid : tids) { + bthread_join(tid, NULL); + } + + ASSERT_EQ(N_PRIORITY, g_priority_count.load()); +} + +// Test 3: start_foreground (bthread_start_urgent) with GLOBAL_PRIORITY. +// Simulates ED calling StartInputEvent: ED bthread calls start_urgent, +// gets preempted into PQ via priority_to_run, child runs and ends, +// ending_sched steals ED from PQ to resume. +TEST_F(PriorityQueueTest, start_foreground_priority_to_run) { + const int N = 200; + + struct EDSimArg { + int n_tasks; + }; + EDSimArg ed_arg{N}; + + auto ed_fn = [](void* arg) -> void* { + EDSimArg* ea = static_cast(arg); + bthread::TaskMeta* meta = + bthread::TaskGroup::address_meta(bthread_self()); + meta->priority_index = 0; + + for (int i = 0; i < ea->n_tasks; ++i) { + TaskArg* ta = new TaskArg{i}; + bthread_t child; + bthread_start_urgent(&child, NULL, priority_task_fn, ta); + } + return NULL; + }; + + bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL; + priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + bthread_t ed_tid; + ASSERT_EQ(0, bthread_start_background(&ed_tid, &priority_attr, + ed_fn, &ed_arg)); + bthread_join(ed_tid, NULL); + + ASSERT_EQ(N, g_priority_count.load()); + std::lock_guard lk(g_tid_mutex); + ASSERT_EQ((size_t)N, g_executed_ids.size()); +} + +// Test 4: Multiple ED-like bthreads concurrently calling start_urgent. +// Verifies PQ correctness under concurrent preemption from multiple EDs. +TEST_F(PriorityQueueTest, multiple_eds_concurrent_preempt) { + const int NUM_EDS = 4; + const int TASKS_PER_ED = 50; + const int TOTAL = NUM_EDS * TASKS_PER_ED; + std::atomic resume_count(0); + + struct EDArg { + int ed_index; + int n_children; + std::atomic* resume_count; + }; + + auto ed_fn = [](void* arg) -> void* { + EDArg* ea = static_cast(arg); + bthread::TaskMeta* meta = + bthread::TaskGroup::address_meta(bthread_self()); + meta->priority_index = ea->ed_index; + + std::vector children; + children.reserve(ea->n_children); + for (int i = 0; i < ea->n_children; ++i) { + int id = ea->ed_index * ea->n_children + i; + TaskArg* ta = new TaskArg{id}; + bthread_t child; + bthread_start_urgent(&child, NULL, priority_task_fn, ta); + children.push_back(child); + ea->resume_count->fetch_add(1, std::memory_order_relaxed); + } + for (auto c : children) { + bthread_join(c, NULL); + } + return NULL; + }; + + bthread_attr_t priority_attr = BTHREAD_ATTR_NORMAL; + priority_attr.flags |= BTHREAD_GLOBAL_PRIORITY; + + std::vector ed_args(NUM_EDS); + std::vector ed_tids(NUM_EDS); + for (int i = 0; i < NUM_EDS; ++i) { + ed_args[i] = {i, TASKS_PER_ED, &resume_count}; + ASSERT_EQ(0, bthread_start_background(&ed_tids[i], &priority_attr, + ed_fn, &ed_args[i])); + } + + for (int i = 0; i < NUM_EDS; ++i) { + bthread_join(ed_tids[i], NULL); + } + + ASSERT_EQ(TOTAL, g_priority_count.load()); + ASSERT_EQ(TOTAL, resume_count.load()); + std::lock_guard lk(g_tid_mutex); + ASSERT_EQ((size_t)TOTAL, g_executed_ids.size()); +} + +} // namespace