@@ -119,8 +119,19 @@ struct writequeue_entry {
119119 int len ;
120120 int end ;
121121 int users ;
122- int idx ; /* get()/commit() idx exchange */
123122 struct connection * con ;
123+ struct list_head msgs ;
124+ struct kref ref ;
125+ };
126+
127+ struct dlm_msg {
128+ struct writequeue_entry * entry ;
129+ void * ppc ;
130+ int len ;
131+ int idx ; /* new()/commit() idx exchange */
132+
133+ struct list_head list ;
134+ struct kref ref ;
124135};
125136
126137struct dlm_node_addr {
@@ -1022,12 +1033,37 @@ static int accept_from_sock(struct listen_connection *con)
10221033 return result ;
10231034}
10241035
1025- static void free_entry (struct writequeue_entry * e )
1036+ static void dlm_page_release (struct kref * kref )
10261037{
1038+ struct writequeue_entry * e = container_of (kref , struct writequeue_entry ,
1039+ ref );
1040+
10271041 __free_page (e -> page );
10281042 kfree (e );
10291043}
10301044
1045+ static void dlm_msg_release (struct kref * kref )
1046+ {
1047+ struct dlm_msg * msg = container_of (kref , struct dlm_msg , ref );
1048+
1049+ kref_put (& msg -> entry -> ref , dlm_page_release );
1050+ kfree (msg );
1051+ }
1052+
1053+ static void free_entry (struct writequeue_entry * e )
1054+ {
1055+ struct dlm_msg * msg , * tmp ;
1056+
1057+ list_for_each_entry_safe (msg , tmp , & e -> msgs , list ) {
1058+ list_del (& msg -> list );
1059+ kref_put (& msg -> ref , dlm_msg_release );
1060+ }
1061+
1062+ list_del (& e -> list );
1063+ atomic_dec (& e -> con -> writequeue_cnt );
1064+ kref_put (& e -> ref , dlm_page_release );
1065+ }
1066+
10311067/*
10321068 * writequeue_entry_complete - try to delete and free write queue entry
10331069 * @e: write queue entry to try to delete
@@ -1040,11 +1076,8 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
10401076 e -> offset += completed ;
10411077 e -> len -= completed ;
10421078
1043- if (e -> len == 0 && e -> users == 0 ) {
1044- list_del (& e -> list );
1045- atomic_dec (& e -> con -> writequeue_cnt );
1079+ if (e -> len == 0 && e -> users == 0 )
10461080 free_entry (e );
1047- }
10481081}
10491082
10501083/*
@@ -1410,20 +1443,29 @@ static struct writequeue_entry *new_writequeue_entry(struct connection *con,
14101443
14111444 entry -> con = con ;
14121445 entry -> users = 1 ;
1446+ kref_init (& entry -> ref );
1447+ INIT_LIST_HEAD (& entry -> msgs );
14131448
14141449 return entry ;
14151450}
14161451
14171452static struct writequeue_entry * new_wq_entry (struct connection * con , int len ,
1418- gfp_t allocation , char * * ppc )
1453+ gfp_t allocation , char * * ppc ,
1454+ void (* cb )(struct dlm_mhandle * mh ),
1455+ struct dlm_mhandle * mh )
14191456{
14201457 struct writequeue_entry * e ;
14211458
14221459 spin_lock (& con -> writequeue_lock );
14231460 if (!list_empty (& con -> writequeue )) {
14241461 e = list_last_entry (& con -> writequeue , struct writequeue_entry , list );
14251462 if (DLM_WQ_REMAIN_BYTES (e ) >= len ) {
1463+ kref_get (& e -> ref );
1464+
14261465 * ppc = page_address (e -> page ) + e -> end ;
1466+ if (cb )
1467+ cb (mh );
1468+
14271469 e -> end += len ;
14281470 e -> users ++ ;
14291471 spin_unlock (& con -> writequeue_lock );
@@ -1437,21 +1479,28 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
14371479 if (!e )
14381480 return NULL ;
14391481
1482+ kref_get (& e -> ref );
14401483 * ppc = page_address (e -> page );
14411484 e -> end += len ;
14421485 atomic_inc (& con -> writequeue_cnt );
14431486
14441487 spin_lock (& con -> writequeue_lock );
1488+ if (cb )
1489+ cb (mh );
1490+
14451491 list_add_tail (& e -> list , & con -> writequeue );
14461492 spin_unlock (& con -> writequeue_lock );
14471493
14481494 return e ;
14491495};
14501496
1451- void * dlm_lowcomms_get_buffer (int nodeid , int len , gfp_t allocation , char * * ppc )
1497+ struct dlm_msg * dlm_lowcomms_new_msg (int nodeid , int len , gfp_t allocation ,
1498+ char * * ppc , void (* cb )(struct dlm_mhandle * mh ),
1499+ struct dlm_mhandle * mh )
14521500{
14531501 struct writequeue_entry * e ;
14541502 struct connection * con ;
1503+ struct dlm_msg * msg ;
14551504 int idx ;
14561505
14571506 if (len > DEFAULT_BUFFER_SIZE ||
@@ -1469,25 +1518,41 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
14691518 return NULL ;
14701519 }
14711520
1472- e = new_wq_entry (con , len , allocation , ppc );
1521+ msg = kzalloc (sizeof (* msg ), allocation );
1522+ if (!msg ) {
1523+ srcu_read_unlock (& connections_srcu , idx );
1524+ return NULL ;
1525+ }
1526+
1527+ kref_init (& msg -> ref );
1528+
1529+ e = new_wq_entry (con , len , allocation , ppc , cb , mh );
14731530 if (!e ) {
14741531 srcu_read_unlock (& connections_srcu , idx );
1532+ kfree (msg );
14751533 return NULL ;
14761534 }
14771535
1536+ msg -> ppc = * ppc ;
1537+ msg -> len = len ;
1538+ msg -> entry = e ;
1539+
14781540 /* we assume if successful commit must called */
1479- e -> idx = idx ;
1541+ msg -> idx = idx ;
14801542
1481- return e ;
1543+ return msg ;
14821544}
14831545
1484- void dlm_lowcomms_commit_buffer ( void * mh )
1546+ void dlm_lowcomms_commit_msg ( struct dlm_msg * msg )
14851547{
1486- struct writequeue_entry * e = ( struct writequeue_entry * ) mh ;
1548+ struct writequeue_entry * e = msg -> entry ;
14871549 struct connection * con = e -> con ;
14881550 int users ;
14891551
14901552 spin_lock (& con -> writequeue_lock );
1553+ kref_get (& msg -> ref );
1554+ list_add (& msg -> list , & e -> msgs );
1555+
14911556 users = -- e -> users ;
14921557 if (users )
14931558 goto out ;
@@ -1496,15 +1561,20 @@ void dlm_lowcomms_commit_buffer(void *mh)
14961561 spin_unlock (& con -> writequeue_lock );
14971562
14981563 queue_work (send_workqueue , & con -> swork );
1499- srcu_read_unlock (& connections_srcu , e -> idx );
1564+ srcu_read_unlock (& connections_srcu , msg -> idx );
15001565 return ;
15011566
15021567out :
15031568 spin_unlock (& con -> writequeue_lock );
1504- srcu_read_unlock (& connections_srcu , e -> idx );
1569+ srcu_read_unlock (& connections_srcu , msg -> idx );
15051570 return ;
15061571}
15071572
1573+ void dlm_lowcomms_put_msg (struct dlm_msg * msg )
1574+ {
1575+ kref_put (& msg -> ref , dlm_msg_release );
1576+ }
1577+
15081578/* Send a message */
15091579static void send_to_sock (struct connection * con )
15101580{
@@ -1590,7 +1660,6 @@ static void clean_one_writequeue(struct connection *con)
15901660
15911661 spin_lock (& con -> writequeue_lock );
15921662 list_for_each_entry_safe (e , safe , & con -> writequeue , list ) {
1593- list_del (& e -> list );
15941663 free_entry (e );
15951664 }
15961665 spin_unlock (& con -> writequeue_lock );
0 commit comments