Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .changeset/cursor-pagination-loadsubset.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
---
'@tanstack/db': patch
'@tanstack/electric-db-collection': patch
'@tanstack/query-db-collection': patch
---

Enhanced LoadSubsetOptions with separate cursor expressions and offset for flexible pagination.

**⚠️ Breaking Change for Custom Sync Layers / Query Collections:**

`LoadSubsetOptions.where` no longer includes cursor expressions for pagination. If you have a custom sync layer or query collection that implements `loadSubset`, you must now handle pagination separately:

- **Cursor-based pagination:** Use the new `cursor` property (`cursor.whereFrom` and `cursor.whereCurrent`) and combine them with `where` yourself
- **Offset-based pagination:** Use the new `offset` property

Previously, cursor expressions were baked into the `where` clause. Now they are passed separately so sync layers can choose their preferred pagination strategy.

**Changes:**

- Added `CursorExpressions` type with `whereFrom`, `whereCurrent`, and optional `lastKey` properties
- Added `cursor` to `LoadSubsetOptions` for cursor-based pagination (separate from `where`)
- Added `offset` to `LoadSubsetOptions` for offset-based pagination support
- Electric sync layer now makes two parallel `requestSnapshot` calls when cursor is present:
- One for `whereCurrent` (all ties at boundary, no limit)
- One for `whereFrom` (rows after cursor, with limit)
- Query collection serialization now includes `offset` for query key generation
- Added `truncate` event to collections, emitted when synced data is truncated (e.g., after `must-refetch`)
- Fixed `setWindow` pagination: cursor expressions are now correctly built when paging through results
- Fixed offset tracking: `loadNextItems` now passes the correct window offset to prevent incorrect deduplication
- `CollectionSubscriber` now listens for `truncate` events to reset cursor tracking state

**Benefits:**

- Sync layers can choose between cursor-based or offset-based pagination strategies
- Electric can efficiently handle tie-breaking with two targeted requests
- Better separation of concerns between filtering (`where`) and pagination (`cursor`/`offset`)
- `setWindow` correctly triggers backend loading for subsequent pages in multi-column orderBy queries
- Cursor state is properly reset after truncation, preventing stale cursor data from being used
19 changes: 15 additions & 4 deletions docs/collections/query-collection.md
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ All direct write methods are available on `collection.utils`:

## QueryFn and Predicate Push-Down

When using `syncMode: 'on-demand'`, the collection automatically pushes down query predicates (where clauses, orderBy, and limit) to your `queryFn`. This allows you to fetch only the data needed for each specific query, rather than fetching the entire dataset.
When using `syncMode: 'on-demand'`, the collection automatically pushes down query predicates (where clauses, orderBy, limit, and offset) to your `queryFn`. This allows you to fetch only the data needed for each specific query, rather than fetching the entire dataset.

### How LoadSubsetOptions Are Passed

