Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"scripts": {
"ng": "ng",
"start": "ng serve demo",
"start-ws": "npx ts-node ./backend/src/index",
"start-ws": "npx ts-node ./src/backend-index",
"build": "npm run build:packages && npm run sync-styles",
"build:packages": "ts-node --project ./scripts/tsconfig.json ./scripts/build-packages.ts",
"build:demo": "ng build demo --configuration production",
Expand Down
2 changes: 1 addition & 1 deletion packages/state/src/shared/yjs-table.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AITable } from "@ai-table/grid";
import { AITable } from '@ai-table/grid';

const IS_LOCAL: WeakSet<any> = new WeakSet();
const IS_REMOTE: WeakSet<any> = new WeakSet();
Expand Down
9 changes: 6 additions & 3 deletions src/app/component/common/content/content.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class DemoTableContent {
}

ngOnInit(): void {
if (this.tableService.sharedType) {
if (this.tableService.hasSharedAITable()) {
this.tableService.buildRenderRecords();
this.tableService.buildRenderFields();
} else {
Expand Down Expand Up @@ -163,10 +163,13 @@ export class DemoTableContent {
}

onChange(options: AITableChangeOptions) {
if (this.tableService.sharedType) {
if (this.tableService.getSharedAITable()) {
if (!YjsAITable.isRemote(this.aiTable) && !YjsAITable.isUndo(this.aiTable)) {
// 本地的修改需要等 -> yjs -> promise period -> sync
// 本地的修改需要等 -> yjs -> promise period -> observerDeep(同步) -> 回调中 isLocal 就是 false 了
// 因此 observerDeep 不可以等 promise period
YjsAITable.asLocal(this.aiTable, () => {
applyActionOps(this.tableService.sharedType!, options.actions, this.aiTable);
applyActionOps(this.tableService.getSharedAITable(), options.actions, this.aiTable);
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/app/component/table.component.html
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<div class="d-flex align-items-center mt-5">
<input thyInput [(ngModel)]="room" [disabled]="!!tableService.provider" placeholder="请输入房间号" style="width: 300px" />
<input thyInput [(ngModel)]="roomId" [disabled]="!!tableService.provider" placeholder="请输入房间号" style="width: 300px" />
<a
thyAction
class="ml-2"
Expand Down
4 changes: 2 additions & 2 deletions src/app/component/table.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const initViews = [
export class DemoTable implements OnInit, AfterViewInit, OnDestroy {
provider!: WebsocketProvider | null;

room = 'share-demo-action-1';
roomId = 'share-demo-action-1';

router = inject(Router);

Expand Down Expand Up @@ -76,7 +76,7 @@ export class DemoTable implements OnInit, AfterViewInit, OnDestroy {
}

handleShared() {
this.tableService.handleShared(this.room);
this.tableService.handleShared(this.roomId);
}

updateValue() {
Expand Down
68 changes: 68 additions & 0 deletions src/app/live-feed/bin/server-sync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import * as encoding from 'lib0/encoding';
import * as decoding from 'lib0/decoding';
import * as Y from 'yjs';
import { ServerLiveFeedRoom } from './utils';
import { LiveFeedObject } from '../feed-object';

export const messageYjsSyncStep1 = 0;
export const messageYjsSyncStep2 = 1;
export const messageYjsUpdate = 2;

export const writeSyncStep2 = (decoder: decoding.Decoder, encoder: encoding.Encoder, room: ServerLiveFeedRoom, step1Ref?: string[]) => {
encoding.writeVarUint(encoder, messageYjsSyncStep2);

while (decoding.hasContent(decoder)) {
const guid = decoding.readVarString(decoder);
step1Ref?.push(guid);
if (!room.hasObject(guid)) {

}
const diffUpdate = Y.encodeStateAsUpdate(room.getObject(guid), decoding.readVarUint8Array(decoder));
if (diffUpdate.length > 0) {
encoding.writeVarString(encoder, guid);
encoding.writeVarUint8Array(encoder, diffUpdate);
}
}
};

export const readSyncStep1 = (decoder: decoding.Decoder, encoder: encoding.Encoder, room: ServerLiveFeedRoom, step1Ref?: string[]) => {
writeSyncStep2(decoder, encoder, room, step1Ref);
};

export const readSyncStep2 = (decoder: decoding.Decoder, room: ServerLiveFeedRoom, transactionOrigin: any) => {
try {
while (decoding.hasContent(decoder)) {
const guid = decoding.readVarString(decoder);
Y.applyUpdate(room.getObject(guid), decoding.readVarUint8Array(decoder), transactionOrigin);
}
} catch (error) {
// This catches errors that are thrown by event handlers
console.error('Caught error while handling a Yjs update', error);
}
};

export const readUpdate = readSyncStep2;

export const readSyncMessage = (
decoder: decoding.Decoder,
encoder: encoding.Encoder,
room: ServerLiveFeedRoom,
transactionOrigin: any,
step1Ref?: string[]
) => {
const messageType = decoding.readVarUint(decoder);
switch (messageType) {
case messageYjsSyncStep1:
readSyncStep1(decoder, encoder, room, step1Ref);
break;
case messageYjsSyncStep2:
readSyncStep2(decoder, room, transactionOrigin);
break;
case messageYjsUpdate:
readUpdate(decoder, room, transactionOrigin);
break;
default:
throw new Error('Unknown message type');
}
return messageType;
};
165 changes: 165 additions & 0 deletions src/app/live-feed/bin/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import * as Y from 'yjs';
import * as Websocket from 'ws';
const syncProtocol = require('y-protocols/sync');
const awarenessProtocol = require('y-protocols/awareness');
import { IncomingMessage, request } from 'http';
import { ObservableV2 } from 'lib0/observable';
import * as encoding from 'lib0/encoding';
import * as decoding from 'lib0/decoding';
import * as map from 'lib0/map';
import { LiveFeedObjectUpdate, LiveFeedRoom } from '../feed-room';
import { messageYjsSyncStep1, messageYjsUpdate, readSyncMessage } from './server-sync';
import { LiveFeedObject } from '../feed-object';

const wsReadyStateConnecting = 0;
const wsReadyStateOpen = 1;

/**
* @type {Map<string,WSSharedDoc>}
*/
export const docs = new Map();

const messageSync = 0;
const messageAwareness = 1;
// const messageAuth = 2

export class ServerLiveFeedRoom extends LiveFeedRoom {
conns: Map<any, Set<any>> = new Map();

constructor(options: { roomId: string; objects: LiveFeedObject[] }) {
super(options);
}
}

export const getRoom = (roomId: string) => {
return map.setIfUndefined(docs, roomId, () => {
const object = new LiveFeedObject({ guid: roomId, typeName: 'ai-table' });
const serverRoom = new ServerLiveFeedRoom({ roomId: roomId, objects: [object] });
docs.set(roomId, serverRoom);
serverRoom.on('update', (updates: LiveFeedObjectUpdate[]) => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
encoding.writeVarUint(encoder, messageYjsUpdate);
updates.forEach((update) => {
encoding.writeVarString(encoder, update.guid);
encoding.writeVarUint8Array(encoder, update.update);
});
const message = encoding.toUint8Array(encoder)
serverRoom.conns.forEach((_, conn) => send(serverRoom, conn, message))
});
return serverRoom;
});
};

const messageListener = (conn: Websocket, room: ServerLiveFeedRoom, message: Uint8Array) => {
try {
const encoder = encoding.createEncoder();
const decoder = decoding.createDecoder(message);
const messageType = decoding.readVarUint(decoder);
switch (messageType) {
case messageSync:
encoding.writeVarUint(encoder, messageSync);
const step1Ref: string[] = [];
const syncMessageType = readSyncMessage(decoder, encoder, room, conn, step1Ref);

// If the `encoder` only contains the type of reply message and no
// message, there is no need to send the message. When `encoder` only
// contains the type of reply, its length is 1.
if (encoding.length(encoder) > 1) {
send(room, conn, encoding.toUint8Array(encoder));
}
if (syncMessageType === messageYjsSyncStep1 && step1Ref.length > 0) {
syncStep1(conn, room, step1Ref);
}
break;
}
} catch (err) {
console.error(err);
// @ts-ignore
doc.emit('error', [err]);
}
};

const syncStep1 = (conn: Websocket, room: ServerLiveFeedRoom, guids: string[]) => {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, messageSync);
encoding.writeVarUint(encoder, messageYjsSyncStep1);
guids.forEach((guid: string) => {
const object = room.getObject(guid);
encoding.writeVarString(encoder, object.guid);
const sv = Y.encodeStateVector(object);
encoding.writeVarUint8Array(encoder, sv);
});
conn.send(encoding.toUint8Array(encoder));
};

/**
* @param {WSSharedDoc} doc
* @param {any} conn
*/
const closeConn = (room: ServerLiveFeedRoom, conn: Websocket) => {
if (room.conns.has(conn)) {
room.conns.delete(conn);
}
conn.close();
};

/**
* @param {WSSharedDoc} doc
* @param {import('ws').WebSocket} conn
* @param {Uint8Array} m
*/
const send = (room: ServerLiveFeedRoom, conn: Websocket, m: Uint8Array) => {
if (conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen) {
closeConn(room, conn);
}
try {
conn.send(m, {}, (err) => {
err != null && closeConn(room, conn);
});
} catch (e) {
closeConn(room, conn);
}
};

const pingTimeout = 30000;

export const setupWSConnection = (
conn: Websocket,
req: IncomingMessage,
{ roomId = (req.url || '').slice(1).split('?')[0], gc = true } = {}
) => {
conn.binaryType = 'arraybuffer';
// get doc, initialize if it does not exist yet
const room = getRoom(roomId);
room.conns.set(conn, new Set());
// listen and reply to events
conn.on('message', /** @param {ArrayBuffer} message */ (message: ArrayBuffer) => messageListener(conn, room, new Uint8Array(message)));

// Check if connection is still alive
let pongReceived = true;
const pingInterval = setInterval(() => {
if (!pongReceived) {
if (room.conns.has(conn)) {
closeConn(room, conn);
}
clearInterval(pingInterval);
} else if (room.conns.has(conn)) {
pongReceived = false;
try {
conn.ping();
} catch (e) {
closeConn(room, conn);
clearInterval(pingInterval);
}
}
}, pingTimeout);
conn.on('close', () => {
closeConn(room, conn);
clearInterval(pingInterval);
});
conn.on('pong', () => {
pongReceived = true;
});
// 不主动发送 step1 ,因为不确定都有哪些文档需要同步,server 的文档数量和 client 端的有所不同
};
20 changes: 20 additions & 0 deletions src/app/live-feed/feed-object.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import * as Y from 'yjs';

export const convertYDocToFeedObject = (yDoc: Y.Doc, typeName: string) => {
const feedObject = yDoc as LiveFeedObject;
feedObject.typeName = typeName;
return feedObject;
};

export class LiveFeedObject extends Y.Doc {
typeName?: string;

constructor(options: { guid?: string; typeName: string }) {
super({ guid: options.guid });
this.typeName = options.typeName;
}

// 检测数据变化,如果发现新增文档则需要将新增的文档转换为 feed-object (调用 convertYDocToFeedObject)
// 然后调用 room 的 addObject 方法,将对象添加到房间内,整体管理
}

Loading