@@ -445,6 +445,11 @@ Channel.prototype.onChangeVersion = function( data ) {
445445} ;
446446
447447Channel . prototype . onVersion = function ( data ) {
448+ // invalid version, give up without emitting
449+ if ( data . slice ( - 2 ) === '\n?' ) {
450+ return ;
451+ }
452+
448453 var ghost = parseVersionMessage ( data ) ;
449454
450455 this . emit ( 'version' , ghost . id , ghost . version , ghost . data ) ;
@@ -642,45 +647,120 @@ LocalQueue.prototype.resendSentChanges = function() {
642647 }
643648}
644649
650+ /**
651+ * Attempts to fetch an entity's revisions
652+ *
653+ * By default, a bucket stores two kinds of history:
654+ * - revisions: the most-recent changes to an entity (60 of these)
655+ * - archive: a "snapshot" of every ten revisions (100 of these)
656+ *
657+ * Together the revisions and archive span changes over the
658+ * 1,060 most-recent changes to an entity, but of course once
659+ * we hit the archive we lose save granularity.
660+ *
661+ * Individual buckets can override the defaults as well and also
662+ * completely eliminate them.
663+ *
664+ * We don't have a listing of which revisions exist for a given entity.
665+ *
666+ * @param {Object } channel used to send messages to the Simperium server
667+ * @param {String } id entity id for which to fetch revisions
668+ * @param {Function } callback called on error or when finished
669+ */
645670function collectionRevisions ( channel , id , callback ) {
646- let expectedVersions ;
647- let timeout ;
648- const TIMEOUT = 10000 ; // arbitrarily chosen delay
671+ /** @type {Number } ms delay arbitrarily chosen to give up on fetch */
672+ const TIMEOUT = 200 ;
673+
674+ /** @type {Set } tracks requested revisions */
675+ const requestedVersions = new Set ( ) ;
649676
677+ /** @type {Array<Object> } contains the revisions and associated data */
650678 const versions = [ ] ;
651679
652- const finish = listener => {
653- channel . removeListener ( 'version.' + id , listener ) ;
654- callback ( null , versions . sort ( ( a , b ) => a . version - b . version ) ) ;
655- clearTimeout ( timeout ) ;
656- } ;
680+ /** @type { Number } remembers newest version of an entity */
681+ let latestVersion ;
682+
683+ /** @type { Number } handle for "start finishing" timeout */
684+ let timeout ;
657685
658- const onVersion = ( id , version , data ) => {
686+ /**
687+ * Receive a version update from the server and
688+ * dispatch the next fetch or finish the fetching
689+ *
690+ * @param {String } id entity id
691+ * @param {Number } version version of returned entity
692+ * @param {Object } data value of entity at revision
693+ */
694+ function onVersion ( id , version , data ) {
659695 versions . push ( { id, version, data } ) ;
660696
661- // Check if all versions have been collected
662- if ( expectedVersions === versions . length ) {
663- finish ( onVersion ) ;
697+ // if we have every possible revision already, finish it!
698+ // this bypasses any mandatory delay
699+ if ( versions . length === latestVersion ) {
700+ return finish ( ) ;
664701 }
665- } ;
666702
667- channel . on ( 'version.' + id , onVersion ) ;
703+ fetchNextVersion ( version ) ;
704+
705+ // defer the final response to the application
706+ clearTimeout ( timeout ) ;
707+ timeout = setTimeout ( finish , TIMEOUT ) ;
708+ }
709+
710+ /**
711+ * Stop listening for versions and stop fetching them
712+ * and pass accumulated data back to application
713+ */
714+ function finish ( ) {
715+ clearTimeout ( timeout ) ;
716+ channel . removeListener ( `version.${ id } ` , onVersion ) ;
717+
718+ // sort newest first
719+ callback ( null , versions . sort ( ( a , b ) => b . version - a . version ) ) ;
720+ }
721+
722+ /**
723+ * Find the next version which isn't around and issue
724+ * a fetch if possible
725+ *
726+ * @param {Number } prevVersion starting point for finding next version
727+ */
728+ function fetchNextVersion ( prevVersion ) {
729+ let version = prevVersion - 1 ;
730+
731+ // find the next version to request
732+ // some could have come back already
733+ // or been requested already
734+ while ( version > 0 && requestedVersions . has ( version ) ) {
735+ version -= 1 ;
736+ }
737+
738+ channel . send ( `e:${ id } .${ version } ` ) ;
739+ requestedVersions . add ( version ) ;
740+ }
668741
669- channel . store . get ( id ) . then ( ghost => {
670- // the default bucket options allow for storing
671- // the 60 most-recent revisions of a note plus
672- // 100 archive versions (these store one out of
673- // every ten versions). we'll get up to this many
674- const version = Math . min ( ghost . version , 160 ) ;
675- expectedVersions = version ;
742+ // start listening for the responses
743+ channel . on ( `version.${ id } ` , onVersion ) ;
744+
745+ // request the first revision and start the sequence
746+ // pre-emptively fetch as many as could exist by default
747+ channel . store . get ( id ) . then ( ( { version } ) => {
748+ latestVersion = version ;
749+
750+ // grab latest change revisions
751+ for ( let i = 0 ; i < 60 && ( version - i ) > 0 ; i ++ ) {
752+ fetchNextVersion ( version - i ) ;
753+ }
676754
677- // Loop through requested revision count and request each version
678- for ( let i = 0 ; i < version ; i ++ ) {
679- channel . send ( 'e:' + id + '.' + ( ghost . version - i ) ) ;
755+ // grab archive revisions
756+ // these are like 1, 11, 21, 31, …, 41, normal revisions [42, 43, 44, 45, …]
757+ const firstArchive = Math . round ( ( version - 60 ) / 10 ) * 10 + 1 ; // 127 -> 67 -> 6 -> 60 -> 61
758+ for ( let i = 0 ; i < 100 && ( firstArchive - 10 * i ) > 0 ; i ++ ) {
759+ fetchNextVersion ( firstArchive - 10 * i ) ;
680760 }
681761 } , callback ) ;
682762
683- // give up after a timeout
684- timeout = setTimeout ( ( ) => finish ( onVersion ) , TIMEOUT ) ;
763+ // and set an initial timeout for failed connections
764+ timeout = setTimeout ( finish , TIMEOUT * 4 ) ;
685765}
686766
0 commit comments