Skip to content

Commit 43c2e84

Browse files
committed
Switch blas_server to use acq/rel semantics
Heavy-weight locking isn't required to pass the work queue pointer between threads; simple atomic acquire/release semantics can be used instead. This is especially important because pthread_mutex_lock() isn't fair. We've observed substantial variation in runtime because of the unfairness of these locks, which completely goes away with this implementation. The locks themselves are left in place to provide a portable way for idling threads to sleep and wake up after many unsuccessful iterations spent waiting.
1 parent 430ee31 commit 43c2e84

1 file changed

Lines changed: 41 additions & 58 deletions

File tree

driver/others/blas_server.c

Lines changed: 41 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,16 @@ typedef struct {
140140

141141
} thread_status_t;
142142

143+
/*
 * Lock-free hand-off of the per-thread work queue pointer.
 *
 * The queue slot is written by the dispatching thread and read by the
 * worker thread, so the store must publish the fully initialised queue
 * entry (release) and the load must observe it completely (acquire).
 * The commit intent is explicitly acq/rel; __ATOMIC_RELAXED here would
 * provide no inter-thread ordering and would rely solely on the separate
 * MB/WMB barriers at the call sites.
 *
 * NOTE(review): __atomic_load_n/__atomic_store_n are GCC/Clang builtins,
 * not part of C11 proper -- gating on __STDC_VERSION__ alone assumes a
 * GCC-compatible compiler; confirm for other toolchains (or use
 * <stdatomic.h>).
 */
#if (__STDC_VERSION__ >= 201112L)
#define atomic_load_queue(p)		__atomic_load_n(p, __ATOMIC_ACQUIRE)
#define atomic_store_queue(p, v)	__atomic_store_n(p, v, __ATOMIC_RELEASE)
#else
/* Pre-C11 fallback: volatile access stops the compiler caching the slot
 * but gives no hardware ordering; the MB/WMB barriers at the call sites
 * remain load-bearing on this path. */
#define atomic_load_queue(p)		(blas_queue_t*)(*(volatile blas_queue_t**)(p))
#define atomic_store_queue(p, v)	(*(volatile blas_queue_t* volatile*)(p) = (v))
#endif
150+
151+
152+
143153
static thread_status_t thread_status[MAX_CPU_NUMBER] __attribute__((aligned(ATTRIBUTE_SIZE)));
144154

145155
#ifndef THREAD_TIMEOUT
@@ -312,40 +322,38 @@ blas_queue_t *tscq;
312322

313323
last_tick = (unsigned int)rpcc();
314324

315-
pthread_mutex_lock (&thread_status[cpu].lock);
316-
tscq=thread_status[cpu].queue;
317-
pthread_mutex_unlock (&thread_status[cpu].lock);
325+
tscq = atomic_load_queue(&thread_status[cpu].queue);
318326

