@@ -81,10 +81,13 @@ struct connection {
8181#define CF_CONNECTED 10
8282#define CF_RECONNECT 11
8383#define CF_DELAY_CONNECT 12
84+ #define CF_EOF 13
8485 struct list_head writequeue ; /* List of outgoing writequeue_entries */
8586 spinlock_t writequeue_lock ;
87+ atomic_t writequeue_cnt ;
8688 void (* connect_action ) (struct connection * ); /* What to do to connect */
8789 void (* shutdown_action )(struct connection * con ); /* What to do to shutdown */
90+ bool (* eof_condition )(struct connection * con ); /* What to do to eof check */
8891 int retries ;
8992#define MAX_CONNECT_RETRIES 3
9093 struct hlist_node list ;
@@ -179,6 +182,11 @@ static struct connection *__find_con(int nodeid, int r)
179182 return NULL ;
180183}
181184
185+ static bool tcp_eof_condition (struct connection * con )
186+ {
187+ return atomic_read (& con -> writequeue_cnt );
188+ }
189+
182190static int dlm_con_init (struct connection * con , int nodeid )
183191{
184192 con -> rx_buflen = dlm_config .ci_buffer_size ;
@@ -190,13 +198,15 @@ static int dlm_con_init(struct connection *con, int nodeid)
190198 mutex_init (& con -> sock_mutex );
191199 INIT_LIST_HEAD (& con -> writequeue );
192200 spin_lock_init (& con -> writequeue_lock );
201+ atomic_set (& con -> writequeue_cnt , 0 );
193202 INIT_WORK (& con -> swork , process_send_sockets );
194203 INIT_WORK (& con -> rwork , process_recv_sockets );
195204 init_waitqueue_head (& con -> shutdown_wait );
196205
197206 if (dlm_config .ci_protocol == 0 ) {
198207 con -> connect_action = tcp_connect_to_sock ;
199208 con -> shutdown_action = dlm_tcp_shutdown ;
209+ con -> eof_condition = tcp_eof_condition ;
200210 } else {
201211 con -> connect_action = sctp_connect_to_sock ;
202212 }
@@ -723,6 +733,7 @@ static void close_connection(struct connection *con, bool and_other,
723733 clear_bit (CF_CONNECTED , & con -> flags );
724734 clear_bit (CF_DELAY_CONNECT , & con -> flags );
725735 clear_bit (CF_RECONNECT , & con -> flags );
736+ clear_bit (CF_EOF , & con -> flags );
726737 mutex_unlock (& con -> sock_mutex );
727738 clear_bit (CF_CLOSING , & con -> flags );
728739}
@@ -860,16 +871,26 @@ static int receive_from_sock(struct connection *con)
860871 return - EAGAIN ;
861872
862873out_close :
863- mutex_unlock (& con -> sock_mutex );
864874 if (ret == 0 ) {
865- close_connection (con , false, true, false);
866875 log_print ("connection %p got EOF from %d" ,
867876 con , con -> nodeid );
868- /* handling for tcp shutdown */
869- clear_bit (CF_SHUTDOWN , & con -> flags );
870- wake_up (& con -> shutdown_wait );
877+
878+ if (con -> eof_condition && con -> eof_condition (con )) {
879+ set_bit (CF_EOF , & con -> flags );
880+ mutex_unlock (& con -> sock_mutex );
881+ } else {
882+ mutex_unlock (& con -> sock_mutex );
883+ close_connection (con , false, true, false);
884+
885+ /* handling for tcp shutdown */
886+ clear_bit (CF_SHUTDOWN , & con -> flags );
887+ wake_up (& con -> shutdown_wait );
888+ }
889+
871890 /* signal to breaking receive worker */
872891 ret = -1 ;
892+ } else {
893+ mutex_unlock (& con -> sock_mutex );
873894 }
874895 return ret ;
875896}
@@ -1021,6 +1042,7 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
10211042
10221043 if (e -> len == 0 && e -> users == 0 ) {
10231044 list_del (& e -> list );
1045+ atomic_dec (& e -> con -> writequeue_cnt );
10241046 free_entry (e );
10251047 }
10261048}
@@ -1417,6 +1439,7 @@ static struct writequeue_entry *new_wq_entry(struct connection *con, int len,
14171439
14181440 * ppc = page_address (e -> page );
14191441 e -> end += len ;
1442+ atomic_inc (& con -> writequeue_cnt );
14201443
14211444 spin_lock (& con -> writequeue_lock );
14221445 list_add_tail (& e -> list , & con -> writequeue );
@@ -1536,6 +1559,21 @@ static void send_to_sock(struct connection *con)
15361559 writequeue_entry_complete (e , ret );
15371560 }
15381561 spin_unlock (& con -> writequeue_lock );
1562+
1563+ /* close if we got EOF */
1564+ if (test_and_clear_bit (CF_EOF , & con -> flags )) {
1565+ mutex_unlock (& con -> sock_mutex );
1566+ close_connection (con , false, false, true);
1567+
1568+ /* handling for tcp shutdown */
1569+ clear_bit (CF_SHUTDOWN , & con -> flags );
1570+ wake_up (& con -> shutdown_wait );
1571+ } else {
1572+ mutex_unlock (& con -> sock_mutex );
1573+ }
1574+
1575+ return ;
1576+
15391577out :
15401578 mutex_unlock (& con -> sock_mutex );
15411579 return ;
0 commit comments