diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp index ba067e3976..56773b8b4c 100644 --- a/src/bthread/task_control.cpp +++ b/src/bthread/task_control.cpp @@ -48,8 +48,13 @@ DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags"); DEFINE_bool(task_group_set_worker_name, true, "Whether to set the name of the worker thread"); DEFINE_string(cpu_set, "", - "Set of CPUs to which cores are bound. " - "for example, 0-3,5,7; default: disable"); + "Set of CPUs to which worker threads are bound. " + "Two formats are supported:\n" + " Legacy (bind all tags to one set): \"0-3,5,7\"\n" + " Per-tag: \"0:0-3,5,7;1:6-9,4\" " + "where the number before ':' is the bthread_tag and the part " + "after ':' is a CPU list in the same format as the legacy value. " + "Tags not mentioned get no CPU binding. Default: disable."); namespace bthread { @@ -82,7 +87,7 @@ void run_tagged_worker_startfn(bthread_tag_t tag) { struct WorkerThreadArgs { WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {} - TaskControl* c; + TaskControl* c; bthread_tag_t tag; }; @@ -106,15 +111,17 @@ void* TaskControl::worker_thread(void* arg) { } g->_tid = pthread_self(); - - int worker_id = c->_next_worker_id.fetch_add( - 1, butil::memory_order_relaxed); - if (!c->_cpus.empty()) { - bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]); + // tag_wid is a per-tag monotonic counter: same-tag workers get 0,1,2,... + // Used both for CPU round-robin affinity and the thread name suffix. + int tag_wid = c->_tag_next_worker_id[tag].fetch_add( + 1, butil::memory_order_relaxed); + if (!c->_tag_cpus[tag].empty()) { + const auto& cpus = c->_tag_cpus[tag]; + bind_thread_to_cpu(pthread_self(), cpus[tag_wid % cpus.size()]); } if (FLAGS_task_group_set_worker_name) { std::string worker_thread_name = butil::string_printf( - "brpc_wkr:%d-%d", g->tag(), worker_id); + "brpc_wkr:%d-%d", g->tag(), tag_wid); butil::PlatformThread::SetNameSimple(worker_thread_name.c_str()); } BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid @@ -191,7 +198,6 @@ TaskControl::TaskControl() , _init(false) , _stop(false) , _concurrency(0) - , _next_worker_id(0) , _nworkers("bthread_worker_count") , _pending_time(NULL) // Delay exposure of following two vars because they rely on TC which @@ -208,6 +214,8 @@ TaskControl::TaskControl() , _priority_queues(FLAGS_task_group_ntags) , _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag) , _tagged_pl(FLAGS_task_group_ntags) + , _tag_cpus(FLAGS_task_group_ntags) + , _tag_next_worker_id(FLAGS_task_group_ntags) {} int TaskControl::init(int concurrency) { @@ -222,8 +230,8 @@ int TaskControl::init(int concurrency) { _concurrency = concurrency; if (!FLAGS_cpu_set.empty()) { - if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) { - LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set; + if (parse_cpuset(FLAGS_cpu_set) == -1) { + LOG(ERROR) << "invalid cpu_set=" << FLAGS_cpu_set; return -1; } } @@ -257,7 +265,7 @@ int TaskControl::init(int concurrency) { } #endif // BRPC_BTHREAD_TRACER - _workers.resize(_concurrency); + _workers.resize(_concurrency); for (int i = 0; i < _concurrency; ++i) { auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags); const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg); @@ -328,40 +336,103 @@ TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) { return NULL; } -int TaskControl::parse_cpuset(std::string value, std::vector& cpus) { +// Parse a single cpu-range-list such as "0-3,5,7" into a sorted, deduplicated +// vector of CPU IDs. Returns 0 on success, -1 on error. +static int parse_one_cpuset(const std::string& value, std::vector& cpus) { static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*"); std::smatch match; std::set cpuset; if (value.empty()) { return -1; } - if (std::regex_match(value, match, r)) { - for (butil::StringSplitter split(value.data(), ','); split; ++split) { - butil::StringPiece cpu_ids(split.field(), split.length()); - cpu_ids.trim_spaces(); - butil::StringPiece begin = cpu_ids; - butil::StringPiece end = cpu_ids; - auto dash = cpu_ids.find('-'); - if (dash != cpu_ids.npos) { - begin = cpu_ids.substr(0, dash); - end = cpu_ids.substr(dash + 1); + if (!std::regex_match(value, match, r)) { + return -1; + } + for (butil::StringSplitter split(value.data(), ','); split; ++split) { + butil::StringPiece cpu_ids(split.field(), split.length()); + cpu_ids.trim_spaces(); + butil::StringPiece begin = cpu_ids; + butil::StringPiece end = cpu_ids; + auto dash = cpu_ids.find('-'); + if (dash != cpu_ids.npos) { + begin = cpu_ids.substr(0, dash); + end = cpu_ids.substr(dash + 1); + } + unsigned first = UINT_MAX; + unsigned last = 0; + int ret = butil::StringSplitter(begin, '\t').to_uint(&first); + ret = ret | butil::StringSplitter(end, '\t').to_uint(&last); + if (ret != 0 || first > last) { + return -1; + } + for (auto i = first; i <= last; ++i) { + cpuset.insert(i); + } + } + cpus.assign(cpuset.begin(), cpuset.end()); + return 0; +} + +int TaskControl::parse_cpuset(const std::string& value) { + if (value.empty()) { + return -1; + } + const int ntags = static_cast(_tag_cpus.size()); + // Detect per-tag format by the presence of ':' or ';'. + // Legacy format ("0-3,5,7") never contains these characters. + bool per_tag_format = (value.find(';') != std::string::npos || + value.find(':') != std::string::npos); + + if (per_tag_format) { + // Per-tag format: "0:0-3,5,7;1:6-9,4" + for (butil::StringSplitter seg_split(value.data(), ';'); seg_split; ++seg_split) { + std::string segment(seg_split.field(), seg_split.length()); + // Trim leading/trailing spaces. + auto s = segment.find_first_not_of(' '); + auto e = segment.find_last_not_of(' '); + if (s == std::string::npos) { continue; } // blank segment + segment = segment.substr(s, e - s + 1); + + auto colon = segment.find(':'); + if (colon == std::string::npos) { + LOG(ERROR) << "cpu_set per-tag segment missing ':': " << segment; + return -1; } - unsigned first = UINT_MAX; - unsigned last = 0; - int ret; - ret = butil::StringSplitter(begin, '\t').to_uint(&first); - ret = ret | butil::StringSplitter(end, '\t').to_uint(&last); - if (ret != 0 || first > last) { + std::string tag_str = segment.substr(0, colon); + std::string cpus_str = segment.substr(colon + 1); + + unsigned tag_id = 0; + butil::StringPiece tag_sp(tag_str); + if (butil::StringSplitter(tag_sp, '\t').to_uint(&tag_id) != 0) { + LOG(ERROR) << "cpu_set invalid tag '" << tag_str << "'"; return -1; } - for (auto i = first; i <= last; ++i) { - cpuset.insert(i); + if ((int)tag_id >= ntags) { + LOG(ERROR) << "cpu_set tag " << tag_id + << " >= task_group_ntags " << ntags; + return -1; } + + std::vector cpus; + if (parse_one_cpuset(cpus_str, cpus) != 0) { + LOG(ERROR) << "cpu_set invalid cpuset for tag " << tag_id + << ": " << cpus_str; + return -1; + } + _tag_cpus[tag_id] = std::move(cpus); + } + } else { + // Legacy format: one cpu-set shared by all tags. + std::vector cpus; + if (parse_one_cpuset(value, cpus) != 0) { + LOG(ERROR) << "cpu_set invalid cpuset: " << value; + return -1; + } + for (int i = 0; i < ntags; ++i) { + _tag_cpus[i] = cpus; } - cpus.assign(cpuset.begin(), cpuset.end()); - return 0; } - return -1; + return 0; } void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) { diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h index 4480daa677..3857eb8f7a 100644 --- a/src/bthread/task_control.h +++ b/src/bthread/task_control.h @@ -91,7 +91,12 @@ friend bthread_t init_for_pthread_stack_trace(); // If this method is called after init(), it never returns NULL. TaskGroup* choose_one_group(bthread_tag_t tag); - static int parse_cpuset(std::string value, std::vector& cpus); + // Parse FLAGS_cpu_set into _tag_cpus. Two formats are accepted: + // Legacy (all tags share one set): "0-3,5,7" + // Per-tag: "0:0-3,5,7;1:6-9,4" + // Tags not mentioned get an empty cpu list (= no binding). + // Returns -1 on parse error. + int parse_cpuset(const std::string& value); static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id); @@ -143,9 +148,6 @@ friend bthread_t init_for_pthread_stack_trace(); bool _stop; butil::atomic _concurrency; std::vector _workers; - std::vector _cpus; - butil::atomic _next_worker_id; - bvar::Adder _nworkers; butil::Mutex _pending_time_mutex; butil::atomic _pending_time; @@ -168,6 +170,12 @@ friend bthread_t init_for_pthread_stack_trace(); size_t _pl_num_of_each_tag; std::vector _tagged_pl; + // Per-tag CPU binding lists. _tag_cpus[tag] is the round-robin list of + // CPU IDs to which workers of that tag are bound. Empty means no binding. + std::vector> _tag_cpus; + // Per-tag monotonic counter for round-robin CPU assignment. + // Incremented once per worker created for that tag (in worker_thread). + std::vector> _tag_next_worker_id; #ifdef BRPC_BTHREAD_TRACER TaskTracer _task_tracer;