@@ -118,6 +118,7 @@ struct writequeue_entry {
118118 int len ;
119119 int end ;
120120 int users ;
121+ bool dirty ;
121122 struct connection * con ;
122123 struct list_head msgs ;
123124 struct kref ref ;
@@ -700,6 +701,42 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
700701 memset ((char * )saddr + * addr_len , 0 , sizeof (struct sockaddr_storage ) - * addr_len );
701702}
702703
704+ static void dlm_page_release (struct kref * kref )
705+ {
706+ struct writequeue_entry * e = container_of (kref , struct writequeue_entry ,
707+ ref );
708+
709+ __free_page (e -> page );
710+ kfree (e );
711+ }
712+
713+ static void dlm_msg_release (struct kref * kref )
714+ {
715+ struct dlm_msg * msg = container_of (kref , struct dlm_msg , ref );
716+
717+ kref_put (& msg -> entry -> ref , dlm_page_release );
718+ kfree (msg );
719+ }
720+
721+ static void free_entry (struct writequeue_entry * e )
722+ {
723+ struct dlm_msg * msg , * tmp ;
724+
725+ list_for_each_entry_safe (msg , tmp , & e -> msgs , list ) {
726+ if (msg -> orig_msg ) {
727+ msg -> orig_msg -> retransmit = false;
728+ kref_put (& msg -> orig_msg -> ref , dlm_msg_release );
729+ }
730+
731+ list_del (& msg -> list );
732+ kref_put (& msg -> ref , dlm_msg_release );
733+ }
734+
735+ list_del (& e -> list );
736+ atomic_dec (& e -> con -> writequeue_cnt );
737+ kref_put (& e -> ref , dlm_page_release );
738+ }
739+
703740static void dlm_close_sock (struct socket * * sock )
704741{
705742 if (* sock ) {
@@ -714,6 +751,7 @@ static void close_connection(struct connection *con, bool and_other,
714751 bool tx , bool rx )
715752{
716753 bool closing = test_and_set_bit (CF_CLOSING , & con -> flags );
754+ struct writequeue_entry * e ;
717755
718756 if (tx && !closing && cancel_work_sync (& con -> swork )) {
719757 log_print ("canceled swork for node %d" , con -> nodeid );
@@ -732,6 +770,26 @@ static void close_connection(struct connection *con, bool and_other,
732770 close_connection (con -> othercon , false, tx , rx );
733771 }
734772
773+ /* if we send a writequeue entry only a half way, we drop the
774+ * whole entry because reconnection and that we not start of the
775+ * middle of a msg which will confuse the other end.
776+ *
777+ * we can always drop messages because retransmits, but what we
778+ * cannot allow is to transmit half messages which may be processed
779+ * at the other side.
780+ *
781+ * our policy is to start on a clean state when disconnects, we don't
782+ * know what's send/received on transport layer in this case.
783+ */
784+ spin_lock (& con -> writequeue_lock );
785+ if (!list_empty (& con -> writequeue )) {
786+ e = list_first_entry (& con -> writequeue , struct writequeue_entry ,
787+ list );
788+ if (e -> dirty )
789+ free_entry (e );
790+ }
791+ spin_unlock (& con -> writequeue_lock );
792+
735793 con -> rx_leftover = 0 ;
736794 con -> retries = 0 ;
737795 clear_bit (CF_CONNECTED , & con -> flags );
@@ -1026,41 +1084,6 @@ static int accept_from_sock(struct listen_connection *con)
10261084 return result ;
10271085}
10281086
1029- static void dlm_page_release (struct kref * kref )
1030- {
1031- struct writequeue_entry * e = container_of (kref , struct writequeue_entry ,
1032- ref );
1033-
1034- __free_page (e -> page );
1035- kfree (e );
1036- }
1037-
1038- static void dlm_msg_release (struct kref * kref )
1039- {
1040- struct dlm_msg * msg = container_of (kref , struct dlm_msg , ref );
1041-
1042- kref_put (& msg -> entry -> ref , dlm_page_release );
1043- kfree (msg );
1044- }
1045-
1046- static void free_entry (struct writequeue_entry * e )
1047- {
1048- struct dlm_msg * msg , * tmp ;
1049-
1050- list_for_each_entry_safe (msg , tmp , & e -> msgs , list ) {
1051- if (msg -> orig_msg ) {
1052- msg -> orig_msg -> retransmit = false;
1053- kref_put (& msg -> orig_msg -> ref , dlm_msg_release );
1054- }
1055- list_del (& msg -> list );
1056- kref_put (& msg -> ref , dlm_msg_release );
1057- }
1058-
1059- list_del (& e -> list );
1060- atomic_dec (& e -> con -> writequeue_cnt );
1061- kref_put (& e -> ref , dlm_page_release );
1062- }
1063-
10641087/*
10651088 * writequeue_entry_complete - try to delete and free write queue entry
10661089 * @e: write queue entry to try to delete
@@ -1072,6 +1095,8 @@ static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
10721095{
10731096 e -> offset += completed ;
10741097 e -> len -= completed ;
1098+ /* signal that page was half way transmitted */
1099+ e -> dirty = true;
10751100
10761101 if (e -> len == 0 && e -> users == 0 )
10771102 free_entry (e );
0 commit comments