Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 59 additions & 9 deletions app/components/connection-manager/daemon-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ type DataHandler = {
callback: (data: ReceivedPeerData) => void;
};

// Pending initial data that should be sent to new subscribers
interface PendingInitialData {
queue: ClimbQueueItem[];
currentClimbQueueItem: ClimbQueueItem | null;
}

export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => {
const { daemonUrl } = useDaemonUrl();
const pathname = usePathname();
Expand All @@ -122,13 +128,40 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
const hasJoinedSession = useRef(false);
const lastHeartbeatResponse = useRef<number>(0);
const connectionHealth = useRef<'HEALTHY' | 'DEGRADED' | 'POOR'>('HEALTHY');
// Store pending initial data to replay to late subscribers
const pendingInitialData = useRef<PendingInitialData | null>(null);

// Generate session ID from pathname
const sessionId = pathname.replace(/\//g, '-').slice(1) || 'default';

const subscribeToData = useCallback((callback: (data: ReceivedPeerData) => void) => {
const handlerId = uuidv4();
dataHandlers.current.push({ id: handlerId, callback });
console.log('[Daemon] New subscriber registered, total:', dataHandlers.current.length, 'hasPendingData:', !!pendingInitialData.current);

// If there's pending initial data, immediately send it to this new subscriber
// This handles the race condition where session-joined arrives before subscribers register
if (pendingInitialData.current) {
const data = pendingInitialData.current;
console.log('[Daemon] Sending pending initial data to late subscriber:', {
queueLength: data.queue?.length ?? 0,
currentClimb: data.currentClimbQueueItem?.climb?.name ?? null,
});
// Use setTimeout to ensure this runs after the current execution context
// This gives React time to complete the effect that's registering this subscriber
setTimeout(() => {
try {
callback({
type: 'initial-queue-data',
queue: data.queue,
currentClimbQueueItem: data.currentClimbQueueItem,
source: 'daemon',
});
} catch (error) {
console.error('Error sending pending initial data to subscriber:', error);
}
}, 0);
}

return () => {
dataHandlers.current = dataHandlers.current.filter((handler) => handler.id !== handlerId);
Expand Down Expand Up @@ -165,6 +198,13 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
(data: DaemonMessage) => {
switch (data.type) {
case 'session-joined':
console.log('[Daemon] session-joined received:', {
queueLength: data.queue?.length ?? 0,
currentClimb: data.currentClimbQueueItem?.climb?.name ?? null,
isLeader: data.isLeader,
subscriberCount: dataHandlers.current.length,
});

setState((prev) => ({
...prev,
clientId: data.clientId,
Expand All @@ -174,15 +214,22 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
leaderId: data.users.find((u) => u.isLeader)?.id || null,
}));

// Notify queue context about initial data
if (data.queue.length > 0 || data.currentClimbQueueItem) {
notifySubscribers({
type: 'initial-queue-data',
queue: data.queue,
currentClimbQueueItem: data.currentClimbQueueItem,
source: 'daemon',
});
}
// Always store the initial queue data for late subscribers
// This handles the race condition where subscribers register after session-joined
pendingInitialData.current = {
queue: data.queue,
currentClimbQueueItem: data.currentClimbQueueItem,
};

// Always notify subscribers about initial data (even if empty)
// This ensures the QueueContext knows the session state and can set initialQueueDataReceivedFromPeers
console.log('[Daemon] Notifying', dataHandlers.current.length, 'subscribers with initial-queue-data');
notifySubscribers({
type: 'initial-queue-data',
queue: data.queue,
currentClimbQueueItem: data.currentClimbQueueItem,
source: 'daemon',
});
break;

case 'user-joined':
Expand Down Expand Up @@ -237,6 +284,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
error: `Session ended: ${data.reason}${data.newPath ? `. Navigate to ${data.newPath} to rejoin.` : ''}`,
}));
hasJoinedSession.current = false;
pendingInitialData.current = null;
break;
}
},
Expand Down Expand Up @@ -345,6 +393,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
users: [],
}));
hasJoinedSession.current = false;
pendingInitialData.current = null;

// Attempt to reconnect unless it was a manual close
if (event.code !== 1000 && reconnectAttempts.current < 5) {
Expand Down Expand Up @@ -402,6 +451,7 @@ export const DaemonProvider: React.FC<{ children: React.ReactNode }> = ({ childr
error: null,
});
hasJoinedSession.current = false;
pendingInitialData.current = null;
}, []);

// Connect when daemon URL is available
Expand Down
9 changes: 8 additions & 1 deletion app/components/queue-control/queue-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,18 @@ export const QueueProvider = ({ parsedParams, children }: QueueContextProps) =>
);
break;
case 'initial-queue-data':
console.log('[QueueContext] Received initial-queue-data:', {
source: data.source,
hostId,
queueLength: data.queue?.length ?? 0,
currentClimb: data.currentClimbQueueItem?.climb?.name ?? null,
});
// Accept data from the host OR from the daemon (which is authoritative in daemon mode)
if (hostId !== data.source && data.source !== 'daemon') {
console.log(`Ignoring queue data from ${data.source} since it's not the host(${hostId}) or daemon.`);
console.log(`[QueueContext] Ignoring queue data from ${data.source} since it's not the host(${hostId}) or daemon.`);
return;
}
console.log('[QueueContext] Applying initial queue data');
dispatch({
type: 'INITIAL_QUEUE_DATA',
payload: {
Expand Down
4 changes: 4 additions & 0 deletions daemon/src/handlers/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,9 @@ async function handleQueueOperation(
break;
}

console.log(`[Daemon] Persisting queue state for session ${client.sessionId}:`, {
queueLength: queue?.length ?? 0,
currentClimb: currentClimbQueueItem?.climb?.name ?? null,
});
await roomManager.updateQueueState(client.sessionId, queue, currentClimbQueueItem);
}
6 changes: 6 additions & 0 deletions daemon/src/handlers/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ export async function handleJoinSession(
currentClimbQueueItem: result.currentClimbQueueItem,
isLeader: result.isLeader,
};
console.log(`[Daemon] Sending session-joined to ${result.clientId}:`, {
sessionId: message.sessionId,
queueLength: result.queue?.length ?? 0,
currentClimb: result.currentClimbQueueItem?.climb?.name ?? null,
isLeader: result.isLeader,
});
sendToClient(ws, sessionJoinedMessage);

// Notify other clients about the new user
Expand Down
5 changes: 5 additions & 0 deletions daemon/src/services/room-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,14 @@ class RoomManager {
.limit(1);

if (result.length === 0) {
console.log(`[RoomManager] getQueueState(${sessionId}): No data found in DB`);
return { queue: [], currentClimbQueueItem: null };
}

console.log(`[RoomManager] getQueueState(${sessionId}):`, {
queueLength: result[0].queue?.length ?? 0,
currentClimb: result[0].currentClimbQueueItem?.climb?.name ?? null,
});
return {
queue: result[0].queue,
currentClimbQueueItem: result[0].currentClimbQueueItem,
Expand Down