319327
while(!tscq) {
320328
YIELDING;
321329

322330
if ((unsigned int)rpcc() - last_tick > thread_timeout) {
323331

324-
pthread_mutex_lock (&thread_status[cpu].lock);
325332

326-
if (!thread_status[cpu].queue) {
333+
if (!atomic_load_queue(&thread_status[cpu].queue)) {
334+
pthread_mutex_lock (&thread_status[cpu].lock);
327335
thread_status[cpu].status = THREAD_STATUS_SLEEP;
328-
while (thread_status[cpu].status == THREAD_STATUS_SLEEP) {
336+
while (thread_status[cpu].status == THREAD_STATUS_SLEEP &&
337+
!atomic_load_queue(&thread_status[cpu].queue)) {
329338

330339
#ifdef MONITOR
331340
main_status[cpu] = MAIN_SLEEPING;
332341
#endif
333342

334343
pthread_cond_wait(&thread_status[cpu].wakeup, &thread_status[cpu].lock);
335344
}
345+
pthread_mutex_unlock(&thread_status[cpu].lock);
336346
}
337347

338-
pthread_mutex_unlock(&thread_status[cpu].lock);
339-
340348
last_tick = (unsigned int)rpcc();
341349
}
342-
pthread_mutex_lock (&thread_status[cpu].lock);
343-
tscq=thread_status[cpu].queue;
344-
pthread_mutex_unlock (&thread_status[cpu].lock);
350+
351+
tscq = atomic_load_queue(&thread_status[cpu].queue);
345352

346353
}
347354

348-
queue = thread_status[cpu].queue;
355+
queue = atomic_load_queue(&thread_status[cpu].queue);
356+
MB;
349357

350358
if ((long)queue == -1) break;
351359

@@ -360,9 +368,7 @@ blas_queue_t *tscq;
360368
if (queue) {
361369
int (*routine)(blas_arg_t *, void *, void *, void *, void *, BLASLONG) = queue -> routine;
362370

363-
pthread_mutex_lock (&thread_status[cpu].lock);
364-
thread_status[cpu].queue = (blas_queue_t *)1;
365-
pthread_mutex_unlock (&thread_status[cpu].lock);
371+
atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)1);
366372

367373
sa = queue -> sa;
368374
sb = queue -> sb;
@@ -442,13 +448,9 @@ blas_queue_t *tscq;
442448

443449
// arm: make sure all results are written out _before_
444450
// thread is marked as done and other threads use them
445-
WMB;
451+
MB;
452+
atomic_store_queue(&thread_status[cpu].queue, (blas_queue_t *)0);
446453

447-
pthread_mutex_lock (&thread_status[cpu].lock);
448-
thread_status[cpu].queue = (blas_queue_t * volatile) ((long)thread_status[cpu].queue & 0); /* Need a trick */
449-
pthread_mutex_unlock (&thread_status[cpu].lock);
450-
451-
WMB;
452454

453455
}
454456