Expand All @@ -530,9 +530,13 @@ LoadSubsetOptions are passed to your `queryFn` via the query context's `meta` pr
```typescript
queryFn: async (ctx) => {
// Extract LoadSubsetOptions from the context
const { limit, where, orderBy } = ctx.meta.loadSubsetOptions
const { limit, offset, where, orderBy } = ctx.meta.loadSubsetOptions

// Use these to fetch only the data you need
// - where: filter expression (AST)
// - orderBy: sort expression (AST)
// - limit: maximum number of rows
// - offset: number of rows to skip (for pagination)
// ...
}
```
Expand Down Expand Up @@ -572,7 +576,7 @@ const productsCollection = createCollection(
syncMode: 'on-demand', // Enable predicate push-down

queryFn: async (ctx) => {
const { limit, where, orderBy } = ctx.meta.loadSubsetOptions
const { limit, offset, where, orderBy } = ctx.meta.loadSubsetOptions

// Parse the expressions into simple format
const parsed = parseLoadSubsetOptions({ where, orderBy, limit })
Expand Down Expand Up @@ -605,6 +609,11 @@ const productsCollection = createCollection(
params.set('limit', String(parsed.limit))
}

// Add offset for pagination
if (offset) {
params.set('offset', String(offset))
}

const response = await fetch(`/api/products?${params}`)
return response.json()
},
Expand All @@ -629,6 +638,7 @@ const affordableElectronics = createLiveQueryCollection({

// This triggers a queryFn call with:
// GET /api/products?category=electronics&price_lt=100&sort=price:asc&limit=10
// When paginating, offset is included: &offset=20
```

### Custom Handlers for Complex APIs
Expand Down Expand Up @@ -731,10 +741,11 @@ queryFn: async (ctx) => {
Convenience function that parses all LoadSubsetOptions at once. Good for simple use cases.

```typescript
const { filters, sorts, limit } = parseLoadSubsetOptions(ctx.meta?.loadSubsetOptions)
const { filters, sorts, limit, offset } = parseLoadSubsetOptions(ctx.meta?.loadSubsetOptions)
// filters: [{ field: ['category'], operator: 'eq', value: 'electronics' }]
// sorts: [{ field: ['price'], direction: 'asc', nulls: 'last' }]
// limit: 10
// offset: 20 (for pagination)
```

#### `parseWhereExpression(expr, options)`
Expand Down
10 changes: 10 additions & 0 deletions packages/db-collection-e2e/src/suites/pagination.suite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ export function createPaginationTestSuite(
.limit(10),
)

console.log(`[QUERY DEBUG] query:`, query)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logging


await query.preload()
await waitForQueryData(query, { minSize: 10 })

Expand All @@ -433,11 +435,15 @@ export function createPaginationTestSuite(
}
}

console.log(`[QUERY DEBUG] setting window`)

// Move to second page using setWindow
// IMPORTANT: setWindow returns a Promise when loading is required,
// or `true` if data is already available. We verify loading occurs.
const setWindowResult = query.utils.setWindow({ offset: 10, limit: 10 })

console.log(`[QUERY DEBUG] setWindowResult:`, setWindowResult)

// In on-demand mode, moving to offset 10 should trigger loading
// since only the first 10 records were initially loaded
if (setWindowResult !== true) {
Expand All @@ -446,10 +452,14 @@ export function createPaginationTestSuite(
}
await waitForQueryData(query, { minSize: 10 })

console.log(`[QUERY DEBUG] waited for data`, setWindowResult)

// Get second page
const secondPage = Array.from(query.state.values())
expect(secondPage).toHaveLength(10)

console.log(`[QUERY DEBUG] second page:`, secondPage)

// Verify second page ordering
for (let i = 1; i < secondPage.length; i++) {
const prev = secondPage[i - 1]!
Expand Down
10 changes: 10 additions & 0 deletions packages/db/src/collection/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,19 @@ export interface CollectionLoadingSubsetChangeEvent {
loadingSubsetTransition: `start` | `end`
}

/**
* Event emitted when the collection is truncated (all data cleared)
*/
export interface CollectionTruncateEvent {
type: `truncate`
collection: Collection<any, any, any, any, any>
}

export type AllCollectionEvents = {
'status:change': CollectionStatusChangeEvent
'subscribers:change': CollectionSubscribersChangeEvent
'loadingSubset:change': CollectionLoadingSubsetChangeEvent
truncate: CollectionTruncateEvent
} & {
[K in CollectionStatus as `status:${K}`]: CollectionStatusEvent<K>
}
Expand All @@ -56,6 +65,7 @@ export type CollectionEvent =
| CollectionStatusChangeEvent
| CollectionSubscribersChangeEvent
| CollectionLoadingSubsetChangeEvent
| CollectionTruncateEvent

export type CollectionEventHandler<T extends keyof AllCollectionEvents> = (
event: AllCollectionEvents[T],
Expand Down
1 change: 1 addition & 0 deletions packages/db/src/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ export class CollectionImpl<
lifecycle: this._lifecycle,
changes: this._changes,
indexes: this._indexes,
events: this._events,
})
this._sync.setDeps({
collection: this, // Required for passing to config.sync callback
Expand Down
10 changes: 10 additions & 0 deletions packages/db/src/collection/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { CollectionImpl } from './index.js'
import type { CollectionLifecycleManager } from './lifecycle'
import type { CollectionChangesManager } from './changes'
import type { CollectionIndexesManager } from './indexes'
import type { CollectionEventsManager } from './events'

interface PendingSyncedTransaction<
T extends object = Record<string, unknown>,
Expand All @@ -37,6 +38,7 @@ export class CollectionStateManager<
public lifecycle!: CollectionLifecycleManager<TOutput, TKey, TSchema, TInput>
public changes!: CollectionChangesManager<TOutput, TKey, TSchema, TInput>
public indexes!: CollectionIndexesManager<TOutput, TKey, TSchema, TInput>
private _events!: CollectionEventsManager

// Core state - make public for testing
public transactions: SortedMap<string, Transaction<any>>
Expand Down Expand Up @@ -79,11 +81,13 @@ export class CollectionStateManager<
lifecycle: CollectionLifecycleManager<TOutput, TKey, TSchema, TInput>
changes: CollectionChangesManager<TOutput, TKey, TSchema, TInput>
indexes: CollectionIndexesManager<TOutput, TKey, TSchema, TInput>
events: CollectionEventsManager
}) {
this.collection = deps.collection
this.lifecycle = deps.lifecycle
this.changes = deps.changes
this.indexes = deps.indexes
this._events = deps.events
}

/**
Expand Down Expand Up @@ -525,6 +529,12 @@ export class CollectionStateManager<
for (const key of changedKeys) {
currentVisibleState.delete(key)
}

// 4) Emit truncate event so subscriptions can reset their cursor tracking state
this._events.emit(`truncate`, {
type: `truncate`,
collection: this.collection,
})
}

for (const operation of transaction.operations) {
Expand Down
121 changes: 64 additions & 57 deletions packages/db/src/collection/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type RequestLimitedSnapshotOptions = {
limit: number
/** All column values for cursor (first value used for local index, all values for sync layer) */
minValues?: Array<unknown>
/** Row offset for offset-based pagination (passed to sync layer) */
offset?: number
}

type CollectionSubscriptionOptions = {
Expand Down Expand Up @@ -63,6 +65,12 @@ export class CollectionSubscription
// Keep track of the keys we've sent (needed for join and orderBy optimizations)
private sentKeys = new Set<string | number>()

// Track the count of rows sent via requestLimitedSnapshot for offset-based pagination
private limitedSnapshotRowCount = 0

// Track the last key sent via requestLimitedSnapshot for cursor-based pagination
private lastSentKey: string | number | undefined

private filteredCallback: (changes: Array<ChangeMessage<any, any>>) => void

private orderByIndex: IndexInterface<string | number> | undefined
Expand Down Expand Up @@ -258,6 +266,7 @@ export class CollectionSubscription
orderBy,
limit,
minValues,
offset,
}: RequestLimitedSnapshotOptions) {
if (!limit) throw new Error(`limit is required`)

Expand Down Expand Up @@ -354,77 +363,75 @@ export class CollectionSubscription
keys = index.take(valuesNeeded(), biggestObservedValue, filterFn)
}

// Track row count for offset-based pagination (before sending to callback)
// Use the current count as the offset for this load
const currentOffset = this.limitedSnapshotRowCount

this.callback(changes)

// Build the WHERE filter for sync layer loadSubset
// buildCursor handles both single-column and multi-column cases
let whereWithValueFilter = where
// Update the row count and last key after sending (for next call's offset/cursor)
this.limitedSnapshotRowCount += changes.length
if (changes.length > 0) {
this.lastSentKey = changes[changes.length - 1]!.key
}

// Build cursor expressions for sync layer loadSubset
// The cursor expressions are separate from the main where clause
// so the sync layer can choose cursor-based or offset-based pagination
let cursorExpressions:
| {
whereFrom: BasicExpression<boolean>
whereCurrent: BasicExpression<boolean>
lastKey?: string | number
}
| undefined

if (minValues !== undefined && minValues.length > 0) {
const cursor = buildCursor(orderBy, minValues)
if (cursor) {
whereWithValueFilter = where ? and(where, cursor) : cursor
const whereFromCursor = buildCursor(orderBy, minValues)

if (whereFromCursor) {
const { expression } = orderBy[0]!
const minValue = minValues[0]

// Build the whereCurrent expression for the first orderBy column
// For Date values, we need to handle precision differences between JS (ms) and backends (μs)
// A JS Date represents a 1ms range, so we query for all values within that range
let whereCurrentCursor: BasicExpression<boolean>
if (minValue instanceof Date) {
const minValuePlus1ms = new Date(minValue.getTime() + 1)
whereCurrentCursor = and(
gte(expression, new Value(minValue)),
lt(expression, new Value(minValuePlus1ms)),
)
} else {
whereCurrentCursor = eq(expression, new Value(minValue))
}

cursorExpressions = {
whereFrom: whereFromCursor,
whereCurrent: whereCurrentCursor,
lastKey: this.lastSentKey,
}
}
}

// Request the sync layer to load more data
// don't await it, we will load the data into the collection when it comes in
const loadOptions1: LoadSubsetOptions = {
where: whereWithValueFilter,
// Note: `where` does NOT include cursor expressions - they are passed separately
// The sync layer can choose to use cursor-based or offset-based pagination
const loadOptions: LoadSubsetOptions = {
where, // Main filter only, no cursor
limit,
orderBy,
cursor: cursorExpressions, // Cursor expressions passed separately
offset: offset ?? currentOffset, // Use provided offset, or auto-tracked offset
subscription: this,
}
const syncResult = this.collection._sync.loadSubset(loadOptions1)
const syncResult = this.collection._sync.loadSubset(loadOptions)

// Track this loadSubset call
this.loadedSubsets.push(loadOptions1)

// Make parallel loadSubset calls for values equal to minValue and values greater than minValue
const promises: Array<Promise<void>> = []

// First promise: load all values equal to minValue
if (typeof minValue !== `undefined`) {
const { expression } = orderBy[0]!

// For Date values, we need to handle precision differences between JS (ms) and backends (μs)
// A JS Date represents a 1ms range, so we query for all values within that range
let exactValueFilter
if (minValue instanceof Date) {
const minValuePlus1ms = new Date(minValue.getTime() + 1)
exactValueFilter = and(
gte(expression, new Value(minValue)),
lt(expression, new Value(minValuePlus1ms)),
)
} else {
exactValueFilter = eq(expression, new Value(minValue))
}

const loadOptions2: LoadSubsetOptions = {
where: exactValueFilter,
subscription: this,
}
const equalValueResult = this.collection._sync.loadSubset(loadOptions2)

// Track this loadSubset call
this.loadedSubsets.push(loadOptions2)

if (equalValueResult instanceof Promise) {
promises.push(equalValueResult)
}
}

// Second promise: load values greater than minValue
if (syncResult instanceof Promise) {
promises.push(syncResult)
}

// Track the combined promise
if (promises.length > 0) {
const combinedPromise = Promise.all(promises).then(() => {})
this.trackLoadSubsetPromise(combinedPromise)
} else {
this.trackLoadSubsetPromise(syncResult)
}
this.loadedSubsets.push(loadOptions)
this.trackLoadSubsetPromise(syncResult)
}

// TODO: also add similar test but that checks that it can also load it from the collection's loadSubset function
Expand Down
1 change: 1 addition & 0 deletions packages/db/src/query/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export {
minusWherePredicates,
isOrderBySubset,
isLimitSubset,
isOffsetLimitSubset,
isPredicateSubset,
} from './predicate-utils.js'

Expand Down
Loading
Loading