From 9a91eb14b06618cb50fafa09a8ebd2c5bc8f1d37 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 11:31:24 +0000 Subject: [PATCH 1/2] Fix WebSocket daemon initial queue sync on session join The issue was a race condition where the session-joined message could be processed before the QueueContext had registered its subscription handler. Changes: - Store initial queue data in a ref when session-joined is received - Replay pending initial data to new subscribers when they register - Always send initial-queue-data notification (even when empty) to ensure the QueueContext can set initialQueueDataReceivedFromPeers flag - Clear pending data on disconnect, session end, and connection close --- .../connection-manager/daemon-context.tsx | 55 ++++++++++++++++--- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/app/components/connection-manager/daemon-context.tsx b/app/components/connection-manager/daemon-context.tsx index bc952284..d1e1f1e5 100644 --- a/app/components/connection-manager/daemon-context.tsx +++ b/app/components/connection-manager/daemon-context.tsx @@ -98,6 +98,12 @@ type DataHandler = { callback: (data: ReceivedPeerData) => void; }; +// Pending initial data that should be sent to new subscribers +interface PendingInitialData { + queue: ClimbQueueItem[]; + currentClimbQueueItem: ClimbQueueItem | null; +} + export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => { const { daemonUrl } = useDaemonUrl(); const pathname = usePathname(); @@ -122,6 +128,8 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr const hasJoinedSession = useRef(false); const lastHeartbeatResponse = useRef(0); const connectionHealth = useRef<'HEALTHY' | 'DEGRADED' | 'POOR'>('HEALTHY'); + // Store pending initial data to replay to late subscribers + const pendingInitialData = useRef(null); // Generate session ID from pathname const sessionId = pathname.replace(/\//g, '-').slice(1) || 'default'; @@ -130,6 +138,26 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr const handlerId = uuidv4(); dataHandlers.current.push({ id: handlerId, callback }); + // If there's pending initial data, immediately send it to this new subscriber + // This handles the race condition where session-joined arrives before subscribers register + if (pendingInitialData.current) { + const data = pendingInitialData.current; + // Use setTimeout to ensure this runs after the current execution context + // This gives React time to complete the effect that's registering this subscriber + setTimeout(() => { + try { + callback({ + type: 'initial-queue-data', + queue: data.queue, + currentClimbQueueItem: data.currentClimbQueueItem, + source: 'daemon', + }); + } catch (error) { + console.error('Error sending pending initial data to subscriber:', error); + } + }, 0); + } + return () => { dataHandlers.current = dataHandlers.current.filter((handler) => handler.id !== handlerId); }; @@ -174,15 +202,21 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr leaderId: data.users.find((u) => u.isLeader)?.id || null, })); - // Notify queue context about initial data - if (data.queue.length > 0 || data.currentClimbQueueItem) { - notifySubscribers({ - type: 'initial-queue-data', - queue: data.queue, - currentClimbQueueItem: data.currentClimbQueueItem, - source: 'daemon', - }); - } + // Always store the initial queue data for late subscribers + // This handles the race condition where subscribers register after session-joined + pendingInitialData.current = { + queue: data.queue, + currentClimbQueueItem: data.currentClimbQueueItem, + }; + + // Always notify subscribers about initial data (even if empty) + // This ensures the QueueContext knows the session state and can set initialQueueDataReceivedFromPeers + notifySubscribers({ + type: 'initial-queue-data', + queue: data.queue, + currentClimbQueueItem: data.currentClimbQueueItem, + source: 'daemon', + }); break; case 'user-joined': @@ -237,6 +271,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr error: `Session ended: ${data.reason}${data.newPath ? `. Navigate to ${data.newPath} to rejoin.` : ''}`, })); hasJoinedSession.current = false; + pendingInitialData.current = null; break; } }, @@ -345,6 +380,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr users: [], })); hasJoinedSession.current = false; + pendingInitialData.current = null; // Attempt to reconnect unless it was a manual close if (event.code !== 1000 && reconnectAttempts.current < 5) { @@ -402,6 +438,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr error: null, }); hasJoinedSession.current = false; + pendingInitialData.current = null; }, []); // Connect when daemon URL is available From 315fb37b3ba8797eb1eebbbc7a9d031e7b646152 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 21 Dec 2025 11:42:09 +0000 Subject: [PATCH 2/2] Add debugging logs for WebSocket daemon queue sync Added console.log statements to trace the queue sync flow: - Server: log when queue state is fetched/persisted, and session-joined sent - Client: log when session-joined is received and initial-queue-data is processed - Client: log when subscribers register and if pending data is sent This will help identify where the initial queue sync is failing. --- .../connection-manager/daemon-context.tsx | 13 +++++++++++++ app/components/queue-control/queue-context.tsx | 9 ++++++++- daemon/src/handlers/message.ts | 4 ++++ daemon/src/handlers/room.ts | 6 ++++++ daemon/src/services/room-manager.ts | 5 +++++ 5 files changed, 36 insertions(+), 1 deletion(-) diff --git a/app/components/connection-manager/daemon-context.tsx b/app/components/connection-manager/daemon-context.tsx index d1e1f1e5..2114a439 100644 --- a/app/components/connection-manager/daemon-context.tsx +++ b/app/components/connection-manager/daemon-context.tsx @@ -137,11 +137,16 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr const subscribeToData = useCallback((callback: (data: ReceivedPeerData) => void) => { const handlerId = uuidv4(); dataHandlers.current.push({ id: handlerId, callback }); + console.log('[Daemon] New subscriber registered, total:', dataHandlers.current.length, 'hasPendingData:', !!pendingInitialData.current); // If there's pending initial data, immediately send it to this new subscriber // This handles the race condition where session-joined arrives before subscribers register if (pendingInitialData.current) { const data = pendingInitialData.current; + console.log('[Daemon] Sending pending initial data to late subscriber:', { + queueLength: data.queue?.length ?? 0, + currentClimb: data.currentClimbQueueItem?.climb?.name ?? null, + }); // Use setTimeout to ensure this runs after the current execution context // This gives React time to complete the effect that's registering this subscriber setTimeout(() => { @@ -193,6 +198,13 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr (data: DaemonMessage) => { switch (data.type) { case 'session-joined': + console.log('[Daemon] session-joined received:', { + queueLength: data.queue?.length ?? 0, + currentClimb: data.currentClimbQueueItem?.climb?.name ?? null, + isLeader: data.isLeader, + subscriberCount: dataHandlers.current.length, + }); + setState((prev) => ({ ...prev, clientId: data.clientId, @@ -211,6 +223,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr // Always notify subscribers about initial data (even if empty) // This ensures the QueueContext knows the session state and can set initialQueueDataReceivedFromPeers + console.log('[Daemon] Notifying', dataHandlers.current.length, 'subscribers with initial-queue-data'); notifySubscribers({ type: 'initial-queue-data', queue: data.queue, diff --git a/app/components/queue-control/queue-context.tsx b/app/components/queue-control/queue-context.tsx index 00970e67..2eb31984 100644 --- a/app/components/queue-control/queue-context.tsx +++ b/app/components/queue-control/queue-context.tsx @@ -59,11 +59,18 @@ export const QueueProvider = ({ parsedParams, children }: QueueContextProps) => ); break; case 'initial-queue-data': + console.log('[QueueContext] Received initial-queue-data:', { + source: data.source, + hostId, + queueLength: data.queue?.length ?? 0, + currentClimb: data.currentClimbQueueItem?.climb?.name ?? null, + }); // Accept data from the host OR from the daemon (which is authoritative in daemon mode) if (hostId !== data.source && data.source !== 'daemon') { - console.log(`Ignoring queue data from ${data.source} since it's not the host(${hostId}) or daemon.`); + console.log(`[QueueContext] Ignoring queue data from ${data.source} since it's not the host(${hostId}) or daemon.`); return; } + console.log('[QueueContext] Applying initial queue data'); dispatch({ type: 'INITIAL_QUEUE_DATA', payload: { diff --git a/daemon/src/handlers/message.ts b/daemon/src/handlers/message.ts index db75f6a7..1874c123 100644 --- a/daemon/src/handlers/message.ts +++ b/daemon/src/handlers/message.ts @@ -184,5 +184,9 @@ async function handleQueueOperation( break; } + console.log(`[Daemon] Persisting queue state for session ${client.sessionId}:`, { + queueLength: queue?.length ?? 0, + currentClimb: currentClimbQueueItem?.climb?.name ?? null, + }); await roomManager.updateQueueState(client.sessionId, queue, currentClimbQueueItem); } diff --git a/daemon/src/handlers/room.ts b/daemon/src/handlers/room.ts index f0ae0e03..78b736a3 100644 --- a/daemon/src/handlers/room.ts +++ b/daemon/src/handlers/room.ts @@ -42,6 +42,12 @@ export async function handleJoinSession( currentClimbQueueItem: result.currentClimbQueueItem, isLeader: result.isLeader, }; + console.log(`[Daemon] Sending session-joined to ${result.clientId}:`, { + sessionId: message.sessionId, + queueLength: result.queue?.length ?? 0, + currentClimb: result.currentClimbQueueItem?.climb?.name ?? null, + isLeader: result.isLeader, + }); sendToClient(ws, sessionJoinedMessage); // Notify other clients about the new user diff --git a/daemon/src/services/room-manager.ts b/daemon/src/services/room-manager.ts index c8e776c5..951c0646 100644 --- a/daemon/src/services/room-manager.ts +++ b/daemon/src/services/room-manager.ts @@ -263,9 +263,14 @@ class RoomManager { .limit(1); if (result.length === 0) { + console.log(`[RoomManager] getQueueState(${sessionId}): No data found in DB`); return { queue: [], currentClimbQueueItem: null }; } + console.log(`[RoomManager] getQueueState(${sessionId}):`, { + queueLength: result[0].queue?.length ?? 0, + currentClimb: result[0].currentClimbQueueItem?.climb?.name ?? null, + }); return { queue: result[0].queue, currentClimbQueueItem: result[0].currentClimbQueueItem,