@@ -224,6 +224,80 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
224224 return NULL ;
225225}
226226
227+ struct dlm_dir_dump {
228+ /* init values to match if whole
229+ * dump fits to one seq. Sanity check only.
230+ */
231+ uint64_t seq_init ;
232+ uint64_t nodeid_init ;
233+ /* compare local pointer with last lookup,
234+ * just a sanity check.
235+ */
236+ struct list_head * last ;
237+
238+ unsigned int sent_res ; /* for log info */
239+ unsigned int sent_msg ; /* for log info */
240+
241+ struct list_head list ;
242+ };
243+
244+ static void drop_dir_ctx (struct dlm_ls * ls , int nodeid )
245+ {
246+ struct dlm_dir_dump * dd , * safe ;
247+
248+ write_lock (& ls -> ls_dir_dump_lock );
249+ list_for_each_entry_safe (dd , safe , & ls -> ls_dir_dump_list , list ) {
250+ if (dd -> nodeid_init == nodeid ) {
251+ log_error (ls , "drop dump seq %llu" ,
252+ (unsigned long long )dd -> seq_init );
253+ list_del (& dd -> list );
254+ kfree (dd );
255+ }
256+ }
257+ write_unlock (& ls -> ls_dir_dump_lock );
258+ }
259+
260+ static struct dlm_dir_dump * lookup_dir_dump (struct dlm_ls * ls , int nodeid )
261+ {
262+ struct dlm_dir_dump * iter , * dd = NULL ;
263+
264+ read_lock (& ls -> ls_dir_dump_lock );
265+ list_for_each_entry (iter , & ls -> ls_dir_dump_list , list ) {
266+ if (iter -> nodeid_init == nodeid ) {
267+ dd = iter ;
268+ break ;
269+ }
270+ }
271+ read_unlock (& ls -> ls_dir_dump_lock );
272+
273+ return dd ;
274+ }
275+
276+ static struct dlm_dir_dump * init_dir_dump (struct dlm_ls * ls , int nodeid )
277+ {
278+ struct dlm_dir_dump * dd ;
279+
280+ dd = lookup_dir_dump (ls , nodeid );
281+ if (dd ) {
282+ log_error (ls , "found ongoing dir dump for node %d, will drop it" ,
283+ nodeid );
284+ drop_dir_ctx (ls , nodeid );
285+ }
286+
287+ dd = kzalloc (sizeof (* dd ), GFP_ATOMIC );
288+ if (!dd )
289+ return NULL ;
290+
291+ dd -> seq_init = ls -> ls_recover_seq ;
292+ dd -> nodeid_init = nodeid ;
293+
294+ write_lock (& ls -> ls_dir_dump_lock );
295+ list_add (& dd -> list , & ls -> ls_dir_dump_list );
296+ write_unlock (& ls -> ls_dir_dump_lock );
297+
298+ return dd ;
299+ }
300+
227301/* Find the rsb where we left off (or start again), then send rsb names
228302 for rsb's we're master of and whose directory node matches the requesting
229303 node. inbuf is the rsb name last sent, inlen is the name's length */
@@ -234,20 +308,46 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
234308 struct list_head * list ;
235309 struct dlm_rsb * r ;
236310 int offset = 0 , dir_nodeid ;
311+ struct dlm_dir_dump * dd ;
237312 __be16 be_namelen ;
238313
239314 read_lock (& ls -> ls_masters_lock );
240315
241316 if (inlen > 1 ) {
317+ dd = lookup_dir_dump (ls , nodeid );
318+ if (!dd ) {
319+ log_error (ls , "failed to lookup dir dump context nodeid: %d" ,
320+ nodeid );
321+ goto out ;
322+ }
323+
324+ /* next chunk in dump */
242325 r = find_rsb_root (ls , inbuf , inlen );
243326 if (!r ) {
244327 log_error (ls , "copy_master_names from %d start %d %.*s" ,
245328 nodeid , inlen , inlen , inbuf );
246329 goto out ;
247330 }
248331 list = r -> res_masters_list .next ;
332+
333+ /* sanity checks */
334+ if (dd -> last != & r -> res_masters_list ||
335+ dd -> seq_init != ls -> ls_recover_seq ) {
336+ log_error (ls , "failed dir dump sanity check seq_init: %llu seq: %llu" ,
337+ (unsigned long long )dd -> seq_init ,
338+ (unsigned long long )ls -> ls_recover_seq );
339+ goto out ;
340+ }
249341 } else {
342+ dd = init_dir_dump (ls , nodeid );
343+ if (!dd ) {
344+ log_error (ls , "failed to allocate dir dump context" );
345+ goto out ;
346+ }
347+
348+ /* start dump */
250349 list = ls -> ls_masters_list .next ;
350+ dd -> last = list ;
251351 }
252352
253353 for (offset = 0 ; list != & ls -> ls_masters_list ; list = list -> next ) {
@@ -269,7 +369,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
269369 be_namelen = cpu_to_be16 (0 );
270370 memcpy (outbuf + offset , & be_namelen , sizeof (__be16 ));
271371 offset += sizeof (__be16 );
272- ls -> ls_recover_dir_sent_msg ++ ;
372+ dd -> sent_msg ++ ;
273373 goto out ;
274374 }
275375
@@ -278,7 +378,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
278378 offset += sizeof (__be16 );
279379 memcpy (outbuf + offset , r -> res_name , r -> res_length );
280380 offset += r -> res_length ;
281- ls -> ls_recover_dir_sent_res ++ ;
381+ dd -> sent_res ++ ;
382+ dd -> last = list ;
282383 }
283384
284385 /*
@@ -288,10 +389,18 @@ void dlm_copy_master_names(struct dlm_ls *ls, const char *inbuf, int inlen,
288389
289390 if ((list == & ls -> ls_masters_list ) &&
290391 (offset + sizeof (uint16_t ) <= outlen )) {
392+ /* end dump */
291393 be_namelen = cpu_to_be16 (0xFFFF );
292394 memcpy (outbuf + offset , & be_namelen , sizeof (__be16 ));
293395 offset += sizeof (__be16 );
294- ls -> ls_recover_dir_sent_msg ++ ;
396+ dd -> sent_msg ++ ;
397+ log_rinfo (ls , "dlm_recover_directory nodeid %d sent %u res out %u messages" ,
398+ nodeid , dd -> sent_res , dd -> sent_msg );
399+
400+ write_lock (& ls -> ls_dir_dump_lock );
401+ list_del_init (& dd -> list );
402+ write_unlock (& ls -> ls_dir_dump_lock );
403+ kfree (dd );
295404 }
296405 out :
297406 read_unlock (& ls -> ls_masters_lock );
0 commit comments