2 changes: 2 additions & 0 deletions src/cmap/connect.ts
@@ -224,6 +224,7 @@ export interface HandshakeDocument extends Document {
compression: string[];
saslSupportedMechs?: string;
loadBalanced?: boolean;
backpressure: true;
}

/**
@@ -241,6 +242,7 @@ export async function prepareHandshakeDocument(

const handshakeDoc: HandshakeDocument = {
[serverApi?.version || options.loadBalanced === true ? 'hello' : LEGACY_HELLO_COMMAND]: 1,
backpressure: true,
helloOk: true,
client: clientMetadata,
compression: compressors
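Note: with this change the handshake sent during connection establishment advertises backpressure support. A rough illustration of the resulting document; every value other than `backpressure: true` is a placeholder, not taken from this diff:

```ts
// Illustrative handshake shape after this change; values are placeholders only.
const handshake = {
  hello: 1, // or the legacy hello command, per the serverApi/loadBalanced check in prepareHandshakeDocument
  backpressure: true, // new field added by this PR, always sent
  helloOk: true,
  client: { driver: { name: 'nodejs', version: 'x.y.z' } }, // placeholder client metadata
  compression: ['none'] // placeholder
};
```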
3 changes: 3 additions & 0 deletions src/cmap/connection.ts
@@ -582,6 +582,9 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.throwIfAborted();
}
} catch (error) {
if (options.session != null && !(error instanceof MongoServerError)) {
updateSessionFromResponse(options.session, MongoDBResponse.empty);
}
if (this.shouldEmitAndLogCommand) {
this.emitAndLogCommand(
this.monitorCommands,
1 change: 1 addition & 0 deletions src/index.ts
@@ -87,6 +87,7 @@ export {
MongoWriteConcernError,
WriteConcernErrorResult
} from './error';
export { TokenBucket } from './token_bucket';
export {
AbstractCursor,
// Actual driver classes exported
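Note: `src/token_bucket.ts` itself is not part of this diff. Based only on how the class is used below (`new TokenBucket(1000)` on the topology, `consume(RETRY_COST)` before an overload retry, `deposit(TOKEN_REFRESH_RATE + RETRY_COST)` after a successful retry), a minimal sketch of the assumed surface follows; the starting fill level, the clamping behavior, and the concrete constant values are assumptions, not taken from the PR:

```ts
// Sketch of the assumed src/token_bucket.ts surface; not the actual implementation.
export const RETRY_COST = 5; // assumed value
export const TOKEN_REFRESH_RATE = 1; // assumed value

export class TokenBucket {
  private tokens: number;

  constructor(private readonly capacity: number) {
    // Assumption: the bucket starts full.
    this.tokens = capacity;
  }

  /** Deducts `cost` tokens and returns true if enough were available; otherwise returns false. */
  consume(cost: number): boolean {
    if (this.tokens < cost) return false;
    this.tokens -= cost;
    return true;
  }

  /** Returns tokens to the bucket without exceeding its capacity. */
  deposit(amount: number): void {
    this.tokens = Math.min(this.capacity, this.tokens + amount);
  }
}
```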
102 changes: 83 additions & 19 deletions src/operations/execute_operation.ts
@@ -1,3 +1,5 @@
import { setTimeout } from 'node:timers/promises';

import { MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION } from '../cmap/wire_protocol/constants';
import {
isRetryableReadError,
@@ -10,6 +12,7 @@ import {
MongoInvalidArgumentError,
MongoNetworkError,
MongoNotConnectedError,
MongoOperationTimeoutError,
MongoRuntimeError,
MongoServerError,
MongoTransactionError,
@@ -26,9 +29,15 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import { abortable, maxWireVersion, supportsRetryableWrites } from '../utils';
import { RETRY_COST, TOKEN_REFRESH_RATE } from '../token_bucket';
import {
abortable,
ExponentialBackoffProvider,
maxWireVersion,
supportsRetryableWrites
} from '../utils';
import { AggregateOperation } from './aggregate';
import { AbstractOperation, Aspect } from './operation';
import { AbstractOperation, Aspect, RetryContext } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -50,7 +59,7 @@ type ResultTypeFromOperation<TOperation extends AbstractOperation> = ReturnType<
* The expectation is that this function:
* - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
* - Creates a session if none is provided and cleans up the session it creates
* - Tries an operation and retries under certain conditions, see {@link tryOperation}
* - Tries an operation and retries under certain conditions, see {@link executeOperationWithRetries}
*
* @typeParam T - The operation's type
* @typeParam TResult - The type of the operation's result, calculated from T
@@ -120,7 +129,7 @@ export async function executeOperation<
});

try {
return await tryOperation(operation, {
return await executeOperationWithRetries(operation, {
topology,
timeoutContext,
session,
@@ -184,7 +193,10 @@ type RetryOptions = {
*
* @param operation - The operation to execute
* */
async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFromOperation<T>>(
async function executeOperationWithRetries<
T extends AbstractOperation,
TResult = ResultTypeFromOperation<T>
>(
operation: T,
{ topology, timeoutContext, session, readPreference }: RetryOptions
): Promise<TResult> {
@@ -233,11 +245,27 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
session.incrementTransactionNumber();
}

const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
let previousOperationError: MongoError | undefined;
const deprioritizedServers = new DeprioritizedServers();

for (let tries = 0; tries < maxTries; tries++) {
const backoffDelayProvider = new ExponentialBackoffProvider(
10_000, // MAX_BACKOFF
100, // base backoff
2 // backoff rate
);

const retryContext =
operation.retryContext ??
new RetryContext(willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1);
for (
let attempt = 0;
attempt < retryContext.maxAttempts;
attempt++,
retryContext.maxAttempts =
willRetry && previousOperationError?.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)
? 6
: retryContext.maxAttempts
) {
if (previousOperationError) {
if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
throw new MongoServerError({
@@ -247,15 +275,39 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
});
}

if (operation.hasAspect(Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
const isRetryable =
// bulk write commands are retryable if all operations in the batch are retryable
(operation.hasAspect(Aspect.COMMAND_BATCHING) && operation.canRetryWrite) ||
// if we have a retryable read or write operation, we can retry
(hasWriteAspect && willRetryWrite && isRetryableWriteError(previousOperationError)) ||
(hasReadAspect && willRetryRead && isRetryableReadError(previousOperationError)) ||
// if we have a system overloaded error that is also labeled retryable, we can retry
(previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError) &&
previousOperationError.hasErrorLabel(MongoErrorLabel.RetryableError));

if (!isRetryable) {
throw previousOperationError;
}

if (hasWriteAspect && !isRetryableWriteError(previousOperationError))
throw previousOperationError;

if (hasReadAspect && !isRetryableReadError(previousOperationError)) {
throw previousOperationError;
if (previousOperationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
const delayMS = backoffDelayProvider.getNextBackoffDuration();

// if the delay would exhaust the CSOT timeout, short-circuit.
if (timeoutContext.csotEnabled() && delayMS > timeoutContext.remainingTimeMS) {
// TODO: is this the right error to throw?
throw new MongoOperationTimeoutError(
`MongoDB SystemOverload exponential backoff would exceed timeoutMS deadline: remaining CSOT deadline=${timeoutContext.remainingTimeMS}, backoff delayMS=${delayMS}`,
{
cause: previousOperationError
}
);
}

if (!topology.tokenBucket.consume(RETRY_COST)) {
throw previousOperationError;
}

await setTimeout(delayMS);
}

if (
@@ -285,19 +337,34 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
operation.server = server;

try {
// If tries > 0 and we are command batching we need to reset the batch.
if (tries > 0 && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
const isRetry = attempt > 0;

// If attempt > 0 and we are command batching we need to reset the batch.
if (isRetry && operation.hasAspect(Aspect.COMMAND_BATCHING)) {
operation.resetBatch();
}

try {
const result = await server.command(operation, timeoutContext);
topology.tokenBucket.deposit(
isRetry
? // on successful retry, deposit the retry cost + the refresh rate.
TOKEN_REFRESH_RATE + RETRY_COST
: // otherwise, just deposit the refresh rate.
TOKEN_REFRESH_RATE
);
return operation.handleOk(result);
} catch (error) {
return operation.handleError(error);
}
} catch (operationError) {
if (!(operationError instanceof MongoError)) throw operationError;

if (!operationError.hasErrorLabel(MongoErrorLabel.SystemOverloadedError)) {
// if the operation fails with an error that does not carry the SystemOverloadedError label, deposit RETRY_COST tokens.
topology.tokenBucket.deposit(RETRY_COST);
}

if (
previousOperationError != null &&
operationError.hasErrorLabel(MongoErrorLabel.NoWritesPerformed)
@@ -312,8 +379,5 @@ async function tryOperation<T extends AbstractOperation, TResult = ResultTypeFro
}
}

throw (
previousOperationError ??
new MongoRuntimeError('Tried to propagate retryability error, but no error was found.')
);
throw previousOperationError ?? new MongoRuntimeError('Tried to propagate retryability error, but no error was found.');
}
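Note: `ExponentialBackoffProvider` is imported from `../utils` but its implementation is not shown in this diff. Matching the call site above (`new ExponentialBackoffProvider(10_000, 100, 2)` plus `getNextBackoffDuration()`), a plausible sketch follows; the jitter strategy and internal counter are assumptions. Without jitter, these parameters would yield delays of 100ms, 200ms, 400ms, ... doubling per overload retry and capped at 10s.

```ts
// Sketch only: assumed shape of ExponentialBackoffProvider based on its call site.
export class ExponentialBackoffProvider {
  private attempt = 0;

  constructor(
    private readonly maxBackoffMS: number,
    private readonly baseBackoffMS: number,
    private readonly backoffRate: number
  ) {}

  /** Next delay in milliseconds: min(max, base * rate^attempt), with full jitter (assumed). */
  getNextBackoffDuration(): number {
    const uncapped = this.baseBackoffMS * this.backoffRate ** this.attempt;
    this.attempt += 1;
    const capped = Math.min(this.maxBackoffMS, uncapped);
    return Math.floor(Math.random() * capped);
  }
}
```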
6 changes: 6 additions & 0 deletions src/operations/operation.ts
@@ -45,6 +45,10 @@ export interface OperationOptions extends BSONSerializeOptions {
timeoutMS?: number;
}

export class RetryContext {
constructor(public maxAttempts: number) {}
}

/**
* This class acts as a parent class for any operation and is responsible for setting this.options,
* as well as setting and getting a session.
@@ -66,6 +70,8 @@ export abstract class AbstractOperation<TResult = any> {
/** Specifies the time an operation will run until it throws a timeout error. */
timeoutMS?: number;

retryContext?: RetryContext;

private _session: ClientSession | undefined;

static aspects?: Set<symbol>;
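Note: `RetryContext` lets a caller pin an explicit attempt budget on an operation instead of letting `executeOperationWithRetries` derive one (see `operation.retryContext ?? new RetryContext(...)` above); `ClientSession.commitTransaction` below shares one five-attempt context between the initial commit and its retry. A small hypothetical helper showing the same pattern — the helper name and the command are illustrative, not part of the PR, and the import paths assume a file under `src/operations/`:

```ts
import type { Document } from '../bson';
import type { MongoClient } from '../mongo_client';
import { ReadPreference } from '../read_preference';
import type { ClientSession } from '../sessions';
import { MongoDBNamespace } from '../utils';
import { executeOperation } from './execute_operation';
import { RetryContext } from './operation';
import { RunCommandOperation } from './run_command';

// Hypothetical helper (not part of the PR): run an admin command with an explicit
// five-attempt budget, mirroring how commitTransaction attaches a RetryContext below.
async function runAdminCommandWithBudget(
  client: MongoClient,
  session: ClientSession,
  command: Document
): Promise<Document> {
  const operation = new RunCommandOperation(new MongoDBNamespace('admin'), command, {
    session,
    readPreference: ReadPreference.primary
  });
  // Cap this operation at five attempts; whether a retry actually happens still depends on
  // the error labels and aspects checked inside executeOperationWithRetries.
  operation.retryContext = new RetryContext(5);
  return await executeOperation(client, operation);
}
```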
8 changes: 3 additions & 5 deletions src/sdam/topology.ts
@@ -35,6 +35,7 @@ import { type Abortable, TypedEventEmitter } from '../mongo_types';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import type { ClientSession } from '../sessions';
import { Timeout, TimeoutContext, TimeoutError } from '../timeout';
import { TokenBucket } from '../token_bucket';
import type { Transaction } from '../transactions';
import {
addAbortListener,
@@ -207,18 +208,15 @@ export type TopologyEvents = {
* @internal
*/
export class Topology extends TypedEventEmitter<TopologyEvents> {
/** @internal */
s: TopologyPrivate;
/** @internal */
waitQueue: List<ServerSelectionRequest>;
/** @internal */
hello?: Document;
/** @internal */
_type?: string;

tokenBucket = new TokenBucket(1000);

client!: MongoClient;

/** @internal */
private connectionLock?: Promise<Topology>;

/** @event */
55 changes: 39 additions & 16 deletions src/sessions.ts
@@ -23,6 +23,7 @@ import {
import type { MongoClient, MongoOptions } from './mongo_client';
import { TypedEventEmitter } from './mongo_types';
import { executeOperation } from './operations/execute_operation';
import { RetryContext } from './operations/operation';
import { RunCommandOperation } from './operations/run_command';
import { ReadConcernLevel } from './read_concern';
import { ReadPreference } from './read_preference';
@@ -466,7 +467,11 @@ export class ClientSession
} else {
const wcKeys = Object.keys(wc);
if (wcKeys.length > 2 || (!wcKeys.includes('wtimeoutMS') && !wcKeys.includes('wTimeoutMS')))
// if the write concern was specified with wTimeoutMS, then we set both wtimeoutMS and wTimeoutMS, guaranteeing at least two keys, so if we have more than two keys, then we can automatically assume that we should add the write concern to the command. If it has 2 or fewer keys, we need to check that those keys aren't the wtimeoutMS or wTimeoutMS options before we add the write concern to the command
// if the write concern was specified with wTimeoutMS, then we set both wtimeoutMS
// and wTimeoutMS, guaranteeing at least two keys, so if we have more than two keys,
// then we can automatically assume that we should add the write concern to the command.
// If it has 2 or fewer keys, we need to check that those keys aren't the wtimeoutMS
// or wTimeoutMS options before we add the write concern to the command
WriteConcern.apply(command, { ...wc, wtimeoutMS: undefined });
}
}
@@ -487,11 +492,14 @@ export class ClientSession
command.recoveryToken = this.transaction.recoveryToken;
}

const retryContext = new RetryContext(5);

const operation = new RunCommandOperation(new MongoDBNamespace('admin'), command, {
session: this,
readPreference: ReadPreference.primary,
bypassPinningCheck: true
});
operation.retryContext = retryContext;

const timeoutContext =
this.timeoutContext ??
@@ -516,15 +524,13 @@
this.unpin({ force: true });

try {
await executeOperation(
this.client,
new RunCommandOperation(new MongoDBNamespace('admin'), command, {
session: this,
readPreference: ReadPreference.primary,
bypassPinningCheck: true
}),
timeoutContext
);
const op = new RunCommandOperation(new MongoDBNamespace('admin'), command, {
session: this,
readPreference: ReadPreference.primary,
bypassPinningCheck: true
});
op.retryContext = retryContext;
await executeOperation(this.client, op, timeoutContext);
return;
} catch (retryCommitError) {
// If the retry failed, we process that error instead of the original
@@ -957,6 +963,11 @@ export class ServerSession {
id: ServerSessionId;
lastUse: number;
txnNumber: number;

/**
 * Indicates that a network error has been encountered while using this session.
 * Once a session is marked as dirty, it is always dirty.
 */
isDirty: boolean;

/** @internal */
@@ -1050,16 +1061,15 @@ export class ServerSessionPool {
* @param session - The session to release to the pool
*/
release(session: ServerSession): void {
const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;
if (this.client.topology?.loadBalanced) {
if (session.isDirty) return;

if (this.client.topology?.loadBalanced && !sessionTimeoutMinutes) {
this.sessions.unshift(session);
}

if (!sessionTimeoutMinutes) {
return;
}

const sessionTimeoutMinutes = this.client.topology?.logicalSessionTimeoutMinutes ?? 10;

this.sessions.prune(session => session.hasTimedOut(sessionTimeoutMinutes));

if (!session.hasTimedOut(sessionTimeoutMinutes)) {
@@ -1147,9 +1157,9 @@ export function applySession(
command.autocommit = false;

if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
command.startTransaction = true;

// TODO: read concern only applied if it is not the same as the server's default
const readConcern =
session.transaction.options.readConcern || session?.clientOptions?.readConcern;
if (readConcern) {
@@ -1185,4 +1195,17 @@ export function updateSessionFromResponse(session: ClientSession, document: Mong
session.snapshotTime = atClusterTime;
}
}

if (session.transaction.state === TxnState.STARTING_TRANSACTION) {
if (document.ok === 1) {
session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
} else {
const error = new MongoServerError(document.toObject());
const isBackpressureError = error.hasErrorLabel(MongoErrorLabel.RetryableError);

if (!isBackpressureError) {
session.transaction.transition(TxnState.TRANSACTION_IN_PROGRESS);
}
}
}
}
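Note: read together with the `applySession` change above (where the state transition was removed), my reading of the new first-command flow in a transaction is sketched below as comments; this is an interpretation of the diff, not text from the PR.

```ts
// Interpretation of the new transaction-state flow for the first command in a transaction:
//
//   applySession(session, command, options)
//     -> command.startTransaction = true          // state stays STARTING_TRANSACTION
//
//   ...command is sent to the server...
//
//   updateSessionFromResponse(session, response)
//     -> response.ok === 1                        -> TRANSACTION_IN_PROGRESS
//     -> error without the RetryableError label   -> TRANSACTION_IN_PROGRESS
//     -> error with the RetryableError label      -> stays STARTING_TRANSACTION, so a retried
//                                                    command sends startTransaction: true again
//
// The connection.ts change above routes MongoDBResponse.empty through this same function on
// non-server errors, so the state machine can still advance after a network failure.
```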