@@ -566,12 +568,9 @@ int blas_thread_init(void){
566568

567569
for(i = 0; i < blas_num_threads - 1; i++){
568570

569-
thread_status[i].queue = (blas_queue_t *)NULL;
571+
atomic_store_queue(&thread_status[i].queue, (blas_queue_t *)0);
570572
thread_status[i].status = THREAD_STATUS_WAKEUP;
571573

572-
pthread_mutex_init(&thread_status[i].lock, NULL);
573-
pthread_cond_init (&thread_status[i].wakeup, NULL);
574-
575574
#ifdef NEED_STACKATTR
576575
ret=pthread_create(&blas_threads[i], &attr,
577576
&blas_thread_server, (void *)i);
@@ -655,7 +654,8 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
655654
if (queue -> mode & BLAS_NODE) {
656655

657656
do {
658-
while((thread_status[i].node != node || thread_status[i].queue) && (i < blas_num_threads - 1)) i ++;
657+
658+
while((thread_status[i].node != node || atomic_load_queue(&thread_status[i].queue)) && (i < blas_num_threads - 1)) i ++;
659659

660660
if (i < blas_num_threads - 1) break;
661661

@@ -669,36 +669,26 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
669669
} while (1);
670670

671671
} else {
672-
pthread_mutex_lock (&thread_status[i].lock);
673-
tsiq = thread_status[i].queue;
674-
pthread_mutex_unlock (&thread_status[i].lock);
672+
tsiq = atomic_load_queue(&thread_status[i].queue);
675673
while(tsiq) {
676674
i ++;
677675
if (i >= blas_num_threads - 1) i = 0;
678-
pthread_mutex_lock (&thread_status[i].lock);
679-
tsiq = thread_status[i].queue;
680-
pthread_mutex_unlock (&thread_status[i].lock);
676+
tsiq = atomic_load_queue(&thread_status[i].queue);
681677
}
682678
}
683679
#else
684-
pthread_mutex_lock (&thread_status[i].lock);
685-
tsiq=thread_status[i].queue ;
686-
pthread_mutex_unlock (&thread_status[i].lock);
680+
tsiq = atomic_load_queue(&thread_status[i].queue);
687681
while(tsiq) {
688682
i ++;
689683
if (i >= blas_num_threads - 1) i = 0;
690-
pthread_mutex_lock (&thread_status[i].lock);
691-
tsiq=thread_status[i].queue ;
692-
pthread_mutex_unlock (&thread_status[i].lock);
684+
tsiq = atomic_load_queue(&thread_status[i].queue);
693685
}
694686
#endif
695687

696688
queue -> assigned = i;
697-
WMB;
698-
pthread_mutex_lock (&thread_status[i].lock);
699-
thread_status[i].queue = queue;
700-
pthread_mutex_unlock (&thread_status[i].lock);
701-
WMB;
689+
MB;
690+
691+
atomic_store_queue(&thread_status[i].queue, queue);
702692

703693
queue = queue -> next;
704694
pos ++;
@@ -718,9 +708,7 @@ int exec_blas_async(BLASLONG pos, blas_queue_t *queue){
718708

719709
pos = current -> assigned;
720710

721-
pthread_mutex_lock (&thread_status[pos].lock);
722-
tspq=thread_status[pos].queue;
723-
pthread_mutex_unlock (&thread_status[pos].lock);
711+
tspq = atomic_load_queue(&thread_status[pos].queue);
724712

725713
if ((BLASULONG)tspq > 1) {
726714
pthread_mutex_lock (&thread_status[pos].lock);
@@ -752,24 +740,20 @@ int exec_blas_async_wait(BLASLONG num, blas_queue_t *queue){
752740

753741
while ((num > 0) && queue) {
754742

755-
pthread_mutex_lock(&thread_status[queue->assigned].lock);
756-
tsqq=thread_status[queue -> assigned].queue;
757-
pthread_mutex_unlock(&thread_status[queue->assigned].lock);
743+
tsqq = atomic_load_queue(&thread_status[queue->assigned].queue);
758744

759745

760746
while(tsqq) {
761747
YIELDING;
762-
pthread_mutex_lock(&thread_status[queue->assigned].lock);
763-
tsqq=thread_status[queue -> assigned].queue;
764-
pthread_mutex_unlock(&thread_status[queue->assigned].lock);
765-
766-
748+
tsqq = atomic_load_queue(&thread_status[queue->assigned].queue);
767749
};
768750

769751
queue = queue -> next;
770752
num --;
771753
}
772754

755+
MB;
756+
773757
#ifdef SMP_DEBUG
774758
fprintf(STDERR, "Done.\n\n");
775759
#endif
@@ -880,7 +864,7 @@ void goto_set_num_threads(int num_threads) {
880864

881865
for(i = blas_num_threads - 1; i < num_threads - 1; i++){
882866

883-
thread_status[i].queue = (blas_queue_t *)NULL;
867+
atomic_store_queue(&thread_status[i].queue, (blas_queue_t *)0);
884868
thread_status[i].status = THREAD_STATUS_WAKEUP;
885869

886870
pthread_mutex_init(&thread_status[i].lock, NULL);
@@ -971,12 +955,11 @@ int BLASFUNC(blas_thread_shutdown)(void){
971955

972956
for (i = 0; i < blas_num_threads - 1; i++) {
973957

974-
pthread_mutex_lock (&thread_status[i].lock);
975958

976-
thread_status[i].queue = (blas_queue_t *)-1;
959+
pthread_mutex_lock (&thread_status[i].lock);
977960

961+
atomic_store_queue(&thread_status[i].queue, (blas_queue_t *)-1);
978962
thread_status[i].status = THREAD_STATUS_WAKEUP;
979-
980963
pthread_cond_signal (&thread_status[i].wakeup);
981964

982965
pthread_mutex_unlock(&thread_status[i].lock);

0 commit comments

Comments
 (0)