@@ -233,16 +233,9 @@ void vhost_poll_stop(struct vhost_poll *poll)
 }
 EXPORT_SYMBOL_GPL(vhost_poll_stop);
 
-static bool vhost_worker_queue(struct vhost_worker *worker,
+static void vhost_worker_queue(struct vhost_worker *worker,
			       struct vhost_work *work)
 {
-	if (!worker)
-		return false;
-	/*
-	 * vsock can queue while we do a VHOST_SET_OWNER, so we have a smp_wmb
-	 * when setting up the worker. We don't have a smp_rmb here because
-	 * test_and_set_bit gives us a mb already.
-	 */
 	if (!test_and_set_bit(VHOST_WORK_QUEUED, &work->flags)) {
 		/* We can only add the work to the list after we're
 		 * sure it was not in the list.
@@ -251,47 +244,85 @@ static bool vhost_worker_queue(struct vhost_worker *worker,
 		llist_add(&work->node, &worker->work_list);
 		vhost_task_wake(worker->vtsk);
 	}
-
-	return true;
 }
 
 bool vhost_vq_work_queue(struct vhost_virtqueue *vq, struct vhost_work *work)
 {
-	return vhost_worker_queue(vq->worker, work);
+	struct vhost_worker *worker;
+	bool queued = false;
+
+	rcu_read_lock();
+	worker = rcu_dereference(vq->worker);
+	if (worker) {
+		queued = true;
+		vhost_worker_queue(worker, work);
+	}
+	rcu_read_unlock();
+
+	return queued;
 }
 EXPORT_SYMBOL_GPL(vhost_vq_work_queue);
 
-static void vhost_worker_flush(struct vhost_worker *worker)
+void vhost_vq_flush(struct vhost_virtqueue *vq)
 {
 	struct vhost_flush_struct flush;
 
 	init_completion(&flush.wait_event);
 	vhost_work_init(&flush.work, vhost_flush_work);
 
-	if (vhost_worker_queue(worker, &flush.work))
+	if (vhost_vq_work_queue(vq, &flush.work))
 		wait_for_completion(&flush.wait_event);
 }
+EXPORT_SYMBOL_GPL(vhost_vq_flush);
 
-void vhost_vq_flush(struct vhost_virtqueue *vq)
+/**
+ * vhost_worker_flush - flush a worker
+ * @worker: worker to flush
+ *
+ * This does not use RCU to protect the worker, so the device or worker
+ * mutex must be held.
+ */
+static void vhost_worker_flush(struct vhost_worker *worker)
 {
-	vhost_worker_flush(vq->worker);
+	struct vhost_flush_struct flush;
+
+	init_completion(&flush.wait_event);
+	vhost_work_init(&flush.work, vhost_flush_work);
+
+	vhost_worker_queue(worker, &flush.work);
+	wait_for_completion(&flush.wait_event);
 }
-EXPORT_SYMBOL_GPL(vhost_vq_flush);
 
 void vhost_dev_flush(struct vhost_dev *dev)
 {
 	struct vhost_worker *worker;
 	unsigned long i;
 
-	xa_for_each(&dev->worker_xa, i, worker)
+	xa_for_each(&dev->worker_xa, i, worker) {
+		mutex_lock(&worker->mutex);
+		if (!worker->attachment_cnt) {
+			mutex_unlock(&worker->mutex);
+			continue;
+		}
 		vhost_worker_flush(worker);
+		mutex_unlock(&worker->mutex);
+	}
 }
 EXPORT_SYMBOL_GPL(vhost_dev_flush);
 
 /* A lockless hint for busy polling code to exit the loop */
 bool vhost_vq_has_work(struct vhost_virtqueue *vq)
 {
-	return !llist_empty(&vq->worker->work_list);
+	struct vhost_worker *worker;
+	bool has_work = false;
+
+	rcu_read_lock();
+	worker = rcu_dereference(vq->worker);
+	if (worker && !llist_empty(&worker->work_list))
+		has_work = true;
+	rcu_read_unlock();
+
+	return has_work;
 }
 EXPORT_SYMBOL_GPL(vhost_vq_has_work);
 
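The queue and has-work paths above are the read side of the classic RCU publish/subscribe pairing; the matching write side is in the rcu_assign_pointer() hunks below. A minimal self-contained sketch of the pattern (the gp/foo names are illustrative, not from this patch):

#include <linux/rcupdate.h>
#include <linux/slab.h>

struct foo {
	int a;
};

static struct foo __rcu *gp;		/* pointer published to readers */

/* Writer: the member stores are ordered before the publish. */
static int publish_foo(void)
{
	struct foo *f = kzalloc(sizeof(*f), GFP_KERNEL);

	if (!f)
		return -ENOMEM;
	f->a = 1;
	rcu_assign_pointer(gp, f);	/* release semantics */
	return 0;
}

/* Reader: rcu_dereference() subscribes; no explicit smp_rmb() needed. */
static int read_foo(void)
{
	struct foo *f;
	int a = -1;

	rcu_read_lock();
	f = rcu_dereference(gp);
	if (f)
		a = f->a;
	rcu_read_unlock();
	return a;
}

Note that vhost_vq_has_work() remains only a hint: the answer can be stale as soon as rcu_read_unlock() returns, which is acceptable for a busy-poll exit check.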
@@ -356,7 +387,7 @@ static void vhost_vq_reset(struct vhost_dev *dev,
 	vq->busyloop_timeout = 0;
 	vq->umem = NULL;
 	vq->iotlb = NULL;
-	vq->worker = NULL;
+	rcu_assign_pointer(vq->worker, NULL);
 	vhost_vring_call_reset(&vq->call_ctx);
 	__vhost_vq_meta_reset(vq);
 }
@@ -578,7 +609,7 @@ static void vhost_workers_free(struct vhost_dev *dev)
 		return;
 
 	for (i = 0; i < dev->nvqs; i++)
-		dev->vqs[i]->worker = NULL;
+		rcu_assign_pointer(dev->vqs[i]->worker, NULL);
 	/*
 	 * Free the default worker we created and cleanup workers userspace
 	 * created but couldn't clean up (it forgot or crashed).
@@ -606,6 +637,7 @@ static struct vhost_worker *vhost_worker_create(struct vhost_dev *dev)
 	if (!vtsk)
 		goto free_worker;
 
+	mutex_init(&worker->mutex);
 	init_llist_head(&worker->work_list);
 	worker->kcov_handle = kcov_common_handle();
 	worker->vtsk = vtsk;
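The mutex_init() here pairs with the new worker->mutex users throughout the patch. From the accessors this diff relies on, the worker ends up roughly shaped like the sketch below (reconstructed for readability, not copied from vhost.h):

struct vhost_worker {
	struct vhost_task	*vtsk;
	/* Serializes attachment_cnt changes against device-wide flushes. */
	struct mutex		mutex;
	struct llist_head	work_list;
	u64			kcov_handle;
	u32			id;
	int			attachment_cnt;
};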
@@ -630,13 +662,54 @@ static struct vhost_worker *vhost_worker_create(struct vhost_dev *dev)
 static void __vhost_vq_attach_worker(struct vhost_virtqueue *vq,
				     struct vhost_worker *worker)
 {
-	if (vq->worker)
-		vq->worker->attachment_cnt--;
+	struct vhost_worker *old_worker;
+
+	old_worker = rcu_dereference_check(vq->worker,
+					   lockdep_is_held(&vq->dev->mutex));
+
+	mutex_lock(&worker->mutex);
 	worker->attachment_cnt++;
-	vq->worker = worker;
+	mutex_unlock(&worker->mutex);
+	rcu_assign_pointer(vq->worker, worker);
+
+	if (!old_worker)
+		return;
+	/*
+	 * Take the worker mutex to make sure we see the work queued from
+	 * device wide flushes, which don't use RCU for execution.
+	 */
+	mutex_lock(&old_worker->mutex);
+	old_worker->attachment_cnt--;
+	/*
+	 * We don't want to call synchronize_rcu for every vq during setup
+	 * because it will slow down VM startup. If we haven't done
+	 * VHOST_SET_VRING_KICK and not done the driver specific
+	 * SET_ENDPOINT/RUNNING then we can skip the sync since there will
+	 * not be any works queued for scsi and net.
+	 */
+	mutex_lock(&vq->mutex);
+	if (!vhost_vq_get_backend(vq) && !vq->kick) {
+		mutex_unlock(&vq->mutex);
+		mutex_unlock(&old_worker->mutex);
+		/*
+		 * vsock can queue anytime after VHOST_VSOCK_SET_GUEST_CID.
+		 * Warn if it adds support for multiple workers but forgets to
+		 * handle the early queueing case.
+		 */
+		WARN_ON(!old_worker->attachment_cnt &&
+			!llist_empty(&old_worker->work_list));
+		return;
+	}
+	mutex_unlock(&vq->mutex);
+
+	/* Make sure new vq queue/flush/poll calls see the new worker */
+	synchronize_rcu();
+	/* Make sure whatever was queued gets run */
+	vhost_worker_flush(old_worker);
+	mutex_unlock(&old_worker->mutex);
 }
 
-/* Caller must have device and virtqueue mutex */
+/* Caller must have device mutex */
 static int vhost_vq_attach_worker(struct vhost_virtqueue *vq,
				  struct vhost_vring_worker *info)
 {
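The ordering in __vhost_vq_attach_worker() is the heart of the change: publish the new worker, wait out any RCU readers that may still hold the old one, then flush whatever they managed to queue. Stripped of the early-setup fast path and locking, the sequence reduces to this sketch (not a drop-in replacement for the function above):

static void switch_worker(struct vhost_virtqueue *vq,
			  struct vhost_worker *new_worker,
			  struct vhost_worker *old_worker)
{
	/* New vhost_vq_work_queue() callers now pick up new_worker. */
	rcu_assign_pointer(vq->worker, new_worker);

	/*
	 * Wait until no reader can still hold old_worker from an
	 * earlier rcu_dereference(vq->worker).
	 */
	synchronize_rcu();

	/* Run anything those late readers queued on the old worker. */
	vhost_worker_flush(old_worker);
}

Skipping the synchronize_rcu() when neither the kick fd nor the backend is set is what keeps VM startup from paying one grace period per virtqueue.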
@@ -647,15 +720,6 @@ static int vhost_vq_attach_worker(struct vhost_virtqueue *vq,
 	if (!dev->use_worker)
 		return -EINVAL;
 
-	/*
-	 * We only allow userspace to set a virtqueue's worker if it's not
-	 * active and polling is not enabled. We also assume drivers
-	 * supporting this will not be internally queueing works directly or
-	 * via calls like vhost_dev_flush at this time.
-	 */
-	if (vhost_vq_get_backend(vq) || vq->kick)
-		return -EBUSY;
-
 	worker = xa_find(&dev->worker_xa, &index, UINT_MAX, XA_PRESENT);
 	if (!worker || worker->id != info->worker_id)
 		return -ENODEV;
@@ -689,8 +753,12 @@ static int vhost_free_worker(struct vhost_dev *dev,
 	if (!worker || worker->id != info->worker_id)
 		return -ENODEV;
 
-	if (worker->attachment_cnt)
+	mutex_lock(&worker->mutex);
+	if (worker->attachment_cnt) {
+		mutex_unlock(&worker->mutex);
 		return -EBUSY;
+	}
+	mutex_unlock(&worker->mutex);
 
 	vhost_worker_destroy(dev, worker);
 	return 0;
@@ -723,6 +791,7 @@ long vhost_worker_ioctl(struct vhost_dev *dev, unsigned int ioctl,
 {
 	struct vhost_vring_worker ring_worker;
 	struct vhost_worker_state state;
+	struct vhost_worker *worker;
 	struct vhost_virtqueue *vq;
 	long ret;
 	u32 idx;
@@ -760,7 +829,6 @@ long vhost_worker_ioctl(struct vhost_dev *dev, unsigned int ioctl,
 	if (ret)
 		return ret;
 
-	mutex_lock(&vq->mutex);
 	switch (ioctl) {
 	case VHOST_ATTACH_VRING_WORKER:
 		if (copy_from_user(&ring_worker, argp, sizeof(ring_worker))) {
@@ -771,8 +839,15 @@ long vhost_worker_ioctl(struct vhost_dev *dev, unsigned int ioctl,
 		ret = vhost_vq_attach_worker(vq, &ring_worker);
 		break;
 	case VHOST_GET_VRING_WORKER:
+		worker = rcu_dereference_check(vq->worker,
+					       lockdep_is_held(&dev->mutex));
+		if (!worker) {
+			ret = -EINVAL;
+			break;
+		}
+
 		ring_worker.index = idx;
-		ring_worker.worker_id = vq->worker->id;
+		ring_worker.worker_id = worker->id;
 
 		if (copy_to_user(argp, &ring_worker, sizeof(ring_worker)))
 			ret = -EFAULT;
@@ -782,7 +857,6 @@ long vhost_worker_ioctl(struct vhost_dev *dev, unsigned int ioctl,
 		break;
 	}
 
-	mutex_unlock(&vq->mutex);
 	return ret;
 }
 EXPORT_SYMBOL_GPL(vhost_worker_ioctl);
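With the -EBUSY check dropped and vq->mutex no longer held across the ioctl, userspace may attach a worker while the ring is live. A hedged usage sketch, assuming the VHOST_NEW_WORKER and VHOST_ATTACH_VRING_WORKER ioctls from <linux/vhost.h> and a vhost fd that has already done VHOST_SET_OWNER:

#include <sys/ioctl.h>
#include <linux/vhost.h>

/* Create a worker and attach it to virtqueue vq_index of vhost_fd. */
static int attach_new_worker(int vhost_fd, unsigned int vq_index)
{
	struct vhost_worker_state state = { 0 };
	struct vhost_vring_worker ring_worker = { 0 };

	/* The kernel returns the new worker's id in state.worker_id. */
	if (ioctl(vhost_fd, VHOST_NEW_WORKER, &state) < 0)
		return -1;

	ring_worker.index = vq_index;
	ring_worker.worker_id = state.worker_id;
	/* With this patch, this succeeds even after the ring is active. */
	return ioctl(vhost_fd, VHOST_ATTACH_VRING_WORKER, &ring_worker);
}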
@@ -817,11 +891,6 @@ long vhost_dev_set_owner(struct vhost_dev *dev)
 		err = -ENOMEM;
 		goto err_worker;
 	}
-	/*
-	 * vsock can already try to queue so make sure the worker
-	 * is setup before vhost_vq_work_queue sees vq->worker is set.
-	 */
-	smp_wmb();
 
 	for (i = 0; i < dev->nvqs; i++)
 		__vhost_vq_attach_worker(dev->vqs[i], worker);
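The deleted smp_wmb() paired with the full barrier that test_and_set_bit() supplies on the queueing side (see the comment removed in the first hunk). That ordering now comes for free from the RCU primitives, sketched side by side (in the old code the store itself happened in __vhost_vq_attach_worker()):

/* Before: open-coded publication barrier. */
worker = vhost_worker_create(dev);
smp_wmb();				/* order worker init ... */
vq->worker = worker;			/* ... before the store */

/* After: rcu_assign_pointer() orders initialization before the
 * publish, and rcu_dereference() in vhost_vq_work_queue() cannot
 * observe a partially initialized worker, so no manual barriers.
 */
rcu_assign_pointer(vq->worker, worker);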