@corrideat
Member

No description provided.

@taoeffect
Member

Thanks @corrideat! I'll have a closer look. In the meantime, here's a quick review from Opus 4.5:

Details

Critical Bug: First Retry Has Zero Delay

In the runWithRetry function, the first scheduled retry will fire immediately (0ms delay) because attemptNo is still 0 when the delay is calculated:

// 4. Schedule retry
setTimeout(() => {
  if (++attemptNo > options.maxOpRetries) {
    console.warn(`Giving up ${type} for channel`, channelID)
    return
  }
  send()
}, options.opRetryInterval * attemptNo)  // attemptNo is 0 here!

The flow is:

  1. send() called, attemptNo = 0
  2. Request sent
  3. setTimeout scheduled with delay 2000 * 0 = 0ms
  4. Callback fires immediately, ++attemptNo makes it 1
  5. Second request sent (unintended immediate retry!)

Fix:

// 4. Schedule retry
setTimeout(() => {
  if (++attemptNo > options.maxOpRetries) {
    console.warn(`Giving up ${type} for channel`, channelID)
    return
  }
  send()
}, options.opRetryInterval * (attemptNo + 1))
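
To make the difference concrete, here is a small standalone simulation of when each send would happen under the two delay formulas. It is only a sketch: `simulateSendTimes` is a hypothetical helper, not part of the PR, and it assumes `opRetryInterval` is 2000 and `maxOpRetries` is 4 (the values quoted elsewhere in this thread).

```typescript
// Simulates the send timestamps (in ms) produced by the retry loop.
// `delayFor` maps the attempt number at scheduling time to the timeout delay.
function simulateSendTimes (
  opRetryInterval: number,
  maxOpRetries: number,
  delayFor: (attemptNo: number, interval: number) => number
): number[] {
  const sendTimes: number[] = []
  let now = 0
  let attemptNo = 0
  while (true) {
    sendTimes.push(now) // send() runs
    now += delayFor(attemptNo, opRetryInterval) // timer scheduled with the current attemptNo
    if (++attemptNo > maxOpRetries) break // timer callback fires and gives up
  }
  return sendTimes
}

const buggy = simulateSendTimes(2000, 4, (n, interval) => interval * n)
const fixed = simulateSendTimes(2000, 4, (n, interval) => interval * (n + 1))
console.log(buggy) // [0, 0, 2000, 6000, 12000]
console.log(fixed) // [0, 2000, 6000, 12000, 20000]
```

Both variants send five requests in total (one initial attempt plus four retries); the buggy one simply sends the first two back to back.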

Potential Issue: Retries Continue After Success

When a SUB/UNSUB succeeds, the entry is deleted from the pending map, but the scheduled setTimeout callbacks are never cleared. They will continue to fire (until maxOpRetries is reached), only to be cancelled by the instance check. This is wasteful.

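Consider tracking the timeout so it can be cleared on success. The version below stores the ID in retryTimeoutID; the matching clearTimeout call would still need to be added wherever success is handled: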

function runWithRetry (
  client: PubSubClient,
  channelID: string,
  type: RequestTypeEnum,
  instance: object
) {
  let attemptNo = 0
  let retryTimeoutID: ReturnType<typeof setTimeout> | undefined
  const { socket, options } = client

  const send = () => {
    if (client.socket !== socket || socket?.readyState !== WebSocket.OPEN) return

    const currentInstance = type === REQUEST_TYPE.SUB
      ? client.pendingSubscriptionMap.get(channelID)
      : client.pendingUnsubscriptionMap.get(channelID)

    if (currentInstance !== instance) return

    const kvFilter = client.kvFilter.get(channelID)
    const payload: JSONType = (type === REQUEST_TYPE.SUB && kvFilter)
      ? { channelID, kvFilter }
      : { channelID }

    socket.send(createRequest(type, payload))

    retryTimeoutID = setTimeout(() => {
      if (++attemptNo > options.maxOpRetries) {
        console.warn(`Giving up ${type} for channel`, channelID)
        return
      }
      send()
    }, options.opRetryInterval * (attemptNo + 1))
  }

  send()
}

Code Clarity: Redundant-Looking Loop in Close Handler

The close handler has two sequential operations on pendingSubscriptionMap that could confuse future maintainers:

if (client.shouldReconnect) {
  const instance = {}
  client.subscriptionSet.forEach((channelID) => {
    if (!client.pendingUnsubscriptionMap.has(channelID)) {
      client.pendingSubscriptionMap.set(channelID, instance)
    }
  })
  for (const [channelID] of client.pendingSubscriptionMap) {
    client.pendingSubscriptionMap.set(channelID, instance)
  }
}

The second loop re-sets items that may have just been added. While this is intentional (to reset instances for existing pending subscriptions so old retry timers are invalidated), it's not immediately clear. Consider adding a comment or refactoring:

if (client.shouldReconnect) {
  const instance = {}
  // Invalidate all existing pending subscription instances so that their
  // retry timers will stop (due to instance mismatch checks in runWithRetry)
  // and prepare them to be retried with the new socket.
  for (const [channelID] of client.pendingSubscriptionMap) {
    client.pendingSubscriptionMap.set(channelID, instance)
  }
  // Re-add current subscriptions as pending so they get restored on reconnect.
  client.subscriptionSet.forEach((channelID) => {
    if (!client.pendingUnsubscriptionMap.has(channelID)) {
      client.pendingSubscriptionMap.set(channelID, instance)
    }
  })
}

Minor: No Retry for UNSUB on Reconnect

The open handler only processes pending subscriptions:

// Send any pending subscription request.
for (const [channelID, instance] of client.pendingSubscriptionMap) {
  runWithRetry(client, channelID, REQUEST_TYPE.SUB, instance)
}
// There should be no pending unsubscription since we just got connected.

While the comment explains the rationale (and pendingUnsubscriptionMap is cleared on close), if unsub() is called between close and reconnection, those entries would exist but not be processed. This is probably intentional, but you may want to verify this edge case is acceptable.


Summary

| Issue | Severity | Location |
| --- | --- | --- |
| First retry fires immediately (0ms delay) | High | runWithRetry, line with options.opRetryInterval * attemptNo |
| Retry timeouts not cleared on success | Low | runWithRetry function |
| Code clarity in close handler | Low | close handler's double loop |

@corrideat
Member Author

Addressed the zero-delay issue (result of refactoring) and the double loop in close, by flipping the order of the loops.

The second issue (Retry timeouts not cleared on success) seems like something where the benefit is a bit unclear. It is true that the timeout will not be cleared on success or error, but then it'll fail fairly quickly at the 'Closure check' or 'Cancellation check' steps. I could keep track of these and cancel them, for example upon receiving an OK response or when closing the connection, but that'll add extra logic, and the way it is now shouldn't break anything or be too wasteful.
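
For readers unfamiliar with the 'Cancellation check' mentioned above, the idea can be shown in isolation. This is a minimal sketch, not the PR's code: `pending` stands in for client.pendingSubscriptionMap, and `startOperation` is a hypothetical name.

```typescript
// Standalone illustration of the instance-token check; `pending` stands in
// for client.pendingSubscriptionMap and is not the real client state.
const pending = new Map<string, object>()

// Registers a fresh token for the channel and returns a function that
// reports whether the retry loop holding that token is still current.
function startOperation (channelID: string): () => boolean {
  const instance = {}
  pending.set(channelID, instance)
  return () => pending.get(channelID) === instance
}

const firstIsCurrent = startOperation('channel-a')
console.log(firstIsCurrent()) // true

// A success response deletes the entry; a late timer then sees a mismatch.
pending.delete('channel-a')
console.log(firstIsCurrent()) // false

// A newer operation on the same channel does not revive the old one.
const secondIsCurrent = startOperation('channel-a')
console.log(firstIsCurrent(), secondIsCurrent()) // false true
```

Because every operation gets its own object identity, a stale timer costs one map lookup and a comparison before it returns.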

Member

@taoeffect taoeffect left a comment

Does this latest Opus 4.5 review require any changes, in your view?

Details

I've reviewed the changes carefully. Overall, the refactoring from Set to Map for tracking pending operations with instance tokens is a reasonable approach for implementing cancellable retries. However, I found several issues worth addressing.


1. Retry Timers Are Never Cleared (Memory/Resource Leak)

The setTimeout calls in runWithRetry are never tracked or cleared. When destroy() is called or the socket closes, these timers continue to exist and will fire (though they exit early).

Problematic code in runWithRetry:

    // 4. Schedule retry
    setTimeout(() => {
      if (++attemptNo > options.maxOpRetries) {
        console.warn(`Giving up ${type} for channel`, channelID)
        return
      }
      send()
    }, options.opRetryInterval * (attemptNo + 1))

Suggested improvement — Track retry timers so they can be cleared:

function runWithRetry (
  client: PubSubClient,
  channelID: string,
  type: RequestTypeEnum,
  instance: object
) {
  let attemptNo = 0
  let retryTimeoutID: TimeoutID | undefined
  const { socket, options } = client

  const send = () => {
    // 1. Closure check: ensure socket instance hasn't been replaced
    if (client.socket !== socket || socket?.readyState !== WebSocket.OPEN) return

    // 2. Cancellation check
    const currentInstance = type === REQUEST_TYPE.SUB
      ? client.pendingSubscriptionMap.get(channelID)
      : client.pendingUnsubscriptionMap.get(channelID)

    if (currentInstance !== instance) {
      clearTimeout(retryTimeoutID)
      return
    }

    // 3. Send logic
    const kvFilter = client.kvFilter.get(channelID)
    const payload: JSONType = (type === REQUEST_TYPE.SUB && kvFilter)
      ? { channelID, kvFilter }
      : { channelID }

    socket.send(createRequest(type, payload))

    // 4. Schedule retry
    retryTimeoutID = setTimeout(() => {
      if (++attemptNo > options.maxOpRetries) {
        console.warn(`Giving up ${type} for channel`, channelID)
        return
      }
      send()
    }, options.opRetryInterval * (attemptNo + 1))
  }

  send()
}

You may also want to store all active retry timer IDs at the client level so clearAllTimers() can cancel them on destroy.
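
One way the client-level tracking could look is sketched below. `RetryTimers` and its methods are hypothetical names, not existing code; the assumption is that clearAllTimers() and the OK handler would call into something like it. Note that a clearTimeout call placed inside send() cannot cancel anything, since send() only runs either before the first timer exists or from within the timer callback that has already fired; cancellation has to be triggered from outside the retry loop.

```typescript
type TimeoutID = ReturnType<typeof setTimeout>

// Hypothetical helper: tracks pending retry timers so they can be cancelled together.
class RetryTimers {
  private readonly ids = new Set<TimeoutID>()

  schedule (callback: () => void, delayMs: number): TimeoutID {
    const id: TimeoutID = setTimeout(() => {
      this.ids.delete(id) // a timer that has fired no longer needs tracking
      callback()
    }, delayMs)
    this.ids.add(id)
    return id
  }

  // Cancels one timer, e.g. when an OK response arrives for its operation.
  cancel (id: TimeoutID): void {
    clearTimeout(id)
    this.ids.delete(id)
  }

  // Cancels everything, e.g. from destroy() or the close handler.
  cancelAll (): void {
    for (const id of this.ids) clearTimeout(id)
    this.ids.clear()
  }

  get size (): number {
    return this.ids.size
  }
}

const timers = new RetryTimers()
let fired = 0
timers.schedule(() => { fired++ }, 60000)
const second = timers.schedule(() => { fired++ }, 60000)
timers.cancel(second)
console.log(timers.size) // 1
timers.cancelAll()
console.log(timers.size, fired) // 0 0
```

Whether this bookkeeping is worth it compared with letting stale timers exit early is the trade-off discussed earlier in the thread.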


2. Duplicate Requests Sent Before Server Response

The retry is scheduled immediately after each send, regardless of whether a response arrives. With opRetryInterval: 2000, if the server takes longer than 2 seconds to respond, a duplicate request will be sent.

Problematic code:

    socket.send(createRequest(type, payload))

    // 4. Schedule retry
    setTimeout(() => {
      // ... retry logic
    }, options.opRetryInterval * (attemptNo + 1))

Suggestion: Consider whether this is acceptable behavior. If server latency commonly exceeds 2 seconds, you may want to:

  • Increase opRetryInterval default
  • Add a comment documenting that duplicate requests are expected and the server should be idempotent
  • Or implement a different pattern where retry only happens if the connection drops (not as a timeout)
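
As an illustration of what "the server should be idempotent" would mean for SUB, here is a minimal in-memory sketch. The server side is not shown anywhere in this thread, so `subscribers` and `handleSub` are purely hypothetical and only demonstrate the property: a duplicate request leaves state unchanged and still yields an OK response.

```typescript
// Hypothetical server-side state: channelID -> set of subscribed client IDs.
const subscribers = new Map<string, Set<string>>()

// Handles a SUB request idempotently. Adding to a Set twice is a no-op,
// so a duplicate request caused by a client retry changes nothing.
function handleSub (clientID: string, channelID: string): { type: 'ok', channelID: string } {
  let members = subscribers.get(channelID)
  if (!members) {
    members = new Set()
    subscribers.set(channelID, members)
  }
  members.add(clientID)
  return { type: 'ok', channelID }
}

handleSub('client-1', 'channel-a')
handleSub('client-1', 'channel-a') // duplicate sent by a retry timer
console.log(subscribers.get('channel-a')?.size) // 1
```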

3. Redundant kvFilter.delete() Call

The kvFilter is deleted in two places for UNSUB operations:

In unsub() method:

  unsub (channelID: string) {
    const client = this

    if (!client.pendingUnsubscriptionMap.has(channelID)) {
      // ...
      client.kvFilter.delete(channelID)  // <-- First deletion

      runWithRetry(client, channelID, REQUEST_TYPE.UNSUB, instance)
    }
  }

In OK response handler:

      case REQUEST_TYPE.UNSUB: {
        console.debug(`[pubsub] Unsubscribed from ${channelID}`)
        client.pendingUnsubscriptionMap.delete(channelID)
        client.subscriptionSet.delete(channelID)
        client.kvFilter.delete(channelID)  // <-- Second deletion (redundant)
        break
      }

Suggestion: Remove the deletion from the OK handler since it's already done eagerly in unsub():

      case REQUEST_TYPE.UNSUB: {
        console.debug(`[pubsub] Unsubscribed from ${channelID}`)
        client.pendingUnsubscriptionMap.delete(channelID)
        client.subscriptionSet.delete(channelID)
        // kvFilter already deleted in unsub()
        break
      }

4. unsub() Allows Unsubscribing From Non-Subscribed Channels

Unlike the new sub() method which checks !client.subscriptionSet.has(channelID), the unsub() method has no such guard:

  sub (channelID: string) {
    const client = this

    if (!client.pendingSubscriptionMap.has(channelID) && !client.subscriptionSet.has(channelID)) {
      // ... guarded against already-subscribed
    }
  },

  unsub (channelID: string) {
    const client = this

    if (!client.pendingUnsubscriptionMap.has(channelID)) {
      // ... no check for subscriptionSet or pendingSubscriptionMap
    }
  }

Suggestion: Add a guard to prevent unnecessary UNSUB requests:

  unsub (channelID: string) {
    const client = this

    // Only unsub if we're subscribed or have a pending subscription
    if (!client.pendingUnsubscriptionMap.has(channelID) &&
        (client.subscriptionSet.has(channelID) || client.pendingSubscriptionMap.has(channelID))) {
      const instance = {}
      client.pendingSubscriptionMap.delete(channelID)
      client.pendingUnsubscriptionMap.set(channelID, instance)
      client.kvFilter.delete(channelID)

      runWithRetry(client, channelID, REQUEST_TYPE.UNSUB, instance)
    }
  }

5. Potential Confusion: Total Attempts vs. Retries

With maxOpRetries: 4, the code allows for 5 total attempts (1 initial + 4 retries). The option name suggests retries, so this is technically correct, but worth documenting clearly.

Suggestion: Add a clarifying comment:

const defaultOptions: Options = {
  // ...
  maxOpRetries: 4,  // Maximum retry attempts after initial send (5 total attempts)
  opRetryInterval: 2000  // Base interval between retries (linear backoff: 2s, 4s, 6s, 8s)
}

6. Minor: setKvFilter Lacks Retry Mechanism

Unlike sub() and unsub(), setKvFilter sends without the retry mechanism:

  setKvFilter (channelID: string, kvFilter?: string[]) {
    // ...
    if (client.subscriptionSet.has(channelID)) {
      if (socket?.readyState === WebSocket.OPEN) {
        socket.send(
          createRequest(REQUEST_TYPE.KV_FILTER, kvFilter ? { channelID, kvFilter } : { channelID })
        )
      }
    }
  },

Is this intentional? If the KV_FILTER message is lost, the filter won't be applied. Consider whether this should also use a retry mechanism or document why it doesn't need one.


Summary

| Issue | Severity | Action |
| --- | --- | --- |
| Retry timers not cleared | Medium | Track and clear timers on destroy/cancel |
| Duplicate requests on slow servers | Low-Medium | Document or increase interval |
| Redundant kvFilter.delete() | Low | Remove from OK handler |
| unsub() missing guard | Low | Add subscription check |
| Unclear total attempts | Low | Add documenting comment |
| setKvFilter no retry | Low | Document intentionality or add retry |

@corrideat
Member Author

@taoeffect I've also noticed this piece of logic:

        // If we are definetely offline then do not try to reconnect now,
        // unless the server is local.
        if (!isDefinetelyOffline() || client.isLocal) {
          client.scheduleConnectionAttempt()
        }

Personally, I'm not sure this makes a lot of sense to keep. For one, the logic is already complex enough without special cases like client.isLocal. For another, isDefinetelyOffline will be false most of the time (resulting in the branch being taken). Pretty much the only situation I've encountered that makes navigator.onLine false is manually placing the browser in offline mode (in which case connecting to anything is probably unexpected, even if it works for some reason).

@taoeffect
Member

taoeffect commented Dec 12, 2025

I challenged Opus to make runWithRetry more generic so that it could be used more naturally with any message, without this unnatural "Re-use client.pendingSubscriptionMap for KV_FILTER" thing. After some back and forth, here's what it came up with:

Details

You're right. A single unified map with composite keys would be cleaner than creating a separate map for each message type.

Solution: Unified pendingOperations Map

Key Ideas

  1. Single map with composite keys: "${type}:${identifier}" (e.g., "sub:abc123", "kv_filter:abc123")
  2. Operation objects store everything needed: type, payload generator, and instance for cancellation tracking
  3. Payload is generated at send time: Handles cases where data (like kvFilter) might change between retries

Implementation

1. Add Types and Helpers

// ====== Types (add near the top) ====== //

type PendingOperation = {
  type: RequestTypeEnum
  getPayload: () => JSONObject
  instance: object
}

// ====== Helpers ====== //

function createOpKey(type: RequestTypeEnum, identifier: string): string {
  return `${type}:${identifier}`
}

function parseOpKey(opKey: string): { type: string; identifier: string } {
  const colonIndex = opKey.indexOf(':')
  return {
    type: opKey.slice(0, colonIndex),
    identifier: opKey.slice(colonIndex + 1)
  }
}

// Operation types that should be re-queued after reconnection
const REQUEUE_ON_RECONNECT: ReadonlySet<RequestTypeEnum> = new Set([REQUEST_TYPE.SUB])

function runWithRetry(client: PubSubClient, opKey: string) {
  const { socket, options } = client
  const op = client.pendingOperations.get(opKey)
  if (!op) return

  const { instance } = op
  let attemptNo = 0

  const send = () => {
    const currentOp = client.pendingOperations.get(opKey)

    // 1. Socket still valid and open?
    if (client.socket !== socket || socket?.readyState !== WebSocket.OPEN) return

    // 2. Operation cancelled or superseded?
    if (!currentOp || currentOp.instance !== instance) return

    // 3. Send with fresh payload
    socket.send(createRequest(currentOp.type, currentOp.getPayload()))

    // 4. Schedule retry with linear backoff
    setTimeout(() => {
      if (++attemptNo > options.maxOpRetries) {
        console.warn(`[pubsub] Giving up ${currentOp.type} for`, opKey)
        return
      }
      send()
    }, options.opRetryInterval * (attemptNo + 1))
  }

  send()
}
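
A quick standalone check of the composite-key helpers may be useful here, in particular for identifiers that themselves contain a colon. The two functions are copied from above but typed with plain strings so the snippet runs on its own; the 'sub' type string and the channel ID are made-up sample values.

```typescript
// Copies of the helpers above, typed with plain strings so the snippet stands alone.
function createOpKey (type: string, identifier: string): string {
  return `${type}:${identifier}`
}

function parseOpKey (opKey: string): { type: string, identifier: string } {
  // Split on the FIRST colon only, so colons inside the identifier survive.
  const colonIndex = opKey.indexOf(':')
  return {
    type: opKey.slice(0, colonIndex),
    identifier: opKey.slice(colonIndex + 1)
  }
}

const key = createOpKey('sub', 'group:abc123')
console.log(key) // "sub:group:abc123"
console.log(parseOpKey(key)) // { type: 'sub', identifier: 'group:abc123' }
```

The round trip holds as long as the type string contains no colon. A key without any colon would be parsed incorrectly, because indexOf returns -1, so parseOpKey should only be fed keys produced by createOpKey.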

2. Update PubSubClient Type

export type PubSubClient = {
  // ... existing properties ...
  
  // REMOVE these:
  // pendingSubscriptionMap: Map<string, object>;
  // pendingUnsubscriptionMap: Map<string, object>;
  
  // ADD this:
  pendingOperations: Map<string, PendingOperation>;
  
  // Keep these as-is:
  subscriptionSet: Set<string>;
  kvFilter: Map<string, string[]>;
  // ... rest ...
};

3. Update createClient

export function createClient(url: string, options: Partial<Options> = {}): PubSubClient {
  const client: PubSubClient = {
    // ... existing properties ...
    
    // REMOVE:
    // pendingSubscriptionMap: new Map(),
    // pendingUnsubscriptionMap: new Map(),
    
    // ADD:
    pendingOperations: new Map(),
    
    // Keep:
    subscriptionSet: new Set(),
    kvFilter: new Map(),
    // ...
  }
  // ...
}

4. Update Public Methods

const publicMethods = {
  // ... clearAllTimers, connect unchanged ...

  destroy() {
    const client = this

    client.clearAllTimers()
    client.pendingOperations.clear()
    client.subscriptionSet.clear()
    // ... rest unchanged ...
  },

  // ... getNextRandomDelay, scheduleConnectionAttempt, pub unchanged ...

  sub(channelID: string) {
    const client = this
    const subKey = createOpKey(REQUEST_TYPE.SUB, channelID)
    const unsubKey = createOpKey(REQUEST_TYPE.UNSUB, channelID)

    if (!client.pendingOperations.has(subKey) && !client.subscriptionSet.has(channelID)) {
      client.pendingOperations.delete(unsubKey)

      client.pendingOperations.set(subKey, {
        type: REQUEST_TYPE.SUB,
        getPayload: () => {
          const kvFilter = client.kvFilter.get(channelID)
          return kvFilter ? { channelID, kvFilter } : { channelID }
        },
        instance: {}
      })

      runWithRetry(client, subKey)
    }
  },

  setKvFilter(channelID: string, kvFilter?: string[]) {
    const client = this
    const kvFilterKey = createOpKey(REQUEST_TYPE.KV_FILTER, channelID)
    const unsubKey = createOpKey(REQUEST_TYPE.UNSUB, channelID)

    if (kvFilter) {
      client.kvFilter.set(channelID, kvFilter)
    } else {
      client.kvFilter.delete(channelID)
    }

    if (client.subscriptionSet.has(channelID) && !client.pendingOperations.has(unsubKey)) {
      client.pendingOperations.set(kvFilterKey, {
        type: REQUEST_TYPE.KV_FILTER,
        getPayload: () => {
          const currentFilter = client.kvFilter.get(channelID)
          return currentFilter ? { channelID, kvFilter: currentFilter } : { channelID }
        },
        instance: {}
      })

      runWithRetry(client, kvFilterKey)
    }
  },

  unsub(channelID: string) {
    const client = this
    const subKey = createOpKey(REQUEST_TYPE.SUB, channelID)
    const unsubKey = createOpKey(REQUEST_TYPE.UNSUB, channelID)

    if (
      !client.pendingOperations.has(unsubKey) &&
      (client.subscriptionSet.has(channelID) || client.pendingOperations.has(subKey))
    ) {
      client.pendingOperations.delete(subKey)
      client.kvFilter.delete(channelID)

      client.pendingOperations.set(unsubKey, {
        type: REQUEST_TYPE.UNSUB,
        getPayload: () => ({ channelID }),
        instance: {}
      })

      runWithRetry(client, unsubKey)
    }
  }
}

5. Update Event Handlers

close handler:

close(event) {
  const client = this

  console.debug('[pubsub] Event: close', event.code, event.reason)
  client.failedConnectionAttempts++

  if (client.socket) {
    for (const name of socketEventNames) {
      client.socket.removeEventListener(name, client.listeners[name] as () => void)
    }
  }
  client.socket = null
  client.clearAllTimers()

  if (client.shouldReconnect) {
    const newInstance = {}
    const opsToRequeue = new Map<string, PendingOperation>()

    // Preserve operations that should survive reconnection
    for (const [opKey, op] of client.pendingOperations) {
      if (REQUEUE_ON_RECONNECT.has(op.type)) {
        opsToRequeue.set(opKey, { ...op, instance: newInstance })
      }
    }

    // Re-queue subscriptions that need to be restored
    for (const channelID of client.subscriptionSet) {
      const subKey = createOpKey(REQUEST_TYPE.SUB, channelID)
      const unsubKey = createOpKey(REQUEST_TYPE.UNSUB, channelID)

      if (!client.pendingOperations.has(unsubKey) && !opsToRequeue.has(subKey)) {
        opsToRequeue.set(subKey, {
          type: REQUEST_TYPE.SUB,
          getPayload: () => {
            const kvFilter = client.kvFilter.get(channelID)
            return kvFilter ? { channelID, kvFilter } : { channelID }
          },
          instance: newInstance
        })
      }
    }

    client.pendingOperations.clear()
    for (const [key, op] of opsToRequeue) {
      client.pendingOperations.set(key, op)
    }
  } else {
    client.pendingOperations.clear()
  }

  client.subscriptionSet.clear()

  if (client.shouldReconnect && client.options.reconnectOnDisconnection) {
    if (client.failedConnectionAttempts > client.options.maxRetries) {
      sbp('okTurtles.events/emit', PUBSUB_RECONNECTION_FAILED, client)
    } else {
      if (!isDefinetelyOffline() || client.isLocal) {
        client.scheduleConnectionAttempt()
      }
    }
  }
}

open handler:

open() {
  console.debug('[pubsub] Event: open')
  const client = this
  const { options } = this

  client.connectionTimeUsed = undefined
  client.clearAllTimers()
  sbp('okTurtles.events/emit', PUBSUB_RECONNECTION_SUCCEEDED, client)

  client.failedConnectionAttempts = -1
  client.isNew = false

  if (options.pingTimeout > 0 && options.pingTimeout < Infinity) {
    client.pingTimeoutID = setTimeout(() => {
      client.socket?.close()
    }, options.pingTimeout)
  }

  // Re-send all pending operations
  for (const [opKey] of client.pendingOperations) {
    runWithRetry(client, opKey)
  }
}

6. Update Message Handlers

RESPONSE_TYPE.ERROR handler:

[RESPONSE_TYPE.ERROR]({ data }) {
  const { type, channelID, reason } = data
  console.warn(`[pubsub] Received ERROR response for ${type} request to ${channelID}`)
  const client = this

  switch (type) {
    case REQUEST_TYPE.SUB: {
      console.warn(`[pubsub] Could not subscribe to ${channelID}: ${reason}`)
      client.pendingOperations.delete(createOpKey(REQUEST_TYPE.SUB, channelID))
      break
    }
    case REQUEST_TYPE.UNSUB: {
      console.warn(`[pubsub] Could not unsubscribe from ${channelID}: ${reason}`)
      client.pendingOperations.delete(createOpKey(REQUEST_TYPE.UNSUB, channelID))
      break
    }
    case REQUEST_TYPE.KV_FILTER: {
      console.warn(`[pubsub] Could not set KV filter for ${channelID}: ${reason}`)
      client.pendingOperations.delete(createOpKey(REQUEST_TYPE.KV_FILTER, channelID))
      break
    }
    case REQUEST_TYPE.PUSH_ACTION: {
      const { actionType, message } = data
      console.warn(
        `[pubsub] Received ERROR for PUSH_ACTION request with action type '${actionType}': ${message}`
      )
      break
    }
    default: {
      console.error(`[pubsub] Malformed response: invalid request type ${type}`)
    }
  }
}

RESPONSE_TYPE.OK handler:

[RESPONSE_TYPE.OK]({ data: { type, channelID, kvFilter } }) {
  const client = this

  switch (type) {
    case REQUEST_TYPE.SUB: {
      client.pendingOperations.delete(createOpKey(REQUEST_TYPE.SUB, channelID))
      client.subscriptionSet.add(channelID)
      sbp('okTurtles.events/emit', PUBSUB_SUBSCRIPTION_SUCCEEDED, client, { channelID })
      
      const ourKvFilter = client.kvFilter.get(channelID)
      if (!ourKvFilter !== !kvFilter) {
        client.setKvFilter(channelID, ourKvFilter)
      } else if (ourKvFilter && kvFilter) {
        if (ourKvFilter.length !== kvFilter.length) {
          client.setKvFilter(channelID, ourKvFilter)
        } else {
          const sortedA = [...ourKvFilter].sort()
          const sortedB = [...kvFilter].sort()
          for (let i = 0; i < sortedA.length; i++) {
            if (sortedA[i] !== sortedB[i]) {
              client.setKvFilter(channelID, ourKvFilter)
              break
            }
          }
        }
      }
      break
    }
    case REQUEST_TYPE.UNSUB: {
      client.pendingOperations.delete(createOpKey(REQUEST_TYPE.UNSUB, channelID))
      client.subscriptionSet.delete(channelID)
      console.debug(`[pubsub] Unsubscribed from ${channelID}`)
      break
    }
    case REQUEST_TYPE.KV_FILTER: {
      client.pendingOperations.delete(createOpKey(REQUEST_TYPE.KV_FILTER, channelID))
      console.debug(`[pubsub] Set KV filter for ${channelID}`)
      break
    }
    default: {
      console.error(`[pubsub] Malformed response: invalid request type ${type}`)
    }
  }
}
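
The kvFilter comparison in the SUB case above could arguably be pulled into a small helper, which would reduce the handler to a single condition. The sketch below is one possible shape; `sameKvFilter` is a hypothetical name and is intended to match the inline logic (an empty array counts as a present filter, order is ignored).

```typescript
// Hypothetical helper: true when both filters are absent, or when both are
// present and contain the same keys regardless of order.
function sameKvFilter (a?: string[], b?: string[]): boolean {
  if (!a || !b) return !a === !b
  if (a.length !== b.length) return false
  const sortedA = [...a].sort()
  const sortedB = [...b].sort()
  return sortedA.every((key, i) => key === sortedB[i])
}

console.log(sameKvFilter(undefined, undefined)) // true
console.log(sameKvFilter(['b', 'a'], ['a', 'b'])) // true
console.log(sameKvFilter(['a'], undefined)) // false
console.log(sameKvFilter(['a', 'a'], ['a', 'b'])) // false
```

With such a helper, the SUB case would only need to call client.setKvFilter(channelID, ourKvFilter) when sameKvFilter(ourKvFilter, kvFilter) is false.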

Benefits

| Before | After |
| --- | --- |
| 3 separate maps (pendingSubscriptionMap, pendingUnsubscriptionMap, + awkward reuse for KV_FILTER) | 1 unified pendingOperations map |
| Type-checking inside runWithRetry to determine which map to use | Generic runWithRetry that works with any operation |
| Payload construction inside runWithRetry with type-specific logic | Each operation carries its own getPayload() function |
| Hard to add new retriable message types | Add new types by simply creating an operation object and calling runWithRetry |

Adding New Retriable Operations

To add retry support for any new message type, you just need to:

const opKey = createOpKey(REQUEST_TYPE.NEW_TYPE, identifier)
client.pendingOperations.set(opKey, {
  type: REQUEST_TYPE.NEW_TYPE,
  getPayload: () => ({ /* your payload */ }),
  instance: {}
})
runWithRetry(client, opKey)

And handle the response in the OK/ERROR handlers by deleting from pendingOperations.

@taoeffect
Member

taoeffect commented Dec 12, 2025

Here's Gemini Pro 3:

Details

Here is the solution.

The core issue is that the client was maintaining separate Map objects for specific operations (pendingSubscriptionMap, pendingUnsubscriptionMap). To support KV_FILTER and future message types elegantly, we should consolidate these into a single pendingRequests structure that nests the request type.

Structure Change:

  • Old: Map<ChannelID, InstanceToken> (one map per operation type)
  • New: Map<ChannelID, Map<RequestType, InstanceToken>> (one unified map)

Here is the refactored code.

// ... imports ...

export type PubSubClient = {
  // ... existing fields ...
  
  // REPLACED: pendingSubscriptionMap / pendingUnsubscriptionMap
  // WITH: A unified map tracking any pending request type for a channel
  pendingRequests: Map<string, Map<string, object>>;
  
  // ... existing fields ...
};

// ... helpers ...

function runWithRetry (
  client: PubSubClient,
  channelID: string,
  type: RequestTypeEnum,
  instance: object
) {
  let attemptNo = 0
  const { socket, options } = client

  const send = () => {
    // 1. Closure check: ensure socket instance hasn't been replaced
    if (client.socket !== socket || socket?.readyState !== WebSocket.OPEN) return

    // 2. Cancellation check
    // We now look up the nested map. If the instance token for this
    // specific type and channel has changed, we stop.
    const currentInstance = client.pendingRequests.get(channelID)?.get(type)

    if (currentInstance !== instance) return

    // 3. Send logic
    const kvFilter = client.kvFilter.get(channelID)
    const payload: JSONType = (
      kvFilter && ([REQUEST_TYPE.SUB, REQUEST_TYPE.KV_FILTER] as string[]).includes(type)
    )
      ? { channelID, kvFilter }
      : { channelID }

    socket.send(createRequest(type, payload))

    // 4. Schedule retry
    setTimeout(() => {
      if (++attemptNo > options.maxOpRetries) {
        console.warn(`Giving up ${type} for channel`, channelID)
        return
      }
      send()
    }, options.opRetryInterval * (attemptNo + 1))
  }

  send()
}

// ... API ...

export function createClient (url: string, options: Partial<Options> = {}): PubSubClient {
  const client: PubSubClient = {
    // ... other props ...
    
    // Initialize the unified map
    pendingRequests: new Map(),
    // pendingSubscriptionMap: new Map(), // Deleted
    // pendingUnsubscriptionMap: new Map(), // Deleted
    
    // ... other props ...
  }
  
  // ... listeners setup ...
  
  return client
}

// ...

// These handlers receive the PubSubClient instance through the `this` binding.
const defaultClientEventHandlers: ClientEventHandlers = {
  close (event) {
    const client = this
    // ... logging and cleanup ...

    if (client.shouldReconnect) {
      // Create a new instance token to invalidate any in-flight retry closures
      // from the previous connection.
      const instance = {}
      
      // 1. Refresh all currently pending requests with the new token
      for (const requests of client.pendingRequests.values()) {
        for (const type of requests.keys()) {
          requests.set(type, instance)
        }
      }

      // 2. Restore subscriptions: Ensure every channel we are subscribed to
      // is queued for re-subscription (SUB), unless we are explicitly unsubscribing.
      client.subscriptionSet.forEach((channelID) => {
        let requests = client.pendingRequests.get(channelID)
        if (!requests) {
          requests = new Map()
          client.pendingRequests.set(channelID, requests)
        }
        
        // Only queue a SUB if we aren't waiting for an UNSUB
        if (!requests.has(REQUEST_TYPE.UNSUB)) {
          requests.set(REQUEST_TYPE.SUB, instance)
        }
      })
    } else {
        client.pendingRequests.clear()
    }
    
    client.subscriptionSet.clear()
    // No need to clear pending maps here, logic above handled it or we cleared all
    
    // ... reconnection logic ...
  },

  // ... error, message, offline, online handlers ...

  open () {
    // ... existing setup ...
    
    // Send ANY pending request (SUB, KV_FILTER, or even UNSUB if pending)
    for (const [channelID, requests] of client.pendingRequests) {
      for (const [type, instance] of requests) {
        runWithRetry(client, channelID, type as RequestTypeEnum, instance)
      }
    }
  },

  // ... other handlers ...
}

const defaultMessageHandlers: MessageHandlers = {
  // ... other handlers ...

  [RESPONSE_TYPE.ERROR] ({ data }) {
    const { type, channelID, reason } = data
    const client = this

    // Generic cleanup for any request type
    const requests = client.pendingRequests.get(channelID)
    if (requests) {
      requests.delete(type)
      if (requests.size === 0) client.pendingRequests.delete(channelID)
    }

    // ... existing switch/case for specific logging ...
  },

  [RESPONSE_TYPE.OK] ({ data: { type, channelID, kvFilter } }) {
    const client = this

    // Generic cleanup: Request succeeded, stop retrying.
    const requests = client.pendingRequests.get(channelID)
    if (requests) {
      requests.delete(type)
      if (requests.size === 0) client.pendingRequests.delete(channelID)
    }

    switch (type) {
      case REQUEST_TYPE.SUB: {
        client.subscriptionSet.add(channelID)
        sbp('okTurtles.events/emit', PUBSUB_SUBSCRIPTION_SUCCEEDED, client, { channelID })
        
        // ... KV Filter logic stays largely the same ...
        // Only change: calling this.setKvFilter checks the new map structure
        // (Logic omitted for brevity, logic matches original snippet provided)
        break
      }
      case REQUEST_TYPE.UNSUB: {
        console.debug(`[pubsub] Unsubscribed from ${channelID}`)
        client.subscriptionSet.delete(channelID)
        break
      }
      case REQUEST_TYPE.KV_FILTER: {
        client.subscriptionSet.add(channelID)
        console.debug(`[pubsub] Set KV filter for ${channelID}`)
        break
      }
      // ...
    }
  }
}

// ...

const publicMethods: {
  // ...
  destroy(this: PubSubClient): void;
  // ...
} = {
  // ...
  destroy () {
    const client = this
    // ...
    client.pendingRequests.clear() // Clean unified map
    client.subscriptionSet.clear()
    // ...
  },

  sub (channelID: string) {
    const client = this
    
    // Get or Create the request map for this channel
    let requests = client.pendingRequests.get(channelID)
    if (!requests) {
      requests = new Map()
      client.pendingRequests.set(channelID, requests)
    }

    // Calling sub() cancels any pending unsub()
    requests.delete(REQUEST_TYPE.UNSUB)

    // If we aren't subscribed and aren't already trying to subscribe
    if (!client.subscriptionSet.has(channelID) && !requests.has(REQUEST_TYPE.SUB)) {
      const instance = {}
      requests.set(REQUEST_TYPE.SUB, instance)
      runWithRetry(client, channelID, REQUEST_TYPE.SUB, instance)
    }
  },

  setKvFilter (channelID: string, kvFilter?: string[]) {
    const client = this

    if (kvFilter) {
      client.kvFilter.set(channelID, kvFilter)
    } else {
      client.kvFilter.delete(channelID)
    }

    // Helper to check pending status
    const requests = client.pendingRequests.get(channelID)
    const isUnsubscribing = requests?.has(REQUEST_TYPE.UNSUB)

    // If subscribed and not currently unsubscribing
    if (client.subscriptionSet.has(channelID) && !isUnsubscribing) {
      const reqs = requests || new Map()
      if (!requests) client.pendingRequests.set(channelID, reqs)

      const instance = {}
      reqs.set(REQUEST_TYPE.KV_FILTER, instance)
      runWithRetry(client, channelID, REQUEST_TYPE.KV_FILTER, instance)
    }
  },

  unsub (channelID: string) {
    const client = this
    
    let requests = client.pendingRequests.get(channelID)
    if (!requests) {
      requests = new Map()
      client.pendingRequests.set(channelID, requests)
    }

    // Calling unsub() cancels pending sub() and kv_filter()
    const hadPendingSub = requests.delete(REQUEST_TYPE.SUB)
    requests.delete(REQUEST_TYPE.KV_FILTER)
    client.kvFilter.delete(channelID)

    // If we are currently subscribed, OR if we were trying to subscribe
    // (meaning the server might have received the SUB), we must send UNSUB.
    if (
      !requests.has(REQUEST_TYPE.UNSUB) &&
      (client.subscriptionSet.has(channelID) || hadPendingSub)
    ) {
      const instance = {}
      requests.set(REQUEST_TYPE.UNSUB, instance)
      runWithRetry(client, channelID, REQUEST_TYPE.UNSUB, instance)
    }
    
    // Cleanup empty map if nothing happened
    if (requests.size === 0) client.pendingRequests.delete(channelID)
  }
}

And here's Opus 4.5 again but with thinking enabled:

Details

Refactoring runWithRetry for Generic Message Delivery

The core issue is that runWithRetry has type-specific logic baked in, and the separate pendingSubscriptionMap/pendingUnsubscriptionMap approach doesn't scale well for new message types like KV_FILTER.

Solution: Unified Pending Operations Map with Composite Keys

Replace the two separate maps with a single pendingOperations map using composite keys (${type}:${channelID}), and make runWithRetry accept a payload builder function.

Key Changes:

// ====== Helpers ====== //

function pendingOpKey(type: RequestTypeEnum, channelID: string): string {
  return `${type}:${channelID}`
}

function runWithRetry(
  client: PubSubClient,
  type: RequestTypeEnum,
  channelID: string,
  instance: object,
  buildPayload: () => JSONObject
) {
  let attemptNo = 0
  const { socket, options } = client
  const key = pendingOpKey(type, channelID)

  const send = () => {
    // 1. Closure check: ensure socket instance hasn't been replaced
    if (client.socket !== socket || socket?.readyState !== WebSocket.OPEN) return

    // 2. Cancellation check: verify this operation is still current
    if (client.pendingOperations.get(key) !== instance) return

    // 3. Send with current payload
    socket.send(createRequest(type, buildPayload()))

    // 4. Schedule retry
    setTimeout(() => {
      if (++attemptNo > options.maxOpRetries) {
        console.warn(`Giving up ${type} for channel`, channelID)
        return
      }
      send()
    }, options.opRetryInterval * (attemptNo + 1))
  }

  send()
}

Updated PubSubClient Type:

export type PubSubClient = {
  // ... other fields ...
  
  // Replace pendingSubscriptionMap and pendingUnsubscriptionMap with:
  pendingOperations: Map<string, object>;
  
  // ... rest of fields ...
};

Updated Public Methods:

const publicMethods = {
  // ... other methods ...

  sub(channelID: string) {
    const client = this
    const subKey = pendingOpKey(REQUEST_TYPE.SUB, channelID)
    const unsubKey = pendingOpKey(REQUEST_TYPE.UNSUB, channelID)

    if (!client.pendingOperations.has(subKey) && !client.subscriptionSet.has(channelID)) {
      const instance = {}
      client.pendingOperations.set(subKey, instance)
      client.pendingOperations.delete(unsubKey)

      runWithRetry(client, REQUEST_TYPE.SUB, channelID, instance, () => {
        const kvFilter = client.kvFilter.get(channelID)
        return kvFilter ? { channelID, kvFilter } : { channelID }
      })
    }
  },

  unsub(channelID: string) {
    const client = this
    const subKey = pendingOpKey(REQUEST_TYPE.SUB, channelID)
    const unsubKey = pendingOpKey(REQUEST_TYPE.UNSUB, channelID)

    if (
      !client.pendingOperations.has(unsubKey) &&
      (client.subscriptionSet.has(channelID) || client.pendingOperations.has(subKey))
    ) {
      const instance = {}
      client.pendingOperations.delete(subKey)
      client.pendingOperations.set(unsubKey, instance)
      client.kvFilter.delete(channelID)

      runWithRetry(client, REQUEST_TYPE.UNSUB, channelID, instance, () => ({ channelID }))
    }
  },

  setKvFilter(channelID: string, kvFilter?: string[]) {
    const client = this

    if (kvFilter) {
      client.kvFilter.set(channelID, kvFilter)
    } else {
      client.kvFilter.delete(channelID)
    }

    const unsubKey = pendingOpKey(REQUEST_TYPE.UNSUB, channelID)
    const kvFilterKey = pendingOpKey(REQUEST_TYPE.KV_FILTER, channelID)

    if (client.subscriptionSet.has(channelID) && !client.pendingOperations.has(unsubKey)) {
      const instance = {}
      client.pendingOperations.set(kvFilterKey, instance)
      
      runWithRetry(client, REQUEST_TYPE.KV_FILTER, channelID, instance, () => {
        const kv = client.kvFilter.get(channelID)
        return kv ? { channelID, kvFilter: kv } : { channelID }
      })
    }
  },
}

Updated Event Handlers:

// In close handler:
close(event) {
  // ... existing cleanup code ...

  if (client.shouldReconnect) {
    const instance = {}
    
    // Refresh instances for pending SUB operations
    for (const key of client.pendingOperations.keys()) {
      if (key.startsWith(REQUEST_TYPE.SUB + ':')) {
        client.pendingOperations.set(key, instance)
      }
    }
    
    // Queue subscriptions for re-subscription (unless pending unsub)
    client.subscriptionSet.forEach((channelID) => {
      if (!client.pendingOperations.has(pendingOpKey(REQUEST_TYPE.UNSUB, channelID))) {
        client.pendingOperations.set(pendingOpKey(REQUEST_TYPE.SUB, channelID), instance)
      }
    })
  }

  client.subscriptionSet.clear()
  
  // Clear all UNSUB and KV_FILTER operations (irrelevant when disconnected)
  for (const key of [...client.pendingOperations.keys()]) {
    if (key.startsWith(REQUEST_TYPE.UNSUB + ':') || 
        key.startsWith(REQUEST_TYPE.KV_FILTER + ':')) {
      client.pendingOperations.delete(key)
    }
  }

  // ... reconnection logic ...
},

// In open handler:
open() {
  // ... existing setup code ...

  // Send all pending SUB requests
  for (const [key, instance] of client.pendingOperations) {
    if (key.startsWith(REQUEST_TYPE.SUB + ':')) {
      const channelID = key.slice((REQUEST_TYPE.SUB + ':').length)
      runWithRetry(client, REQUEST_TYPE.SUB, channelID, instance, () => {
        const kvFilter = client.kvFilter.get(channelID)
        return kvFilter ? { channelID, kvFilter } : { channelID }
      })
    }
  }
}

Updated Message Handlers:

[RESPONSE_TYPE.OK]({ data: { type, channelID, kvFilter } }) {
  const client = this

  switch (type) {
    case REQUEST_TYPE.SUB: {
      client.pendingOperations.delete(pendingOpKey(REQUEST_TYPE.SUB, channelID))
      client.subscriptionSet.add(channelID)
      sbp('okTurtles.events/emit', PUBSUB_SUBSCRIPTION_SUCCEEDED, client, { channelID })
      // ... existing kvFilter sync logic ...
      break
    }
    case REQUEST_TYPE.UNSUB: {
      client.pendingOperations.delete(pendingOpKey(REQUEST_TYPE.UNSUB, channelID))
      client.subscriptionSet.delete(channelID)
      console.debug(`[pubsub] Unsubscribed from ${channelID}`)
      break
    }
    case REQUEST_TYPE.KV_FILTER: {
      client.pendingOperations.delete(pendingOpKey(REQUEST_TYPE.KV_FILTER, channelID))
      console.debug(`[pubsub] Set KV filter for ${channelID}`)
      break
    }
    default: {
      console.error(`[pubsub] Malformed response: invalid request type ${type}`)
    }
  }
},

[RESPONSE_TYPE.ERROR]({ data }) {
  const { type, channelID, reason } = data
  // ...
  
  switch (type) {
    case REQUEST_TYPE.SUB: {
      client.pendingOperations.delete(pendingOpKey(REQUEST_TYPE.SUB, channelID))
      break
    }
    case REQUEST_TYPE.UNSUB: {
      client.pendingOperations.delete(pendingOpKey(REQUEST_TYPE.UNSUB, channelID))
      break
    }
    case REQUEST_TYPE.KV_FILTER: {
      client.pendingOperations.delete(pendingOpKey(REQUEST_TYPE.KV_FILTER, channelID))
      break
    }
    // ...
  }
}

Benefits

  1. Single map for all pending operations - No need to create new maps for new message types
  2. Generic runWithRetry - No type-specific logic; accepts any message type and payload builder
  3. Composite keys - Each (type, channelID) pair gets its own entry, so operations are naturally isolated by type
  4. Lazy payload evaluation - The buildPayload function ensures retries use current state
  5. Easy to extend - Adding a new retriable message type is just a matter of calling runWithRetry with appropriate parameters
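
Benefit 4 is easiest to see in isolation. The sketch below is a hypothetical, socket-free illustration: `kvFilter` is a plain Map standing in for `client.kvFilter`, and `makeBuildPayload` is an assumed helper name, not part of the PR. It shows that a builder invoked on every attempt picks up a filter change made between attempts, whereas a payload captured once does not.

```typescript
// Hypothetical stand-in for client.kvFilter (channelID -> filter keys).
const kvFilter = new Map<string, string[]>()

type Payload = { channelID: string, kvFilter?: string[] }

// Assumed helper: returns a builder that reads the filter at call time.
function makeBuildPayload (channelID: string): () => Payload {
  return () => {
    const filter = kvFilter.get(channelID)
    return filter ? { channelID, kvFilter: filter } : { channelID }
  }
}

const buildPayload = makeBuildPayload('chan-1')

// Eager: the payload is computed once, before the filter is set.
const eagerPayload = buildPayload()

// The filter changes between the first attempt and a retry.
kvFilter.set('chan-1', ['a', 'b'])

// Lazy: the retry calls the builder again and sees the new filter.
const retryPayload = buildPayload()

console.log(JSON.stringify(eagerPayload)) // {"channelID":"chan-1"}
console.log(JSON.stringify(retryPayload)) // {"channelID":"chan-1","kvFilter":["a","b"]}
```

The trade-off is that the builder runs on every attempt, so it should stay cheap and free of side effects.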

@taoeffect

Ran the latest changes through Opus 4.5:

Details

Bugs

1. kvFilter deleted prematurely in unsub()

In the new code, kvFilter is deleted immediately when unsub() is called, rather than when the UNSUB operation succeeds:

unsub (channelID: string) {
  // ...
  if (
    !client.pendingOperations.tHas(REQUEST_TYPE.UNSUB, channelID) &&
    // ...
  ) {
    client.pendingOperations.tDelete(REQUEST_TYPE.SUB, channelID)
    client.kvFilter.delete(channelID)  // <-- Deleted immediately

    runWithRetry(client, channelID, REQUEST_TYPE.UNSUB, () => ({ channelID }))
  }
}

The old code deleted kvFilter in the RESPONSE_TYPE.OK handler after the server confirmed the unsubscription. If the UNSUB operation fails after all retries or the user changes their mind and calls sub() again before UNSUB completes, the kvFilter is already lost.

Suggested fix: Move the deletion back to the OK handler:

case REQUEST_TYPE.UNSUB: {
  if (client.pendingOperations.tHas(REQUEST_TYPE.UNSUB, channelID)) {
    console.debug(`[pubsub] Unsubscribed from ${channelID}`)
    client.pendingOperations.tDelete(REQUEST_TYPE.UNSUB, channelID)
    client.subscriptionSet.delete(channelID)
    client.kvFilter.delete(channelID)  // Move deletion here
  } else {
    console.debug(`[pubsub] Received unexpected unsub for ${channelID}`)
  }
  break
}

And remove it from unsub():

if (...) {
  client.pendingOperations.tDelete(REQUEST_TYPE.SUB, channelID)
  // Remove: client.kvFilter.delete(channelID)
  runWithRetry(client, channelID, REQUEST_TYPE.UNSUB, () => ({ channelID }))
}
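
To make the failure mode concrete, here is a minimal model of the two orderings. It is only a sketch: `ClientModel`, `unsubEager`, `unsubDeferred` and `onUnsubOk` are hypothetical names, and the model tracks nothing but the filter map and a set of pending UNSUB operations.

```typescript
// Minimal hypothetical model: only the state relevant to the bug.
type ClientModel = {
  kvFilter: Map<string, string[]>
  pendingUnsub: Set<string>
}

function makeClient (): ClientModel {
  return { kvFilter: new Map([['chan-1', ['key-a']]]), pendingUnsub: new Set() }
}

// Behaviour under review: the filter is dropped as soon as unsub() is called.
function unsubEager (c: ClientModel, channelID: string) {
  c.pendingUnsub.add(channelID)
  c.kvFilter.delete(channelID)
}

// Suggested behaviour: unsub() only records the pending operation...
function unsubDeferred (c: ClientModel, channelID: string) {
  c.pendingUnsub.add(channelID)
}

// ...and the filter is dropped once the server confirms the UNSUB.
function onUnsubOk (c: ClientModel, channelID: string) {
  if (!c.pendingUnsub.delete(channelID)) return // stale or unexpected response
  c.kvFilter.delete(channelID)
}

// Scenario 1: the UNSUB is abandoned before any OK arrives.
const eager = makeClient()
unsubEager(eager, 'chan-1')
eager.pendingUnsub.delete('chan-1')

const deferred = makeClient()
unsubDeferred(deferred, 'chan-1')
deferred.pendingUnsub.delete('chan-1')

console.log(eager.kvFilter.has('chan-1'))    // false: filter already lost
console.log(deferred.kvFilter.has('chan-1')) // true: filter survives

// Scenario 2: the server confirms the UNSUB.
const confirmed = makeClient()
unsubDeferred(confirmed, 'chan-1')
onUnsubOk(confirmed, 'chan-1')
console.log(confirmed.kvFilter.has('chan-1')) // false: cleaned up on confirmation
```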

2. Inconsistent use of this vs client

In the RESPONSE_TYPE.OK handler:

[RESPONSE_TYPE.OK] ({ data: { type, channelID, kvFilter } }) {
  const client = this

  switch (type) {
    case REQUEST_TYPE.SUB: {
      // ...
      if (!isKvFilterFresh(ourKvFilter, kvFilter)) {
        console.debug(`[pubsub] Subscribed to ${channelID}, need to set new KV filter`)
        this.setKvFilter(channelID, ourKvFilter)  // <-- Uses `this`
      }
      // ...
    }

client is defined but this is used. While functionally equivalent here, it's inconsistent with the rest of the function.

Suggested fix:

client.setKvFilter(channelID, ourKvFilter)

Potential Issues

3. Misleading function name

const pubPayloadFactory = (client: PubSubClient, channelID: string) => () => {
  const kvFilter = client.kvFilter.get(channelID)
  return kvFilter ? { kvFilter, channelID } : { channelID } as JSONObject
}

The name pubPayloadFactory suggests it creates payloads for PUB operations, but it's actually used for SUB and KV_FILTER requests.

Suggested fix:

const subPayloadFactory = (client: PubSubClient, channelID: string) => () => {
  const kvFilter = client.kvFilter.get(channelID)
  return kvFilter ? { kvFilter, channelID } : { channelID } as JSONObject
}

4. First retry could trigger with near-zero delay

const minDelay = (attemptNo - 1) * options.opRetryInterval
const maxDelay = attemptNo * options.opRetryInterval
const delay = randomIntFromRange(minDelay, maxDelay)

After the first send (attemptNo = 1), minDelay = 0. This means the first retry could fire almost immediately if the random value is low, potentially causing duplicate sends before the server has a chance to respond.

Suggested fix: Ensure a minimum delay:

const minDelay = Math.max(options.opRetryInterval / 2, (attemptNo - 1) * options.opRetryInterval)
const maxDelay = attemptNo * options.opRetryInterval
const delay = randomIntFromRange(minDelay, maxDelay)

Or simply:

const minDelay = attemptNo * options.opRetryInterval / 2
const maxDelay = attemptNo * options.opRetryInterval
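
The effect of each option on the retry windows can be tabulated with a small sketch. It assumes `opRetryInterval = 2000` (the value used in the examples in this thread) and that `attemptNo` is 1 after the first send; the function names are illustrative, not from the PR.

```typescript
const opRetryInterval = 2000 // assumed value, in milliseconds

type DelayWindow = [number, number] // [minDelay, maxDelay]

// Window as currently computed: the first retry may fire after 0 ms.
function currentWindow (attemptNo: number): DelayWindow {
  return [(attemptNo - 1) * opRetryInterval, attemptNo * opRetryInterval]
}

// First suggestion: clamp the lower bound to half an interval.
function clampedWindow (attemptNo: number): DelayWindow {
  const min = Math.max(opRetryInterval / 2, (attemptNo - 1) * opRetryInterval)
  return [min, attemptNo * opRetryInterval]
}

// Second suggestion: the lower bound is always half of the upper bound.
function halfWindow (attemptNo: number): DelayWindow {
  return [attemptNo * opRetryInterval / 2, attemptNo * opRetryInterval]
}

for (let attemptNo = 1; attemptNo <= 3; attemptNo++) {
  console.log(attemptNo, currentWindow(attemptNo), clampedWindow(attemptNo), halfWindow(attemptNo))
}
// 1 [0, 2000]    [1000, 2000] [1000, 2000]
// 2 [2000, 4000] [2000, 4000] [2000, 4000]
// 3 [4000, 6000] [4000, 6000] [3000, 6000]
```

Under these assumptions the two suggestions agree for the first two attempts and diverge from the third onward, where the second gives a wider window (more jitter, but a shorter minimum wait).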

5. Giving up on retries should be more prominent

if (attemptNo++ > options.maxOpRetries) {
  console.warn(`Giving up ${type} for channel`, channelID)
  client.pendingOperations.tDelete(type, channelID)
  return
}

Giving up on operations is a significant failure that could leave the application in an inconsistent state. Consider:

  1. Using console.error instead of console.warn
  2. Emitting an event so the application can respond appropriately

Suggested fix:

if (attemptNo++ > options.maxOpRetries) {
  console.error(`[pubsub] Giving up ${type} for channel`, channelID)
  client.pendingOperations.tDelete(type, channelID)
  sbp('okTurtles.events/emit', PUBSUB_ERROR, client, { 
    type: 'max-retries-exceeded', 
    requestType: type, 
    channelID 
  })
  return
}

6. Missing [pubsub] prefix in log message

console.warn(`Giving up ${type} for channel`, channelID)

Other log messages in the file use the [pubsub] prefix for consistency.

Suggested fix:

console.warn(`[pubsub] Giving up ${type} for channel`, channelID)

Minor Observations

The overall retry architecture looks sound - using reference equality with empty objects for cancellation detection is a clever pattern that avoids needing explicit cancellation tokens. The TieredMap helper class is well-implemented for its purpose.
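
For readers unfamiliar with the pattern: each call stores a fresh object under the operation's key, and every scheduled retry checks that the stored object is still the one it started with. The following is a minimal sketch, not the PR's code. `pending` and `start` are hypothetical names, and a plain Map with a composite string key stands in for the TieredMap.

```typescript
// Hypothetical stand-in for the pending-operations store.
const pending = new Map<string, object>()

// Start an operation: store a fresh token and return a check bound to it.
function start (key: string): () => boolean {
  const instance = {} // unique by identity; no ID generation needed
  pending.set(key, instance)
  // A retry closure would call this before every send.
  return () => pending.get(key) === instance
}

const firstIsCurrent = start('sub:chan-1')
console.log(firstIsCurrent()) // true: nothing has superseded it

// A second call for the same key replaces the token...
const secondIsCurrent = start('sub:chan-1')
console.log(firstIsCurrent())  // false: the first operation's retries now bail out
console.log(secondIsCurrent()) // true

// ...and deleting the entry (e.g. on an OK response) cancels it too.
pending.delete('sub:chan-1')
console.log(secondIsCurrent()) // false
```

Because two distinct `{}` literals are never equal by reference, a superseded or completed operation can never mistake a newer entry for its own.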

I also ran it through using a different context that included the full pubsub file and it gave a slightly different review:

Details

Critical Issues

1. Missing kvFilter.delete in UNSUB OK handler

The old code deleted the KV filter when the UNSUB response was received, but this was moved to the unsub() method. However, the RESPONSE_TYPE.OK handler for UNSUB no longer cleans up the filter:

case REQUEST_TYPE.UNSUB: {
  if (client.pendingOperations.tHas(REQUEST_TYPE.UNSUB, channelID)) {
    console.debug(`[pubsub] Unsubscribed from ${channelID}`)
    client.pendingOperations.tDelete(REQUEST_TYPE.UNSUB, channelID)
    client.subscriptionSet.delete(channelID)
    // kvFilter is deleted in unsub() now, but what if unsub response arrives
    // for a channel where sub() was called again before the response?
  }
  break
}

If this sequence occurs:

  1. unsub(channelID) - deletes kvFilter, starts UNSUB retry
  2. sub(channelID) with a new kvFilter - deletes UNSUB pending, sets new kvFilter, starts SUB retry
  3. Old UNSUB OK response arrives (from server still processing) - tHas(UNSUB) is false, so ignored

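On closer inspection, this sequence is handled correctly: the tHas check in the OK handler ignores the stale UNSUB response, so the new kvFilter is preserved. It is still worth confirming that moving the deletion into unsub() was an intended behavior change.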


Bugs

2. Type assertion only applies to one branch

const pubPayloadFactory = (client: PubSubClient, channelID: string) => () => {
  const kvFilter = client.kvFilter.get(channelID)
  return kvFilter ? { kvFilter, channelID } : { channelID } as JSONObject
}

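The as JSONObject assertion binds more tightly than the conditional operator, so it applies only to { channelID }, not to { kvFilter, channelID }. The first branch is therefore never checked against JSONObject. Annotating the return type instead covers both branches.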

Suggested fix:

const pubPayloadFactory = (client: PubSubClient, channelID: string) => (): JSONObject => {
  const kvFilter = client.kvFilter.get(channelID)
  return kvFilter ? { kvFilter, channelID } : { channelID }
}

3. First retry can happen with 0ms delay

// After first send, attemptNo is 1
const minDelay = (attemptNo - 1) * options.opRetryInterval  // (1-1) * 2000 = 0
const maxDelay = attemptNo * options.opRetryInterval        // 1 * 2000 = 2000
const delay = randomIntFromRange(minDelay, maxDelay)        // Could be 0!

The first retry could happen immediately with a 0ms delay, which might cause unnecessary load if the server is just slow to respond.

Suggested fix:

// Ensure minimum delay on first retry
const minDelay = Math.max(options.opRetryInterval / 2, (attemptNo - 1) * options.opRetryInterval)
const maxDelay = attemptNo * options.opRetryInterval

Potential Improvements

4. Retry timeouts are never cancelled

The setTimeout calls in runWithRetry are never cancelled when:

  • The operation succeeds
  • The socket closes
  • destroy() is called

While the checks at the start of send() prevent actual sending, the timeouts remain scheduled and closures stay in memory.

Suggested improvement:

function runWithRetry (
  client: PubSubClient,
  channelID: string,
  type: RequestTypeEnum,
  getPayload: () => JSONObject
) {
  let attemptNo = 0
  let timeoutId: ReturnType<typeof setTimeout> | undefined
  const { socket, options } = client
  const instance = { cancel: () => { clearTimeout(timeoutId) } }

  client.pendingOperations.tSet(type, channelID, instance)

  const send = () => {
    if (client.socket !== socket || socket?.readyState !== WebSocket.OPEN) return

    const currentInstance = client.pendingOperations.tGet(type, channelID)
    if (currentInstance !== instance) return

    if (attemptNo++ > options.maxOpRetries) {
      console.warn(`Giving up ${type} for channel`, channelID)
      client.pendingOperations.tDelete(type, channelID)
      return
    }

    const payload = getPayload()
    socket.send(createRequest(type, payload))

    const minDelay = (attemptNo - 1) * options.opRetryInterval
    const maxDelay = attemptNo * options.opRetryInterval
    const delay = randomIntFromRange(minDelay, maxDelay)
    timeoutId = setTimeout(send, delay)
  }

  send()
}

Then update tDelete usages to cancel the timeout:

// In RESPONSE_TYPE.OK handler
const instance = client.pendingOperations.tGet(REQUEST_TYPE.SUB, channelID)
if (instance && typeof instance.cancel === 'function') instance.cancel()
client.pendingOperations.tDelete(REQUEST_TYPE.SUB, channelID)
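
Since this cancel-then-delete sequence would be repeated at every call site, it may be worth wrapping. A minimal sketch, assuming instances expose an optional `cancel` method as in the snippet above; `settle` is a hypothetical helper, and a plain Map with a composite string key stands in for the TieredMap.

```typescript
type PendingInstance = { cancel?: () => void }

const pendingOps = new Map<string, PendingInstance>()

// Hypothetical helper: cancel the retry timer (if any) and drop the entry.
// Returns true if an entry existed.
function settle (key: string): boolean {
  const instance = pendingOps.get(key)
  if (!instance) return false
  instance.cancel?.()
  return pendingOps.delete(key)
}

// Usage: a pending operation whose retry timer would otherwise stay scheduled.
let cancelCalls = 0
const timeoutId = setTimeout(() => {}, 60000)
pendingOps.set('sub:chan-1', {
  cancel: () => { cancelCalls++; clearTimeout(timeoutId) }
})

console.log(settle('sub:chan-1')) // true: timer cleared, entry removed
console.log(settle('sub:chan-1')) // false: nothing left to settle
console.log(cancelCalls)          // 1
```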

5. Misleading function name

pubPayloadFactory creates payloads for SUB and KV_FILTER operations, not PUB operations.

Suggested rename:

const subscriptionPayloadFactory = (client: PubSubClient, channelID: string) => (): JSONObject => {
  const kvFilter = client.kvFilter.get(channelID)
  return kvFilter ? { kvFilter, channelID } : { channelID }
}

6. Consider adding a max total retry duration

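With maxOpRetries: 4 and opRetryInterval: 2000, the retry delays add up to roughly:

  • 0-2s + 2-4s + 4-6s + 6-8s ≈ 12-20 seconds

This is when the final retry is sent. With the attemptNo++ > options.maxOpRetries check shown above, the operation is only abandoned one further window (8-10s) later, roughly 20-30 seconds after the first attempt.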

Consider adding a maxOpRetryDuration option for time-sensitive operations:

const startTime = Date.now()

const send = () => {
  // ... existing checks ...
  
  // Add time-based check
  if (Date.now() - startTime > options.maxOpRetryDuration) {
    console.warn(`Timeout: giving up ${type} for channel`, channelID)
    client.pendingOperations.tDelete(type, channelID)
    return
  }
  // ...
}
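
The retry timeline described above can be replayed with a short calculation. The sketch below assumes the `attemptNo++ > options.maxOpRetries` check and the `(attemptNo - 1)` to `attemptNo` interval window quoted in this review, with `maxOpRetries = 4` and `opRetryInterval = 2000`; `scheduleBounds` is a hypothetical helper. It sums the minimum and maximum delays to bound when the last request is sent and when the operation is abandoned.

```typescript
type Bounds = { min: number, max: number }

function scheduleBounds (maxOpRetries: number, opRetryInterval: number) {
  let attemptNo = 0
  let sends = 0
  // Earliest and latest time (ms) at which the next send() call can run.
  const elapsed: Bounds = { min: 0, max: 0 }
  let lastSend: Bounds = { min: 0, max: 0 }

  // Mirrors send(): check the counter, send, then schedule the next call.
  while (!(attemptNo++ > maxOpRetries)) {
    sends++
    lastSend = { ...elapsed }
    elapsed.min += (attemptNo - 1) * opRetryInterval
    elapsed.max += attemptNo * opRetryInterval
  }

  // When the loop exits, `elapsed` is the time of the call that gives up.
  return { sends, lastSend, giveUp: elapsed }
}

console.log(scheduleBounds(4, 2000))
// { sends: 5, lastSend: { min: 12000, max: 20000 }, giveUp: { min: 20000, max: 30000 } }
```

Under these assumptions the final retry goes out 12-20 seconds after the first attempt, and the entry is removed from the pending map 20-30 seconds after it. A `maxOpRetryDuration` below 20 seconds would therefore cut the sequence short.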

Minor Issues

7. Unnecessary arrow function wrapper in setTimeout

setTimeout(() => {
  send()
}, delay)

Can be simplified to:

setTimeout(send, delay)

8. isKvFilterFresh could be clearer

if (!ourKvFilter !== !theirKvFilter) {

This double-negation XOR pattern is hard to read at first glance.

Suggested improvement:

// Boolean() keeps the original truthiness semantics (an empty array still counts as present)
const hasOurs = Boolean(ourKvFilter)
const hasTheirs = Boolean(theirKvFilter)
if (hasOurs !== hasTheirs) {
  return false
}

Or add a comment explaining the logic:

// XOR: return false if exactly one of them is truthy
if (!ourKvFilter !== !theirKvFilter) {
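
For reference, the truthiness behaviour of the original expression can be tabulated as below. This is a standalone sketch (`presenceDiffers` is a hypothetical name, not the PR's `isKvFilterFresh`). Note in particular that an empty array is truthy in JavaScript, so `[]` counts as present.

```typescript
type MaybeFilter = string[] | undefined | null

// True when exactly one of the two filters is present (logical XOR on truthiness).
function presenceDiffers (ours: MaybeFilter, theirs: MaybeFilter): boolean {
  return !ours !== !theirs
}

console.log(presenceDiffers(undefined, undefined)) // false: both absent
console.log(presenceDiffers(['a'], undefined))     // true: only ours present
console.log(presenceDiffers(undefined, ['a']))     // true: only theirs present
console.log(presenceDiffers(['a'], ['b']))         // false: both present
console.log(presenceDiffers([], undefined))        // true: [] is truthy
```

Any rewrite that adds a `length > 0` test would change the last case, so it should be treated as a behavior change rather than a pure readability fix.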


@taoeffect taoeffect left a comment

Nicely done!

@taoeffect taoeffect merged commit 410a4c4 into main Dec 15, 2025
4 checks passed
@corrideat corrideat deleted the reattempt-pubsub branch December 15, 2025 17:04