diff --git a/package.json b/package.json index 72085677..7fab9a65 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "scripts": { "ng": "ng", "start": "ng serve demo", - "start-ws": "npx ts-node ./backend/src/index", + "start-ws": "npx ts-node ./src/backend-index", "build": "npm run build:packages && npm run sync-styles", "build:packages": "ts-node --project ./scripts/tsconfig.json ./scripts/build-packages.ts", "build:demo": "ng build demo --configuration production", diff --git a/packages/state/src/shared/yjs-table.ts b/packages/state/src/shared/yjs-table.ts index a57fc056..cbafc730 100644 --- a/packages/state/src/shared/yjs-table.ts +++ b/packages/state/src/shared/yjs-table.ts @@ -1,4 +1,4 @@ -import { AITable } from "@ai-table/grid"; +import { AITable } from '@ai-table/grid'; const IS_LOCAL: WeakSet = new WeakSet(); const IS_REMOTE: WeakSet = new WeakSet(); diff --git a/src/app/component/common/content/content.component.ts b/src/app/component/common/content/content.component.ts index adfa90e0..3af258b9 100644 --- a/src/app/component/common/content/content.component.ts +++ b/src/app/component/common/content/content.component.ts @@ -124,7 +124,7 @@ export class DemoTableContent { } ngOnInit(): void { - if (this.tableService.sharedType) { + if (this.tableService.hasSharedAITable()) { this.tableService.buildRenderRecords(); this.tableService.buildRenderFields(); } else { @@ -163,10 +163,13 @@ export class DemoTableContent { } onChange(options: AITableChangeOptions) { - if (this.tableService.sharedType) { + if (this.tableService.getSharedAITable()) { if (!YjsAITable.isRemote(this.aiTable) && !YjsAITable.isUndo(this.aiTable)) { + // 本地的修改需要等 -> yjs -> promise period -> sync + // 本地的修改需要等 -> yjs -> promise period -> observerDeep(同步) -> 回调中 isLocal 就是 false 了 + // 因此 observerDeep 不可以等 promise period YjsAITable.asLocal(this.aiTable, () => { - applyActionOps(this.tableService.sharedType!, options.actions, this.aiTable); + applyActionOps(this.tableService.getSharedAITable(), options.actions, this.aiTable); }); } } diff --git a/src/app/component/table.component.html b/src/app/component/table.component.html index 9a4f2841..8af48020 100644 --- a/src/app/component/table.component.html +++ b/src/app/component/table.component.html @@ -1,5 +1,5 @@
- + { + encoding.writeVarUint(encoder, messageYjsSyncStep2); + + while (decoding.hasContent(decoder)) { + const guid = decoding.readVarString(decoder); + step1Ref?.push(guid); + if (!room.hasObject(guid)) { + + } + const diffUpdate = Y.encodeStateAsUpdate(room.getObject(guid), decoding.readVarUint8Array(decoder)); + if (diffUpdate.length > 0) { + encoding.writeVarString(encoder, guid); + encoding.writeVarUint8Array(encoder, diffUpdate); + } + } +}; + +export const readSyncStep1 = (decoder: decoding.Decoder, encoder: encoding.Encoder, room: ServerLiveFeedRoom, step1Ref?: string[]) => { + writeSyncStep2(decoder, encoder, room, step1Ref); +}; + +export const readSyncStep2 = (decoder: decoding.Decoder, room: ServerLiveFeedRoom, transactionOrigin: any) => { + try { + while (decoding.hasContent(decoder)) { + const guid = decoding.readVarString(decoder); + Y.applyUpdate(room.getObject(guid), decoding.readVarUint8Array(decoder), transactionOrigin); + } + } catch (error) { + // This catches errors that are thrown by event handlers + console.error('Caught error while handling a Yjs update', error); + } +}; + +export const readUpdate = readSyncStep2; + +export const readSyncMessage = ( + decoder: decoding.Decoder, + encoder: encoding.Encoder, + room: ServerLiveFeedRoom, + transactionOrigin: any, + step1Ref?: string[] +) => { + const messageType = decoding.readVarUint(decoder); + switch (messageType) { + case messageYjsSyncStep1: + readSyncStep1(decoder, encoder, room, step1Ref); + break; + case messageYjsSyncStep2: + readSyncStep2(decoder, room, transactionOrigin); + break; + case messageYjsUpdate: + readUpdate(decoder, room, transactionOrigin); + break; + default: + throw new Error('Unknown message type'); + } + return messageType; +}; diff --git a/src/app/live-feed/bin/utils.ts b/src/app/live-feed/bin/utils.ts new file mode 100644 index 00000000..85cc090e --- /dev/null +++ b/src/app/live-feed/bin/utils.ts @@ -0,0 +1,165 @@ +import * as Y from 'yjs'; +import * as Websocket from 'ws'; +const syncProtocol = require('y-protocols/sync'); +const awarenessProtocol = require('y-protocols/awareness'); +import { IncomingMessage, request } from 'http'; +import { ObservableV2 } from 'lib0/observable'; +import * as encoding from 'lib0/encoding'; +import * as decoding from 'lib0/decoding'; +import * as map from 'lib0/map'; +import { LiveFeedObjectUpdate, LiveFeedRoom } from '../feed-room'; +import { messageYjsSyncStep1, messageYjsUpdate, readSyncMessage } from './server-sync'; +import { LiveFeedObject } from '../feed-object'; + +const wsReadyStateConnecting = 0; +const wsReadyStateOpen = 1; + +/** + * @type {Map} + */ +export const docs = new Map(); + +const messageSync = 0; +const messageAwareness = 1; +// const messageAuth = 2 + +export class ServerLiveFeedRoom extends LiveFeedRoom { + conns: Map> = new Map(); + + constructor(options: { roomId: string; objects: LiveFeedObject[] }) { + super(options); + } +} + +export const getRoom = (roomId: string) => { + return map.setIfUndefined(docs, roomId, () => { + const object = new LiveFeedObject({ guid: roomId, typeName: 'ai-table' }); + const serverRoom = new ServerLiveFeedRoom({ roomId: roomId, objects: [object] }); + docs.set(roomId, serverRoom); + serverRoom.on('update', (updates: LiveFeedObjectUpdate[]) => { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + encoding.writeVarUint(encoder, messageYjsUpdate); + updates.forEach((update) => { + encoding.writeVarString(encoder, update.guid); + encoding.writeVarUint8Array(encoder, update.update); + }); + const message = encoding.toUint8Array(encoder) + serverRoom.conns.forEach((_, conn) => send(serverRoom, conn, message)) + }); + return serverRoom; + }); +}; + +const messageListener = (conn: Websocket, room: ServerLiveFeedRoom, message: Uint8Array) => { + try { + const encoder = encoding.createEncoder(); + const decoder = decoding.createDecoder(message); + const messageType = decoding.readVarUint(decoder); + switch (messageType) { + case messageSync: + encoding.writeVarUint(encoder, messageSync); + const step1Ref: string[] = []; + const syncMessageType = readSyncMessage(decoder, encoder, room, conn, step1Ref); + + // If the `encoder` only contains the type of reply message and no + // message, there is no need to send the message. When `encoder` only + // contains the type of reply, its length is 1. + if (encoding.length(encoder) > 1) { + send(room, conn, encoding.toUint8Array(encoder)); + } + if (syncMessageType === messageYjsSyncStep1 && step1Ref.length > 0) { + syncStep1(conn, room, step1Ref); + } + break; + } + } catch (err) { + console.error(err); + // @ts-ignore + doc.emit('error', [err]); + } +}; + +const syncStep1 = (conn: Websocket, room: ServerLiveFeedRoom, guids: string[]) => { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + encoding.writeVarUint(encoder, messageYjsSyncStep1); + guids.forEach((guid: string) => { + const object = room.getObject(guid); + encoding.writeVarString(encoder, object.guid); + const sv = Y.encodeStateVector(object); + encoding.writeVarUint8Array(encoder, sv); + }); + conn.send(encoding.toUint8Array(encoder)); +}; + +/** + * @param {WSSharedDoc} doc + * @param {any} conn + */ +const closeConn = (room: ServerLiveFeedRoom, conn: Websocket) => { + if (room.conns.has(conn)) { + room.conns.delete(conn); + } + conn.close(); +}; + +/** + * @param {WSSharedDoc} doc + * @param {import('ws').WebSocket} conn + * @param {Uint8Array} m + */ +const send = (room: ServerLiveFeedRoom, conn: Websocket, m: Uint8Array) => { + if (conn.readyState !== wsReadyStateConnecting && conn.readyState !== wsReadyStateOpen) { + closeConn(room, conn); + } + try { + conn.send(m, {}, (err) => { + err != null && closeConn(room, conn); + }); + } catch (e) { + closeConn(room, conn); + } +}; + +const pingTimeout = 30000; + +export const setupWSConnection = ( + conn: Websocket, + req: IncomingMessage, + { roomId = (req.url || '').slice(1).split('?')[0], gc = true } = {} +) => { + conn.binaryType = 'arraybuffer'; + // get doc, initialize if it does not exist yet + const room = getRoom(roomId); + room.conns.set(conn, new Set()); + // listen and reply to events + conn.on('message', /** @param {ArrayBuffer} message */ (message: ArrayBuffer) => messageListener(conn, room, new Uint8Array(message))); + + // Check if connection is still alive + let pongReceived = true; + const pingInterval = setInterval(() => { + if (!pongReceived) { + if (room.conns.has(conn)) { + closeConn(room, conn); + } + clearInterval(pingInterval); + } else if (room.conns.has(conn)) { + pongReceived = false; + try { + conn.ping(); + } catch (e) { + closeConn(room, conn); + clearInterval(pingInterval); + } + } + }, pingTimeout); + conn.on('close', () => { + closeConn(room, conn); + clearInterval(pingInterval); + }); + conn.on('pong', () => { + pongReceived = true; + }); + // 不主动发送 step1 ,因为不确定都有哪些文档需要同步,server 的文档数量和 client 端的有所不同 +}; diff --git a/src/app/live-feed/feed-object.ts b/src/app/live-feed/feed-object.ts new file mode 100644 index 00000000..99330e3d --- /dev/null +++ b/src/app/live-feed/feed-object.ts @@ -0,0 +1,20 @@ +import * as Y from 'yjs'; + +export const convertYDocToFeedObject = (yDoc: Y.Doc, typeName: string) => { + const feedObject = yDoc as LiveFeedObject; + feedObject.typeName = typeName; + return feedObject; +}; + +export class LiveFeedObject extends Y.Doc { + typeName?: string; + + constructor(options: { guid?: string; typeName: string }) { + super({ guid: options.guid }); + this.typeName = options.typeName; + } + + // 检测数据变化,如果发现新增文档则需要将新增的文档转换为 feed-object (调用 convertYDocToFeedObject) + // 然后调用 room 的 addObject 方法,将对象添加到房间内,整体管理 +} + diff --git a/src/app/live-feed/feed-provider.ts b/src/app/live-feed/feed-provider.ts new file mode 100644 index 00000000..61e7b9ce --- /dev/null +++ b/src/app/live-feed/feed-provider.ts @@ -0,0 +1,216 @@ +import { LiveFeedObject } from './feed-object'; +import { Observable } from 'lib0/observable'; +import * as url from 'lib0/url'; +import * as math from 'lib0/math'; +import * as time from 'lib0/time'; +import * as encoding from 'lib0/encoding'; +import * as decoding from 'lib0/decoding'; +import * as syncProtocol from 'y-protocols/sync'; +import * as Y from 'yjs'; +import { messageYjsSyncStep1, messageYjsUpdate, readSyncMessage, writeUpdate } from './sync'; +import { LiveFeedObjectUpdate, LiveFeedRoom } from './feed-room'; + +export const messageSync = 0; +export const messageQueryAwareness = 3; +export const messageAwareness = 1; +export const messageAuth = 2; + +export interface LiveFeedProviderOptions { + params: { [x: string]: string }; + WebSocketPolyfill: typeof WebSocket; + connect: boolean; +} + +export class LiveFeedProvider extends Observable { + room: LiveFeedRoom; + options: LiveFeedProviderOptions; + serverUrl: string; + + shouldConnect = false; + hasConnected = false; + connecting = false; + unsuccessfulReconnects = 0; + lastMessageReceived = 0; + + #synced = false; + + ws?: WebSocket | null; + + constructor( + room: LiveFeedRoom, + serverUrl: string, + options: LiveFeedProviderOptions = { params: {}, WebSocketPolyfill: WebSocket, connect: true } + ) { + super(); + this.room = room; + this.options = options; + this.serverUrl = serverUrl; + this.shouldConnect = options.connect; + + this.room.on('update', (updates: LiveFeedObjectUpdate[]) => { + const pendingUpdates = updates.filter((value) => value.origin !== this); + if (pendingUpdates.length > 0) { + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + encoding.writeVarUint(encoder, messageYjsUpdate); + pendingUpdates.forEach((update) => { + encoding.writeVarString(encoder, update.guid); + encoding.writeVarUint8Array(encoder, update.update); + }); + if (this.hasConnected && this.ws && this.ws.readyState === this.ws.OPEN) { + this.ws.send(encoding.toUint8Array(encoder)); + } + } + }); + if (options.connect) { + this.connect(); + } + } + + get url() { + const encodedParams = url.encodeQueryParams(this.options.params); + return this.serverUrl + '/' + this.room.roomId + (encodedParams.length === 0 ? '' : '?' + encodedParams); + } + + connect() { + console.log('connect'); + this.shouldConnect = true; + if (!this.hasConnected && !this.ws) { + setupWS(this); + } + } + + get synced() { + return this.#synced; + } + + set synced(state) { + if (this.#synced !== state) { + this.#synced = state; + this.emit('synced', [state]); + } + } + + disconnect() { + this.shouldConnect = false; + if (this.ws) { + this.ws.close(); + } + } +} + +const reconnectTimeoutBase = 1200; +const maxReconnectTimeout = 12000; +const messageReconnectTimeout = 60000; + +const setupWS = (provider: LiveFeedProvider) => { + if (provider.shouldConnect && !provider.ws) { + console.log(provider.url); + const websocket = new provider.options.WebSocketPolyfill(provider.url); + websocket.binaryType = 'arraybuffer'; + provider.ws = websocket; + provider.connecting = true; + provider.hasConnected = false; + provider.synced = false; + + websocket.onmessage = (event) => { + provider.lastMessageReceived = time.getUnixTime(); + const encoder = readMessage(provider, new Uint8Array(event.data), true); + if (encoding.length(encoder) > 1) { + websocket.send(encoding.toUint8Array(encoder)); + } + }; + + websocket.onclose = () => { + provider.ws = null; + provider.connecting = false; + if (provider.hasConnected) { + provider.hasConnected = false; + provider.synced = false; + provider.emit('status', [ + { + status: 'disconnected' + } + ]); + } else { + provider.unsuccessfulReconnects++; + } + // Start with no reconnect timeout and increase timeout by + // log10(unsuccessfulReconnects). + // The idea is to increase reconnect timeout slowly and have no reconnect + // timeout at the beginning (log(1) = 0) + setTimeout( + setupWS, + math.min(math.log10(provider.unsuccessfulReconnects + 1) * reconnectTimeoutBase, maxReconnectTimeout), + provider + ); + }; + + websocket.onopen = () => { + provider.lastMessageReceived = time.getUnixTime(); + provider.connecting = false; + provider.hasConnected = true; + provider.unsuccessfulReconnects = 0; + provider.emit('status', [ + { + status: 'connected' + } + ]); + syncStep1(provider); + const _syncInterval = setInterval(() => { + if (!provider.synced && provider.hasConnected) { + syncStep1(provider); + } else { + clearInterval(_syncInterval); + } + }, 1000); + }; + + provider.emit('status', [ + { + status: 'connecting' + } + ]); + } +}; + +const syncStep1 = (provider: LiveFeedProvider) => { + // always send sync step 1 when connected + const encoder = encoding.createEncoder(); + encoding.writeVarUint(encoder, messageSync); + encoding.writeVarUint(encoder, messageYjsSyncStep1); + provider.room.objects.forEach((value: LiveFeedObject) => { + encoding.writeVarString(encoder, value.guid); + const sv = Y.encodeStateVector(value); + encoding.writeVarUint8Array(encoder, sv); + }); + provider.ws?.send(encoding.toUint8Array(encoder)); +}; + +const messageHandlers: any[] = []; +messageHandlers[messageSync] = (encoder: encoding.Encoder, decoder: decoding.Decoder, provider: LiveFeedProvider, emitSynced: boolean) => { + encoding.writeVarUint(encoder, messageSync); + const syncMessageType = readSyncMessage(decoder, encoder, provider.room, provider); + if (emitSynced && syncMessageType === syncProtocol.messageYjsSyncStep2 && !provider.synced) { + provider.synced = true; + } +}; + +/** + * @param {WebsocketProvider} provider + * @param {Uint8Array} buf + * @param {boolean} emitSynced + * @return {encoding.Encoder} + */ +const readMessage = (provider: LiveFeedProvider, buf: Uint8Array, emitSynced: boolean) => { + const decoder = decoding.createDecoder(buf); + const encoder = encoding.createEncoder(); + const messageType = decoding.readVarUint(decoder); + const messageHandler = messageHandlers[messageType]; + if (/** @type {any} */ messageHandler) { + messageHandler(encoder, decoder, provider, emitSynced, messageType); + } else { + console.error('Unable to compute message'); + } + return encoder; +}; diff --git a/src/app/live-feed/feed-room.ts b/src/app/live-feed/feed-room.ts new file mode 100644 index 00000000..c774cb98 --- /dev/null +++ b/src/app/live-feed/feed-room.ts @@ -0,0 +1,88 @@ +import * as Y from 'yjs'; +import { Observable } from 'lib0/observable'; +import { LiveFeedObject } from './feed-object'; + +export class LiveFeedRoom extends Observable { + objects: Map = new Map(); + roomId: string; + + #pendingUpdates: LiveFeedObjectUpdate[] = []; + + #isPendingUpdate = false; + + constructor(options: { roomId: string; objects: LiveFeedObject[] }) { + super(); + this.roomId = options.roomId; + this.initObjects(options.objects); + } + + initObjects(objects: LiveFeedObject[]) { + objects.forEach((object: LiveFeedObject) => { + this.addObject(object); + }); + } + + addObject(object: LiveFeedObject) { + if (!object.typeName) { + throw new Error('can not resolve typeName'); + } + this.objects.set(object.guid, object); + object.getMap(object.typeName).observeDeep((events: Array>, transaction: Y.Transaction) => { + this.emit('change', [{ + events, + guid: object.guid, + transaction + }]); + }); + object.on('update', this.#emitObjectUpdate); + } + + #emitObjectUpdate = (update: Uint8Array, origin: any, doc: Y.Doc, transaction: Y.Transaction) => { + this.#pendingUpdates.push({ + update, + origin, + transaction, + guid: doc.guid + }); + if (!this.#isPendingUpdate) { + this.#isPendingUpdate = true; + Promise.resolve().then(() => { + this.emit('update', [this.#pendingUpdates]); + this.#pendingUpdates = []; + this.#isPendingUpdate = false; + }); + } + }; + + removeObject(object: LiveFeedObject) { + this.objects.delete(object.guid); + object.off('update', this.#emitObjectUpdate); + object.destroy(); + } + + getObject(guid: string) { + const object = this.objects.get(guid); + if (!object) { + throw new Error(`can not resolve feed object by guid: ${guid}`); + } + return object; + } + + hasObject(guid: string) { + const object = this.objects.get(guid); + return !!object; + } +} + +export interface LiveFeedObjectUpdate { + update: Uint8Array; + origin: any; + transaction: Y.Transaction; + guid: string; +} + +export interface LiveFeedObjectChange { + events: Y.YEvent[]; + transaction: Y.Transaction; + guid: string; +} diff --git a/src/app/live-feed/public-api.ts b/src/app/live-feed/public-api.ts new file mode 100644 index 00000000..0e21b949 --- /dev/null +++ b/src/app/live-feed/public-api.ts @@ -0,0 +1,9 @@ +/* + * Public API Surface of live-feed + */ + +export * from './sync'; +export * from './feed-object'; +export * from './feed-provider'; +export * from './feed-room'; +export * from './bin/utils'; diff --git a/src/app/live-feed/sync.ts b/src/app/live-feed/sync.ts new file mode 100644 index 00000000..1db30e61 --- /dev/null +++ b/src/app/live-feed/sync.ts @@ -0,0 +1,62 @@ +import * as encoding from 'lib0/encoding'; +import * as decoding from 'lib0/decoding'; +import * as Y from 'yjs'; +import { LiveFeedRoom } from './feed-room'; + +export const messageYjsSyncStep1 = 0; +export const messageYjsSyncStep2 = 1; +export const messageYjsUpdate = 2; + +export const writeSyncStep2 = (decoder: decoding.Decoder, encoder: encoding.Encoder, room: LiveFeedRoom) => { + encoding.writeVarUint(encoder, messageYjsSyncStep2); + + while (decoding.hasContent(decoder)) { + const guid = decoding.readVarString(decoder); + const diffUpdate = Y.encodeStateAsUpdate(room.getObject(guid), decoding.readVarUint8Array(decoder)); + if (diffUpdate.length > 0) { + encoding.writeVarString(encoder, guid); + encoding.writeVarUint8Array(encoder, diffUpdate); + } + } +}; + +export const readSyncStep1 = (decoder: decoding.Decoder, encoder: encoding.Encoder, room: LiveFeedRoom) => { + writeSyncStep2(decoder, encoder, room); +}; + +export const readSyncStep2 = (decoder: decoding.Decoder, room: LiveFeedRoom, transactionOrigin: any) => { + try { + while (decoding.hasContent(decoder)) { + const guid = decoding.readVarString(decoder); + Y.applyUpdate(room.getObject(guid), decoding.readVarUint8Array(decoder), transactionOrigin); + } + } catch (error) { + // This catches errors that are thrown by event handlers + console.error('Caught error while handling a Yjs update', error); + } +}; + +export const readUpdate = readSyncStep2; + +export const readSyncMessage = (decoder: decoding.Decoder, encoder: encoding.Encoder, room: LiveFeedRoom, transactionOrigin: any) => { + const messageType = decoding.readVarUint(decoder); + switch (messageType) { + case messageYjsSyncStep1: + readSyncStep1(decoder, encoder, room); + break; + case messageYjsSyncStep2: + readSyncStep2(decoder, room, transactionOrigin); + break; + case messageYjsUpdate: + readUpdate(decoder, room, transactionOrigin); + break; + default: + throw new Error('Unknown message type'); + } + return messageType; +}; + +export const writeUpdate = (encoder: encoding.Encoder, update: Uint8Array) => { + encoding.writeVarUint(encoder, messageYjsUpdate); + encoding.writeVarUint8Array(encoder, update); +}; diff --git a/src/app/provider.ts b/src/app/provider.ts deleted file mode 100644 index 6a585655..00000000 --- a/src/app/provider.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { WebsocketProvider } from 'y-websocket'; -import * as Y from 'yjs'; - -export const getProvider = (doc: Y.Doc, room: string, isDev: boolean) => { - // 在线地址:wss://demos.yjs.dev/ws - const prodUrl = `ws${location.protocol.slice(4)}//${location.host}/collaboration`; - const devUrl = `ws${location.protocol.slice(4)}//${location.hostname}:3000`; - const provider = new WebsocketProvider(isDev ? devUrl : prodUrl, room, doc); - return provider; -}; \ No newline at end of file diff --git a/src/app/service/table.service.ts b/src/app/service/table.service.ts index 349facaf..a8d1f9bb 100644 --- a/src/app/service/table.service.ts +++ b/src/app/service/table.service.ts @@ -1,20 +1,11 @@ -import { - AITableView, - AITableViewFields, - AITableViewRecords, - AIViewTable, - applyYjsEvents, - createSharedType, - initSharedType, - initTable, - SharedType, - YjsAITable -} from '@ai-table/state'; +import { AITableView, AITableViewFields, AITableViewRecords, AIViewTable, applyYjsEvents, initTable, YjsAITable } from '@ai-table/state'; import { computed, inject, Injectable, isDevMode, signal, WritableSignal } from '@angular/core'; import { Router } from '@angular/router'; -import { WebsocketProvider } from 'y-websocket'; -import { getProvider } from '../provider'; -import { getDefaultValue, sortDataByView } from '../utils/utils'; +import { createFeedRoom, getDefaultValue, sortDataByView } from '../utils/utils'; +import { LiveFeedProvider } from '../live-feed/feed-provider'; +import { LiveFeedObjectChange, LiveFeedRoom } from '../live-feed/feed-room'; +import * as Y from 'yjs'; +import { initSharedType } from '../utils/shared'; export const LOCAL_STORAGE_KEY = 'ai-table-active-view-id'; @@ -30,9 +21,13 @@ export class TableService { aiTable!: AIViewTable; - provider!: WebsocketProvider | null; + provider!: LiveFeedProvider | null; - sharedType!: SharedType | null; + feedRoom!: LiveFeedRoom | null; + + tableDoc!: Y.Doc; + + recordDocs!: Y.Doc[]; activeViewId: WritableSignal = signal(''); @@ -63,49 +58,73 @@ export class TableService { this.fields = signal(sortDataByView(fields ?? this.fields(), this.activeViewId()) as AITableViewFields); } - handleShared(room: string) { + handleShared(roomId: string) { if (this.provider) { this.disconnect(); return; } - let isInitialized = false; - if (!this.sharedType) { - this.sharedType = createSharedType(); - this.sharedType.observeDeep((events: any) => { + if (!this.feedRoom) { + const { feedRoom, tableDoc, recordDocs } = createFeedRoom(roomId); + this.tableDoc = tableDoc; + this.feedRoom = feedRoom; + this.recordDocs = recordDocs; + this.feedRoom?.on('change', (change: LiveFeedObjectChange) => { if (!YjsAITable.isLocal(this.aiTable)) { if (!isInitialized) { - const data = initTable(this.sharedType!); + const data = initTable(this.getSharedAITable()); this.views.set(data.views); this.buildRenderFields(data.fields); this.buildRenderRecords(data.records); isInitialized = true; } else { - applyYjsEvents(this.aiTable, this.sharedType!, events); + applyYjsEvents(this.aiTable, this.getSharedAITable(), change.events); } } }); } - this.provider = getProvider(this.sharedType.doc!, room, isDevMode()); - this.provider.connect(); + this.provider = getProvider(this.feedRoom!, isDevMode()); this.provider.once('synced', () => { - if (this.provider!.synced && [...this.sharedType!.doc!.store.clients.keys()].length === 0) { + console.log('synced'); + if (this.provider!.synced && [...this.feedRoom!.getObject(roomId).store.clients.keys()].length === 0) { console.log('init shared type'); const value = getDefaultValue(); - initSharedType(this.sharedType!.doc!, { + const { recordDocs } = initSharedType(this.feedRoom!.getObject(roomId), { records: value.records, fields: value.fields, views: this.views() }); + recordDocs.forEach((value) => { + this.feedRoom?.addObject(value); + }); } }); } + getSharedAITable() { + const feedObject = this.feedRoom!.getObject(this.feedRoom!.roomId); + const sharedType = feedObject.getMap('ai-table'); + return sharedType; + } + + hasSharedAITable() { + const feedObject = this.feedRoom && this.feedRoom!.getObject(this.feedRoom!.roomId); + return !!feedObject; + } + disconnect() { if (this.provider) { this.provider.disconnect(); this.provider = null; - this.sharedType = null; + this.feedRoom = null; } } } + +export const getProvider = (room: LiveFeedRoom, isDev: boolean) => { + // 在线地址:wss://demos.yjs.dev/ws + const prodUrl = `ws${location.protocol.slice(4)}//${location.host}/collaboration`; + const devUrl = `ws${location.protocol.slice(4)}//${location.hostname}:3000`; + const provider = new LiveFeedProvider(room, isDev ? devUrl : prodUrl); + return provider; +}; diff --git a/src/app/utils/shared.ts b/src/app/utils/shared.ts new file mode 100644 index 00000000..9783d416 --- /dev/null +++ b/src/app/utils/shared.ts @@ -0,0 +1,71 @@ +import { AITableViewFields, AITableViewRecord, AITableViewRecords, AITableViews, toSyncElement } from '@ai-table/state'; +import * as Y from 'yjs'; +import { LiveFeedObject } from '../live-feed/feed-object'; + +export const initSharedType = ( + tableDoc: Y.Doc, + initializeValue: { + fields: AITableViewFields; + records: AITableViewRecords; + views: AITableViews; + } +) => { + const sharedType = tableDoc.getMap('ai-table'); + toSharedTable(sharedType, initializeValue); + const recordDocs = toSharedRecords(sharedType.doc as Y.Doc, initializeValue); + return { sharedType, recordDocs }; +}; + +export function toSharedTable( + sharedType: Y.Map, + data: { + fields: AITableViewFields; + records: AITableViewRecords; + views: AITableViews; + } +): void { + sharedType.doc!.transact(() => { + const fieldSharedType = new Y.Array(); + fieldSharedType.insert(0, data.fields.map(toSyncElement)); + sharedType.set('fields', fieldSharedType); + const viewsSharedType = new Y.Array(); + sharedType.set('views', viewsSharedType); + viewsSharedType.insert(0, data.views.map(toSyncElement)); + }); +} + +export function toSharedRecords( + tableDoc: Y.Doc, + data: { + fields: AITableViewFields; + records: AITableViewRecords; + views: AITableViews; + } +) { + const recordDocs: LiveFeedObject[] = []; + tableDoc.transact(() => { + data.records.forEach((record) => { + const typeName = 'record-array'; + const recordDoc = new LiveFeedObject({ guid: record._id, typeName }); + const yArray = recordDoc.getArray(typeName); + yArray.insert(0, toRecordSyncElement(record)); + }); + }); + return recordDocs; +} + +export function toRecordSyncElement(record: AITableViewRecord): Array> { + const nonEditableArray = new Y.Array(); + // 临时方案:为了解决删除时协同操作无法精准获取删除的 id 的问题,将原来的[idValue] 改为[{'_id': idValue}] + // 后续可能改为 YMap 或者通过在 views 中存储 positions 解决 + nonEditableArray.insert(0, [{ _id: record['_id'] }]); + + const editableArray = new Y.Array(); + const editableFields = []; + for (const fieldId in record['values']) { + editableFields.push(record['values'][fieldId]); + } + editableArray.insert(0, [...editableFields, record['positions']]); + + return [nonEditableArray, editableArray]; +} diff --git a/src/app/utils/utils.ts b/src/app/utils/utils.ts index e34ac9df..3964255d 100644 --- a/src/app/utils/utils.ts +++ b/src/app/utils/utils.ts @@ -1,5 +1,13 @@ import { AITableFieldType, AITableReferences, AITableSelectOptionStyle } from '@ai-table/grid'; import { AITableViewFields, AITableViewRecords } from '@ai-table/state'; +import { LiveFeedObject } from '../live-feed/feed-object'; +import { LiveFeedRoom } from '../live-feed/feed-room'; + +export const createFeedRoom = (roomId: string) => { + const tableDoc = new LiveFeedObject({ guid: roomId, typeName: 'ai-table' }); + const feedRoom = new LiveFeedRoom({ roomId: roomId, objects: [tableDoc] }); + return { feedRoom, tableDoc, recordDocs: [] }; +}; export function sortDataByView(data: AITableViewRecords | AITableViewFields, activeViewId: string) { const hasPositions = data.every((item) => item.positions && item.positions); diff --git a/backend/src/index.ts b/src/backend-index.ts similarity index 75% rename from backend/src/index.ts rename to src/backend-index.ts index 35cc6ff5..d7e04a1e 100644 --- a/backend/src/index.ts +++ b/src/backend-index.ts @@ -1,9 +1,6 @@ -import * as Y from 'yjs'; import * as WebSocket from 'ws'; -import http, { Server } from 'http'; -const ywsUtils = require('y-websocket/bin/utils'); -const setupWSConnection = ywsUtils.setupWSConnection; -const docs = ywsUtils.docs as Map }>; +import http, { IncomingMessage, Server } from 'http'; +import { docs, setupWSConnection } from './app/live-feed/bin/utils'; const port = (process.env['PORT'] || 3000) as number; const server: Server = http.createServer((request, response) => { @@ -19,7 +16,7 @@ const server: Server = http.createServer((request, response) => { }); const wss = new WebSocket.Server({ server }); -wss.on('connection', (conn, req) => { +wss.on('connection', (conn: WebSocket, req: IncomingMessage) => { setupWSConnection(conn, req, { gc: false }); }); @@ -31,7 +28,7 @@ setInterval(() => { }); const stats = { conns, - docs: docs.size, + rooms: docs.size, websocket: `ws://localhost:${port}`, http: `http://localhost:${port}` }; diff --git a/tsconfig.backend.json b/tsconfig.backend.json index 152513f2..ebaf3b50 100644 --- a/tsconfig.backend.json +++ b/tsconfig.backend.json @@ -14,6 +14,6 @@ "importHelpers": true, "esModuleInterop": true }, - "files": ["backend/src/index.ts"], - "include": ["backend/src/**/*.d.ts"] + "files": ["src/backend-index.ts"], + "include": ["src/**/*.d.ts"] }