Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 106 additions & 35 deletions src/bthread/task_control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags");
DEFINE_bool(task_group_set_worker_name, true,
"Whether to set the name of the worker thread");
DEFINE_string(cpu_set, "",
"Set of CPUs to which cores are bound. "
"for example, 0-3,5,7; default: disable");
"Set of CPUs to which worker threads are bound. "
"Two formats are supported:\n"
" Legacy (bind all tags to one set): \"0-3,5,7\"\n"
" Per-tag: \"0:0-3,5,7;1:6-9,4\" "
"where the number before ':' is the bthread_tag and the part "
"after ':' is a CPU list in the same format as the legacy value. "
"Tags not mentioned get no CPU binding. Default: disable.");

namespace bthread {

Expand Down Expand Up @@ -82,7 +87,7 @@ void run_tagged_worker_startfn(bthread_tag_t tag) {

struct WorkerThreadArgs {
WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {}
TaskControl* c;
TaskControl* c;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duplicate space

bthread_tag_t tag;
};

Expand All @@ -106,15 +111,17 @@ void* TaskControl::worker_thread(void* arg) {
}

g->_tid = pthread_self();

int worker_id = c->_next_worker_id.fetch_add(
1, butil::memory_order_relaxed);
if (!c->_cpus.empty()) {
bind_thread_to_cpu(pthread_self(), c->_cpus[worker_id % c->_cpus.size()]);
// tag_wid is a per-tag monotonic counter: same-tag workers get 0,1,2,...
// Used both for CPU round-robin affinity and the thread name suffix.
int tag_wid = c->_tag_next_worker_id[tag].fetch_add(
1, butil::memory_order_relaxed);
Comment on lines +114 to +117
if (!c->_tag_cpus[tag].empty()) {
const auto& cpus = c->_tag_cpus[tag];
bind_thread_to_cpu(pthread_self(), cpus[tag_wid % cpus.size()]);
}
if (FLAGS_task_group_set_worker_name) {
std::string worker_thread_name = butil::string_printf(
"brpc_wkr:%d-%d", g->tag(), worker_id);
"brpc_wkr:%d-%d", g->tag(), tag_wid);
butil::PlatformThread::SetNameSimple(worker_thread_name.c_str());
}
BT_VLOG << "Created worker=" << pthread_self() << " tid=" << g->_tid
Expand Down Expand Up @@ -191,7 +198,6 @@ TaskControl::TaskControl()
, _init(false)
, _stop(false)
, _concurrency(0)
, _next_worker_id(0)
, _nworkers("bthread_worker_count")
, _pending_time(NULL)
// Delay exposure of following two vars because they rely on TC which
Expand All @@ -208,6 +214,8 @@ TaskControl::TaskControl()
, _priority_queues(FLAGS_task_group_ntags)
, _pl_num_of_each_tag(FLAGS_bthread_parking_lot_of_each_tag)
, _tagged_pl(FLAGS_task_group_ntags)
, _tag_cpus(FLAGS_task_group_ntags)
, _tag_next_worker_id(FLAGS_task_group_ntags)
{}

int TaskControl::init(int concurrency) {
Expand All @@ -222,8 +230,8 @@ int TaskControl::init(int concurrency) {
_concurrency = concurrency;

if (!FLAGS_cpu_set.empty()) {
if (parse_cpuset(FLAGS_cpu_set, _cpus) == -1) {
LOG(ERROR) << "invalid cpuset=" << FLAGS_cpu_set;
if (parse_cpuset(FLAGS_cpu_set) == -1) {
LOG(ERROR) << "invalid cpu_set=" << FLAGS_cpu_set;
return -1;
}
}
Expand Down Expand Up @@ -257,7 +265,7 @@ int TaskControl::init(int concurrency) {
}
#endif // BRPC_BTHREAD_TRACER

_workers.resize(_concurrency);
_workers.resize(_concurrency);
for (int i = 0; i < _concurrency; ++i) {
auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags);
const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg);
Expand Down Expand Up @@ -328,40 +336,103 @@ TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
return NULL;
}

int TaskControl::parse_cpuset(std::string value, std::vector<unsigned>& cpus) {
// Parse a single cpu-range-list such as "0-3,5,7" into a sorted, deduplicated
// vector of CPU IDs. Returns 0 on success, -1 on error.
static int parse_one_cpuset(const std::string& value, std::vector<unsigned>& cpus) {
static std::regex r("(\\d+-)?(\\d+)(,(\\d+-)?(\\d+))*");
std::smatch match;
std::set<unsigned> cpuset;
if (value.empty()) {
return -1;
}
if (std::regex_match(value, match, r)) {
for (butil::StringSplitter split(value.data(), ','); split; ++split) {
butil::StringPiece cpu_ids(split.field(), split.length());
cpu_ids.trim_spaces();
butil::StringPiece begin = cpu_ids;
butil::StringPiece end = cpu_ids;
auto dash = cpu_ids.find('-');
if (dash != cpu_ids.npos) {
begin = cpu_ids.substr(0, dash);
end = cpu_ids.substr(dash + 1);
if (!std::regex_match(value, match, r)) {
return -1;
}
for (butil::StringSplitter split(value.data(), ','); split; ++split) {
butil::StringPiece cpu_ids(split.field(), split.length());
cpu_ids.trim_spaces();
butil::StringPiece begin = cpu_ids;
butil::StringPiece end = cpu_ids;
auto dash = cpu_ids.find('-');
if (dash != cpu_ids.npos) {
begin = cpu_ids.substr(0, dash);
end = cpu_ids.substr(dash + 1);
}
unsigned first = UINT_MAX;
unsigned last = 0;
int ret = butil::StringSplitter(begin, '\t').to_uint(&first);
ret = ret | butil::StringSplitter(end, '\t').to_uint(&last);
if (ret != 0 || first > last) {
return -1;
}
for (auto i = first; i <= last; ++i) {
cpuset.insert(i);
}
Comment on lines +368 to +370
}
cpus.assign(cpuset.begin(), cpuset.end());
return 0;
}

int TaskControl::parse_cpuset(const std::string& value) {
if (value.empty()) {
return -1;
}
const int ntags = static_cast<int>(_tag_cpus.size());
// Detect per-tag format by the presence of ':' or ';'.
// Legacy format ("0-3,5,7") never contains these characters.
bool per_tag_format = (value.find(';') != std::string::npos ||
value.find(':') != std::string::npos);

if (per_tag_format) {
// Per-tag format: "0:0-3,5,7;1:6-9,4"
for (butil::StringSplitter seg_split(value.data(), ';'); seg_split; ++seg_split) {
std::string segment(seg_split.field(), seg_split.length());
// Trim leading/trailing spaces.
auto s = segment.find_first_not_of(' ');
auto e = segment.find_last_not_of(' ');
if (s == std::string::npos) { continue; } // blank segment
segment = segment.substr(s, e - s + 1);

auto colon = segment.find(':');
if (colon == std::string::npos) {
LOG(ERROR) << "cpu_set per-tag segment missing ':': " << segment;
return -1;
}
unsigned first = UINT_MAX;
unsigned last = 0;
int ret;
ret = butil::StringSplitter(begin, '\t').to_uint(&first);
ret = ret | butil::StringSplitter(end, '\t').to_uint(&last);
if (ret != 0 || first > last) {
std::string tag_str = segment.substr(0, colon);
std::string cpus_str = segment.substr(colon + 1);

unsigned tag_id = 0;
butil::StringPiece tag_sp(tag_str);
if (butil::StringSplitter(tag_sp, '\t').to_uint(&tag_id) != 0) {
LOG(ERROR) << "cpu_set invalid tag '" << tag_str << "'";
return -1;
Comment on lines +401 to +408
}
for (auto i = first; i <= last; ++i) {
cpuset.insert(i);
if ((int)tag_id >= ntags) {
LOG(ERROR) << "cpu_set tag " << tag_id
<< " >= task_group_ntags " << ntags;
return -1;
}

std::vector<unsigned> cpus;
if (parse_one_cpuset(cpus_str, cpus) != 0) {
LOG(ERROR) << "cpu_set invalid cpuset for tag " << tag_id
<< ": " << cpus_str;
return -1;
}
_tag_cpus[tag_id] = std::move(cpus);
}
} else {
// Legacy format: one cpu-set shared by all tags.
std::vector<unsigned> cpus;
if (parse_one_cpuset(value, cpus) != 0) {
LOG(ERROR) << "cpu_set invalid cpuset: " << value;
return -1;
}
for (int i = 0; i < ntags; ++i) {
_tag_cpus[i] = cpus;
}
cpus.assign(cpuset.begin(), cpuset.end());
return 0;
}
return -1;
return 0;
}

void TaskControl::bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id) {
Expand Down
16 changes: 12 additions & 4 deletions src/bthread/task_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ friend bthread_t init_for_pthread_stack_trace();
// If this method is called after init(), it never returns NULL.
TaskGroup* choose_one_group(bthread_tag_t tag);

static int parse_cpuset(std::string value, std::vector<unsigned>& cpus);
// Parse FLAGS_cpu_set into _tag_cpus. Two formats are accepted:
// Legacy (all tags share one set): "0-3,5,7"
// Per-tag: "0:0-3,5,7;1:6-9,4"
// Tags not mentioned get an empty cpu list (= no binding).
// Returns -1 on parse error.
int parse_cpuset(const std::string& value);

static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id);

Expand Down Expand Up @@ -143,9 +148,6 @@ friend bthread_t init_for_pthread_stack_trace();
bool _stop;
butil::atomic<int> _concurrency;
std::vector<pthread_t> _workers;
std::vector<unsigned> _cpus;
butil::atomic<int> _next_worker_id;

bvar::Adder<int64_t> _nworkers;
butil::Mutex _pending_time_mutex;
butil::atomic<bvar::LatencyRecorder*> _pending_time;
Expand All @@ -168,6 +170,12 @@ friend bthread_t init_for_pthread_stack_trace();

size_t _pl_num_of_each_tag;
std::vector<TaggedParkingLot> _tagged_pl;
// Per-tag CPU binding lists. _tag_cpus[tag] is the round-robin list of
// CPU IDs to which workers of that tag are bound. Empty means no binding.
std::vector<std::vector<unsigned>> _tag_cpus;
// Per-tag monotonic counter for round-robin CPU assignment.
// Incremented once per worker created for that tag (in worker_thread).
std::vector<butil::atomic<int>> _tag_next_worker_id;

#ifdef BRPC_BTHREAD_TRACER
TaskTracer _task_tracer;
Expand Down
Loading