Skip to content

Commit a050c7b

Browse files
authored
Merge pull request #50 from Simperium/revisions/increase-count
Revisions: Increase count
2 parents 51a737b + f43a1b0 commit a050c7b

2 files changed

Lines changed: 136 additions & 25 deletions

File tree

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "simperium",
3-
"version": "0.2.7",
3+
"version": "0.2.8",
44
"description": "A simperium client for node.js",
55
"main": "./lib/simperium/index.js",
66
"repository": {

src/simperium/channel.js

Lines changed: 135 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,11 @@ Channel.prototype.onChangeVersion = function( data ) {
450450
};
451451

452452
Channel.prototype.onVersion = function( data ) {
453+
// invalid version, give up without emitting
454+
if ( data.slice( -2 ) === '\n?' ) {
455+
return;
456+
}
457+
453458
var ghost = parseVersionMessage( data );
454459

455460
this.emit( 'version', ghost.id, ghost.version, ghost.data );
@@ -647,35 +652,141 @@ LocalQueue.prototype.resendSentChanges = function() {
647652
}
648653
}
649654

655+
/**
656+
* Since revision data is basically immutable we can prevent the
657+
* need to refetch it after it has been loaded once.
658+
*
659+
* E.g. key could be `${ entityId }.${ versionNumber }`
660+
*
661+
* @type {Map<String,Object>} stores specific revisions as a cache
662+
*/
663+
export const revisionCache = new Map();
664+
665+
/**
666+
* Attempts to fetch an entity's revisions
667+
*
668+
* By default, a bucket stores two kinds of history:
669+
* - revisions: the most-recent changes to an entity (60 of these)
670+
* - archive: a "snapshot" of every ten revisions (100 of these)
671+
*
672+
* Together the revisions and archive span changes over the
673+
* 1,060 most-recent changes to an entity, but of course once
674+
* we hit the archive we lose save granularity.
675+
*
676+
* Individual buckets can override the defaults as well and also
677+
* completely eliminate them.
678+
*
679+
* We don't have a listing of which revisions exist for a given entity.
680+
*
681+
* @param {Object} channel used to send messages to the Simperium server
682+
* @param {String} id entity id for which to fetch revisions
683+
* @param {Function} callback called on error or when finished
684+
*/
650685
function collectionRevisions( channel, id, callback ) {
651-
var expectedVersions = -1;
652-
var onGhostRetrieved = function( ghost ) {
653-
var version = Math.min( ghost.version, 30 );
654-
var i;
655-
expectedVersions = version;
656-
657-
// Loop through requested revision count and request each version
658-
for ( i = 0; i < version; i++ ) {
659-
channel.send( 'e:' + id + '.' + ( ghost.version - i ) );
686+
/** @type {Number} ms delay arbitrarily chosen to give up on fetch */
687+
const TIMEOUT = 200;
688+
689+
/** @type {Set} tracks requested revisions */
690+
const requestedVersions = new Set();
691+
692+
/** @type {Array<Object>} contains the revisions and associated data */
693+
const versions = [];
694+
695+
/** @type {Number} remembers newest version of an entity */
696+
let latestVersion;
697+
698+
/** @type {Number} handle for "start finishing" timeout */
699+
let timeout;
700+
701+
/**
702+
* Receive a version update from the server and
703+
* dispatch the next fetch or finish the fetching
704+
*
705+
* @param {String} id entity id
706+
* @param {Number} version version of returned entity
707+
* @param {Object} data value of entity at revision
708+
*/
709+
function onVersion( id, version, data ) {
710+
revisionCache.set( `${ id }.${ version }`, data );
711+
versions.push( { id, version, data } );
712+
713+
// if we have every possible revision already, finish it!
714+
// this bypasses any mandatory delay
715+
if ( versions.length === latestVersion ) {
716+
return finish();
660717
}
661-
};
662718

663-
var versions = [];
664-
var onVersion = function( id, version, data ) {
665-
versions.push( {id: id, version: version, data: data} );
719+
fetchNextVersion( version );
720+
721+
// defer the final response to the application
722+
clearTimeout( timeout );
723+
timeout = setTimeout( finish, TIMEOUT );
724+
}
725+
726+
/**
727+
* Stop listening for versions and stop fetching them
728+
* and pass accumulated data back to application
729+
*/
730+
function finish() {
731+
clearTimeout( timeout );
732+
channel.removeListener( `version.${ id }`, onVersion );
733+
734+
// sort newest first
735+
callback( null, versions.sort( ( a, b ) => b.version - a.version ) );
736+
}
666737

667-
// Check if all versions have been collected
668-
if ( expectedVersions === versions.length ) {
669-
channel.removeListener( 'version.' + id, onVersion );
670-
callback( null, versions.sort( function( a, b ) {
671-
return a.version > b.version ? -1 : 1;
672-
} ) );
738+
/**
739+
* Find the next version which isn't around and issue
740+
* a fetch if possible
741+
*
742+
* @param {Number} prevVersion starting point for finding next version
743+
*/
744+
function fetchNextVersion( prevVersion ) {
745+
let version = prevVersion;
746+
747+
// find the next version to request
748+
// some could have come back already
749+
// or been requested already
750+
while ( version > 0 && requestedVersions.has( version ) ) {
751+
version -= 1;
673752
}
674-
};
675753

676-
channel.on( 'version.' + id, onVersion );
754+
// we have them all
755+
if ( ! version ) {
756+
return;
757+
}
677758

678-
channel.store.get( id ).then( onGhostRetrieved, function( e ) {
679-
callback( e );
680-
} );
759+
requestedVersions.add( version );
760+
761+
// fetch from server or local cache
762+
if ( revisionCache.has( `${ id }.${ version }` ) ) {
763+
onVersion( id, version, revisionCache.get( `${ id }.${ version }` ) );
764+
} else {
765+
channel.send( `e:${ id }.${ version }` );
766+
}
767+
}
768+
769+
// start listening for the responses
770+
channel.on( `version.${ id }`, onVersion );
771+
772+
// request the first revision and start the sequence
773+
// pre-emptively fetch as many as could exist by default
774+
channel.store.get( id ).then( ( { version } ) => {
775+
latestVersion = version;
776+
777+
// grab latest change revisions
778+
for ( let i = 0; i < 60 && ( version - i ) > 0; i++ ) {
779+
fetchNextVersion( version - i );
780+
}
781+
782+
// grab archive revisions
783+
// these are like 1, 11, 21, 31, …, 41, normal revisions [42, 43, 44, 45, …]
784+
const firstArchive = Math.round( ( version - 60 ) / 10 ) * 10 + 1; // 127 -> 67 -> 6 -> 60 -> 61
785+
for ( let i = 0; i < 100 && ( firstArchive - 10 * i ) > 0; i++ ) {
786+
fetchNextVersion( firstArchive - 10 * i );
787+
}
788+
}, callback );
789+
790+
// and set an initial timeout for failed connections
791+
timeout = setTimeout( finish, TIMEOUT * 4 );
681792
}

0 commit comments

Comments
 (0)