11import { Kernel , KernelMessage } from '@jupyterlab/services' ;
22import * as decoding from 'lib0/decoding' ;
33import * as encoding from 'lib0/encoding' ;
4+ import {
5+ Awareness ,
6+ applyAwarenessUpdate ,
7+ encodeAwarenessUpdate
8+ } from 'y-protocols/awareness' ;
49import * as syncProtocol from 'y-protocols/sync' ;
510import * as Y from 'yjs' ;
611import { IDisposable } from '@lumino/disposable' ;
@@ -9,17 +14,45 @@ export enum YMessageType {
914 SYNC = 0 ,
1015 AWARENESS = 1
1116}
17+
18+ export interface IYCommProviderOptions {
19+ comm : Kernel . IComm ;
20+ ydoc : Y . Doc ;
21+ /**
22+ * If omitted, a new Awareness is created for this doc.
23+ * When the UI is backed by a shared Y doc (e.g. @jupyter/ydoc), pass that
24+ * document’s Awareness so comm traffic matches the rest of the session.
25+ */
26+ awareness ?: Awareness ;
27+ }
28+
1229export class YCommProvider implements IDisposable {
13- constructor ( options : { comm : Kernel . IComm ; ydoc : Y . Doc } ) {
30+ constructor ( options : IYCommProviderOptions ) {
1431 this . _comm = options . comm ;
1532 this . _ydoc = options . ydoc ;
33+
34+ if ( options . awareness ) {
35+ this . _awareness = options . awareness ;
36+ this . _ownsAwareness = false ;
37+ } else {
38+ this . _awareness = new Awareness ( this . _ydoc ) ;
39+ this . _ownsAwareness = true ;
40+ }
41+
1642 this . _ydoc . on ( 'update' , this . _updateHandler ) ;
43+ this . _awareness . on ( 'update' , this . _awarenessUpdateHandler ) ;
44+
1745 this . _connect ( ) ;
1846 }
1947
2048 get doc ( ) : Y . Doc {
2149 return this . _ydoc ;
2250 }
51+
52+ get awareness ( ) : Awareness {
53+ return this . _awareness ;
54+ }
55+
2356 get synced ( ) : boolean {
2457 return this . _synced ;
2558 }
@@ -38,9 +71,15 @@ export class YCommProvider implements IDisposable {
3871 if ( this . _isDisposed ) {
3972 return ;
4073 }
74+ this . _ydoc . off ( 'update' , this . _updateHandler ) ;
75+ this . _awareness . off ( 'update' , this . _awarenessUpdateHandler ) ;
76+ if ( this . _ownsAwareness ) {
77+ this . _awareness . destroy ( ) ;
78+ }
4179 this . _comm . close ( ) ;
4280 this . _isDisposed = true ;
4381 }
82+
4483 private _onMsg = ( msg : KernelMessage . ICommMsgMsg < 'iopub' | 'shell' > ) => {
4584 if ( msg . buffers ) {
4685 const buffer = msg . buffers [ 0 ] as ArrayBuffer ;
@@ -54,13 +93,31 @@ export class YCommProvider implements IDisposable {
5493 }
5594 } ;
5695
57- private _updateHandler = ( update , origin ) => {
96+ private _updateHandler = ( update : Uint8Array , origin : unknown ) => {
5897 const encoder = encoding . createEncoder ( ) ;
5998 encoding . writeVarUint ( encoder , YMessageType . SYNC ) ;
6099 syncProtocol . writeUpdate ( encoder , update ) ;
61100 this . _sendOverComm ( encoding . toUint8Array ( encoder ) ) ;
62101 } ;
63102
103+ private _awarenessUpdateHandler = ( change : {
104+ added : number [ ] ;
105+ updated : number [ ] ;
106+ removed : number [ ] ;
107+ } ) => {
108+ const { added, updated, removed } = change ;
109+ const clients = added . concat ( updated , removed ) ;
110+ if ( clients . length === 0 ) {
111+ return ;
112+ }
113+
114+ const encoder = encoding . createEncoder ( ) ;
115+ encoding . writeVarUint ( encoder , YMessageType . AWARENESS ) ;
116+ const awarenessBody = encodeAwarenessUpdate ( this . _awareness , clients ) ;
117+ encoding . writeVarUint8Array ( encoder , awarenessBody ) ;
118+ this . _sendOverComm ( encoding . toUint8Array ( encoder ) ) ;
119+ } ;
120+
64121 private _connect ( ) {
65122 this . _sync ( ) ;
66123 this . _comm . onMsg = this . _onMsg ;
@@ -79,6 +136,8 @@ export class YCommProvider implements IDisposable {
79136
80137 private _comm : Kernel . IComm ;
81138 private _ydoc : Y . Doc ;
139+ private _awareness : Awareness ;
140+ private _ownsAwareness : boolean ;
82141 private _synced : boolean ;
83142 private _isDisposed = false ;
84143}
@@ -106,6 +165,7 @@ namespace Private {
106165 provider . synced = true ;
107166 }
108167 }
168+
109169 export function readMessage (
110170 provider : YCommProvider ,
111171 buf : Uint8Array ,
@@ -115,10 +175,17 @@ namespace Private {
115175 const encoder = encoding . createEncoder ( ) ;
116176 const messageType = decoding . readVarUint ( decoder ) ;
117177
118- if ( messageType === YMessageType . SYNC ) {
119- syncMessageHandler ( encoder , decoder , provider , emitSynced ) ;
120- } else {
121- console . error ( 'Unable to compute message' ) ;
178+ switch ( messageType ) {
179+ case YMessageType . SYNC :
180+ syncMessageHandler ( encoder , decoder , provider , emitSynced ) ;
181+ break ;
182+ case YMessageType . AWARENESS : {
183+ const awarenessUpdate = decoding . readVarUint8Array ( decoder ) ;
184+ applyAwarenessUpdate ( provider . awareness , awarenessUpdate , null ) ;
185+ break ;
186+ }
187+ default :
188+ console . error ( 'Unable to compute message' ) ;
122189 }
123190 return encoder ;
124191 }
0 commit comments