Skip to content

Commit 70dc0c6

Browse files
committed
fix: flock adaptor should only upload updates made by the current peer
1 parent 278c30c commit 70dc0c6

5 files changed

Lines changed: 96 additions & 112 deletions

File tree

packages/loro-adaptors/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"loro-protocol": "workspace:*"
1616
},
1717
"peerDependencies": {
18-
"@loro-dev/flock": "^2.1.0",
18+
"@loro-dev/flock": "^3.1.0",
1919
"loro-crdt": "^1.9.0",
2020
"yjs": "*"
2121
},

packages/loro-adaptors/src/flock-adaptor.ts

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,6 @@ type FlockExportBundle = Awaited<ReturnType<Flock["exportJson"]>>;
1313
const encoder = new TextEncoder();
1414
const decoder = new TextDecoder();
1515

16-
function cloneVersion(version: FlockVersion | undefined): FlockVersion {
17-
const next: FlockVersion = {};
18-
if (!version) return next;
19-
for (const [peer, entry] of Object.entries(version)) {
20-
if (!entry) continue;
21-
const { logicalCounter, physicalTime } = entry;
22-
if (!Number.isFinite(logicalCounter ?? NaN)) continue;
23-
next[peer] = {
24-
logicalCounter: Math.trunc(logicalCounter),
25-
physicalTime: Number.isFinite(physicalTime) ? physicalTime : 0,
26-
};
27-
}
28-
return next;
29-
}
30-
3116
function serializeVersion(version: FlockVersion | undefined): Uint8Array {
3217
return encoder.encode(JSON.stringify(version ?? {}));
3318
}
@@ -48,12 +33,12 @@ function deserializeVersion(bytes: Uint8Array): FlockVersion {
4833
};
4934
const logicalCounter =
5035
typeof entry.logicalCounter === "number" &&
51-
Number.isFinite(entry.logicalCounter)
36+
Number.isFinite(entry.logicalCounter)
5237
? Math.trunc(entry.logicalCounter)
5338
: 0;
5439
const physicalTime =
5540
typeof entry.physicalTime === "number" &&
56-
Number.isFinite(entry.physicalTime)
41+
Number.isFinite(entry.physicalTime)
5742
? entry.physicalTime
5843
: 0;
5944
next[key] = { logicalCounter, physicalTime };
@@ -98,7 +83,7 @@ function serializeBundle(bundle: FlockExportBundle): Uint8Array {
9883
}
9984

10085
function deserializeBundle(bytes: Uint8Array): FlockExportBundle {
101-
if (!bytes.length) return {};
86+
if (!bytes.length) return { version: 0, entries: {} };
10287
try {
10388
const parsed = JSON.parse(decoder.decode(bytes));
10489
if (parsed && typeof parsed === "object") {
@@ -107,7 +92,7 @@ function deserializeBundle(bytes: Uint8Array): FlockExportBundle {
10792
} catch {
10893
// ignore malformed payloads
10994
}
110-
return {};
95+
return { version: 0, entries: {} };
11196
}
11297

11398
export interface FlockAdaptorConfig {
@@ -138,7 +123,7 @@ export class FlockAdaptor implements CrdtDocAdaptor {
138123
constructor(flock: Flock, config: FlockAdaptorConfig = {}) {
139124
this.flock = flock;
140125
this.config = config;
141-
this.lastExportVersion = cloneVersion(this.flock.version());
126+
this.lastExportVersion = this.flock.version();
142127

143128
let resolve!: () => void;
144129
let reject!: (err: Error) => void;
@@ -170,14 +155,13 @@ export class FlockAdaptor implements CrdtDocAdaptor {
170155
this.unsubscribe();
171156
this.unsubscribe = undefined;
172157
}
173-
this.unsubscribe = this.flock.subscribe(batch => {
158+
this.unsubscribe = this.flock.subscribe(async batch => {
174159
if (this.destroyed) return;
175160
if (batch.source !== "local") return;
176161
if (!this.ctx) return;
177-
const update = serializeBundle(
178-
this.flock.exportJson(this.lastExportVersion)
179-
);
180-
this.lastExportVersion = cloneVersion(this.flock.version());
162+
const exported = await this.flock.exportJson({ from: this.lastExportVersion, peerId: this.flock.peerId() });
163+
const update = serializeBundle(exported);
164+
this.lastExportVersion = this.flock.version();
181165
this.ctx.send([update]);
182166
});
183167
}
@@ -194,22 +178,22 @@ export class FlockAdaptor implements CrdtDocAdaptor {
194178
if (this.destroyed) return;
195179
try {
196180
const serverVersion = deserializeVersion(res.version);
197-
this.initServerVersion = cloneVersion(serverVersion);
181+
this.initServerVersion = serverVersion;
198182
const comparison = compareVersions(this.flock.version(), serverVersion);
199183
if (comparison != null && comparison >= 0) {
200184
this.markReachedServerVersion();
201185
}
202186

203187
if (!res.version.length) {
204188
const snapshot = serializeBundle(this.flock.exportJson());
205-
this.lastExportVersion = cloneVersion(this.flock.version());
189+
this.lastExportVersion = (this.flock.version());
206190
this.ctx?.send([snapshot]);
207191
return;
208192
}
209193

210194
if (comparison == null || comparison === 1) {
211195
const delta = serializeBundle(this.flock.exportJson(serverVersion));
212-
this.lastExportVersion = cloneVersion(this.flock.version());
196+
this.lastExportVersion = (this.flock.version());
213197
this.ctx?.send([delta]);
214198
}
215199
} catch (error) {
@@ -232,7 +216,7 @@ export class FlockAdaptor implements CrdtDocAdaptor {
232216
this.ctx?.onImportError(err, [update]);
233217
}
234218
}
235-
this.lastExportVersion = cloneVersion(this.flock.version());
219+
this.lastExportVersion = (this.flock.version());
236220
if (this.initServerVersion && !this.hasReachedServerVersion) {
237221
const comparison = compareVersions(
238222
this.flock.version(),

packages/loro-adaptors/src/server/server-flock-adaptor.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ function serializeBundle(bundle: FlockExportBundle): Uint8Array {
1818

1919
function deserializeBundle(bytes: Uint8Array): FlockExportBundle {
2020
if (!bytes.length) {
21-
return {};
21+
return { version: 0, entries: {} };
2222
}
2323
try {
2424
const parsed = JSON.parse(decoder.decode(bytes));
@@ -28,7 +28,7 @@ function deserializeBundle(bytes: Uint8Array): FlockExportBundle {
2828
} catch {
2929
// ignore malformed payloads
3030
}
31-
return {};
31+
return { version: 0, entries: {} };
3232
}
3333

3434
function serializeVersion(version: FlockVersion | undefined): Uint8Array {

packages/loro-websocket/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
"ws": "^8.18.3"
7373
},
7474
"devDependencies": {
75-
"@loro-dev/flock": "^2.1.0",
75+
"@loro-dev/flock": "^3.1.0",
7676
"get-port": "^7.1.0",
7777
"tsdown": "^0.14.1",
7878
"tsx": "^4.20.5",
@@ -82,4 +82,4 @@
8282
"peerDependencies": {
8383
"loro-crdt": "^1.9.0"
8484
}
85-
}
85+
}

0 commit comments

Comments
 (0)