diff --git a/.changeset/dry-streets-exist.md b/.changeset/dry-streets-exist.md new file mode 100644 index 0000000000..0d60838f42 --- /dev/null +++ b/.changeset/dry-streets-exist.md @@ -0,0 +1,5 @@ +--- +'@tanstack/query-core': patch +--- + +Made context.signal consume aware with streamedQuery diff --git a/packages/query-core/src/__tests__/streamedQuery.test.tsx b/packages/query-core/src/__tests__/streamedQuery.test.tsx index 084ea7d9db..6adced18ea 100644 --- a/packages/query-core/src/__tests__/streamedQuery.test.tsx +++ b/packages/query-core/src/__tests__/streamedQuery.test.tsx @@ -329,7 +329,12 @@ describe('streamedQuery', () => { const observer = new QueryObserver(queryClient, { queryKey: key, queryFn: streamedQuery({ - streamFn: () => createAsyncNumberGenerator(3), + streamFn: (context) => { + // just consume the signal + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition + const numbers = context.signal ? 3 : 0 + return createAsyncNumberGenerator(numbers) + }, refetchMode: 'append', }), }) @@ -420,6 +425,42 @@ describe('streamedQuery', () => { }) }) + test('should not abort when signal not consumed', async () => { + const key = queryKey() + const observer = new QueryObserver(queryClient, { + queryKey: key, + queryFn: streamedQuery({ + streamFn: () => createAsyncNumberGenerator(3), + }), + }) + + const unsubscribe = observer.subscribe(vi.fn()) + + expect(queryClient.getQueryState(key)).toMatchObject({ + status: 'pending', + fetchStatus: 'fetching', + data: undefined, + }) + + await vi.advanceTimersByTimeAsync(60) + + expect(queryClient.getQueryState(key)).toMatchObject({ + status: 'success', + fetchStatus: 'fetching', + data: [0], + }) + + unsubscribe() + + await vi.advanceTimersByTimeAsync(50) + + expect(queryClient.getQueryState(key)).toMatchObject({ + status: 'success', + fetchStatus: 'fetching', + data: [0, 1], + }) + }) + test('should support custom reducer', async () => { const key = queryKey() diff --git a/packages/query-core/src/infiniteQueryBehavior.ts b/packages/query-core/src/infiniteQueryBehavior.ts index 476d90ce15..af9c50e550 100644 --- a/packages/query-core/src/infiniteQueryBehavior.ts +++ b/packages/query-core/src/infiniteQueryBehavior.ts @@ -1,4 +1,9 @@ -import { addToEnd, addToStart, ensureQueryFn } from './utils' +import { + addConsumeAwareSignal, + addToEnd, + addToStart, + ensureQueryFn, +} from './utils' import type { QueryBehavior } from './query' import type { InfiniteData, @@ -23,19 +28,11 @@ export function infiniteQueryBehavior( const fetchFn = async () => { let cancelled = false const addSignalProperty = (object: unknown) => { - Object.defineProperty(object, 'signal', { - enumerable: true, - get: () => { - if (context.signal.aborted) { - cancelled = true - } else { - context.signal.addEventListener('abort', () => { - cancelled = true - }) - } - return context.signal - }, - }) + addConsumeAwareSignal( + object, + () => context.signal, + () => (cancelled = true), + ) } const queryFn = ensureQueryFn(context.options, context.fetchOptions) diff --git a/packages/query-core/src/streamedQuery.ts b/packages/query-core/src/streamedQuery.ts index 2eb944d699..788a3c2c74 100644 --- a/packages/query-core/src/streamedQuery.ts +++ b/packages/query-core/src/streamedQuery.ts @@ -1,5 +1,10 @@ -import { addToEnd } from './utils' -import type { QueryFunction, QueryFunctionContext, QueryKey } from './types' +import { addConsumeAwareSignal, addToEnd } from './utils' +import type { + OmitKeyof, + QueryFunction, + QueryFunctionContext, + QueryKey, +} from './types' type BaseStreamedQueryParams = { streamFn: ( @@ -73,10 +78,25 @@ export function streamedQuery< let result = initialValue - const stream = await streamFn(context) + let cancelled = false + const streamFnContext = addConsumeAwareSignal< + OmitKeyof + >( + { + client: context.client, + meta: context.meta, + queryKey: context.queryKey, + pageParam: context.pageParam, + direction: context.direction, + }, + () => context.signal, + () => (cancelled = true), + ) + + const stream = await streamFn(streamFnContext) for await (const chunk of stream) { - if (context.signal.aborted) { + if (cancelled) { break } @@ -90,7 +110,7 @@ export function streamedQuery< } // finalize result: replace-refetching needs to write to the cache - if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) { + if (isRefetch && refetchMode === 'replace' && !cancelled) { context.client.setQueryData(context.queryKey, result) } diff --git a/packages/query-core/src/utils.ts b/packages/query-core/src/utils.ts index dadc6d7bc3..feee9c36c6 100644 --- a/packages/query-core/src/utils.ts +++ b/packages/query-core/src/utils.ts @@ -465,3 +465,33 @@ export function shouldThrowError) => boolean>( return !!throwOnError } + +export function addConsumeAwareSignal( + object: T, + getSignal: () => AbortSignal, + onCancelled: VoidFunction, +): T & { signal: AbortSignal } { + let consumed = false + let signal: AbortSignal | undefined + + Object.defineProperty(object, 'signal', { + enumerable: true, + get: () => { + signal ??= getSignal() + if (consumed) { + return signal + } + + consumed = true + if (signal.aborted) { + onCancelled() + } else { + signal.addEventListener('abort', onCancelled, { once: true }) + } + + return signal + }, + }) + + return object as T & { signal: AbortSignal } +}