Skip to content

Commit 70c9a1a

Browse files
committed
sends sent changes on reconnect
After auth and index and/or receive changes from server emits a 'ready' event for the channel. After authing, listens for one ready event and resends any changes in the local queue
1 parent 8a6ddfd commit 70c9a1a

2 files changed

Lines changed: 53 additions & 18 deletions

File tree

src/simperium/channel.js

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ internal.indexingComplete = function() {
197197
this.index_last_id = null;
198198
this.index_cv = null;
199199
this.bucket.removeListener( 'update', this.bucketUpdateListener );
200+
this.emit( 'ready' )
200201
}
201202

202203
export default function Channel( appid, access_token, bucket, store ) {
@@ -323,11 +324,11 @@ Channel.prototype.onAuth = function( data ) {
323324
this.emit( 'unauthorized', auth );
324325
return;
325326
} catch ( error ) {
326-
// Clear any unacknowledged changes on reconnect
327-
this.localQueue.sent = {};
328-
329327
// request cv and then send method
330-
init = function( cv ) {
328+
this.once( 'ready', () => {
329+
this.localQueue.resendSentChanges();
330+
} )
331+
init = ( cv ) => {
331332
if ( cv ) {
332333
this.localQueue.start();
333334
this.sendChangeVersionRequest( cv );
@@ -338,7 +339,7 @@ Channel.prototype.onAuth = function( data ) {
338339
}
339340
};
340341

341-
this.store.getChangeVersion().then( init.bind( this ) );
342+
this.store.getChangeVersion().then( init );
342343

343344
return;
344345
}
@@ -400,6 +401,8 @@ Channel.prototype.onChanges = function( data ) {
400401
changes.forEach( function( change ) {
401402
onChange( change.id, change );
402403
} );
404+
// emit ready after all server changes have been applied
405+
this.emit( 'ready' );
403406
}
404407
;
405408

@@ -590,6 +593,12 @@ LocalQueue.prototype.compressAndSend = function( id, ghost ) {
590593
this.emit( 'send', change );
591594
}
592595

596+
LocalQueue.prototype.resendSentChanges = function() {
597+
for ( let ccid in this.sent ) {
598+
this.emit( 'send', this.sent[ccid] )
599+
}
600+
}
601+
593602
function collectionRevisions( channel, id, callback ) {
594603
var expectedVersions = -1;
595604
var onGhostRetrieved = function( ghost ) {

test/simperium/channel_test.js

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import Channel from '../../src/simperium/channel'
33
import util from 'util'
44
import { parseMessage } from '../../src/simperium/util'
5-
import assert, { equal } from 'assert'
5+
import assert, { equal, ok } from 'assert'
66
import * as fn from './fn'
77
import jsondiff from '../../src/simperium/jsondiff'
88
import defaultGhostStoreProvider from '../../src/simperium/ghost/default'
@@ -12,6 +12,11 @@ import mockBucketStore from './mock_bucket_store'
1212

1313
const differ = jsondiff()
1414
const diff = differ.object_diff.bind( differ )
15+
const cycle = ( ... fns ) => ( ... args ) => {
16+
const [ head, ... rest ] = fns
17+
head( ... args )
18+
fns = rest.concat( head )
19+
}
1520

1621
describe( 'Channel', function() {
1722
var channel, bucket, store;
@@ -158,6 +163,21 @@ describe( 'Channel', function() {
158163
bucket.update( 'mock-id', data );
159164
} );
160165

166+
it( 'should resend sent but unacknowledged changes on reconnect', () => new Promise( resolve => {
167+
channel.localQueue.sent['fake-ccid'] = { fake: 'change' }
168+
169+
channel.on( 'send', cycle(
170+
() => channel.handleMessage( 'i:{"index":[],"current":"cv"}'),
171+
m => {
172+
resolve()
173+
}
174+
) )
175+
176+
channel.handleMessage( 'auth:user@example.com' )
177+
178+
channel.emit( 'ready' )
179+
} ) )
180+
161181
it( 'should send remove operation', function( done ) {
162182
channel.on( 'send', function( msg ) {
163183
var message = parseMessage( msg ),
@@ -221,6 +241,11 @@ describe( 'Channel', function() {
221241
channel.handleMessage( 'c:' + JSON.stringify( [change] ) );
222242
} );
223243

244+
it( 'should emit ready after receiving changes', ( done ) => {
245+
channel.on( 'ready', () => done() )
246+
channel.handleMessage( 'c:[]' );
247+
} )
248+
224249
it( 'should notify bucket after network deletion', function( done ) {
225250
var key = 'deleteTest';
226251

@@ -366,12 +391,10 @@ describe( 'Channel', function() {
366391
} );
367392

368393
describe( 'after authorizing', function() {
369-
beforeEach( function( next ) {
370-
channel.once( 'send', function() {
371-
next();
372-
} );
394+
beforeEach( () => new Promise( resolve => {
395+
channel.once( 'send', () => resolve() );
373396
channel.onConnect();
374-
} );
397+
} ) );
375398

376399
it( 'should request index', function( done ) {
377400
channel.once( 'send', function( data ) {
@@ -402,17 +425,20 @@ describe( 'Channel', function() {
402425
} );
403426
} );
404427

405-
it( 'should emit index event when index complete', function( done ) {
428+
it( 'should emit index and ready event when index complete', () => new Promise( resolve => {
406429
var page = 'i:{"index":[{"id":"objectid","v":1,"d":{"title":"Hello World"}}],"current":"cv"}';
430+
let indexed = false
407431
channel.on( 'index', function( cv ) {
408-
setImmediate( function() {
409-
assert.equal( 'cv', cv );
410-
assert( !bucket.isIndexing );
411-
done();
412-
} )
432+
assert.equal( 'cv', cv );
433+
assert( !bucket.isIndexing );
434+
indexed = true
413435
} );
436+
channel.on( 'ready', () => {
437+
ok( indexed )
438+
resolve()
439+
} )
414440
channel.handleMessage( page );
415-
} );
441+
} ) );
416442

417443
it( 'should request next index page', function( done ) {
418444
var page = 'i:{"index":[{"id":"objectid","v":1,"d":{"title":"Hello World"}}],"mark":"next-mark","current":"cv"}';

0 commit comments

Comments
 (0)