diff --git a/aggregation_mode/Cargo.lock b/aggregation_mode/Cargo.lock index 6e424297a..f709785cd 100644 --- a/aggregation_mode/Cargo.lock +++ b/aggregation_mode/Cargo.lock @@ -3262,8 +3262,10 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" name = "db" version = "0.1.0" dependencies = [ + "serde", "sqlx", "tokio", + "tracing", ] [[package]] @@ -4495,6 +4497,7 @@ dependencies = [ "aligned-sdk", "alloy", "bincode", + "db", "hex", "serde", "serde_json", @@ -6800,6 +6803,7 @@ dependencies = [ "actix-web", "aligned-sdk", "alloy", + "db", "hex", "serde", "serde_json", diff --git a/aggregation_mode/db/Cargo.toml b/aggregation_mode/db/Cargo.toml index 47b3b89e3..3b5ad05f7 100644 --- a/aggregation_mode/db/Cargo.toml +++ b/aggregation_mode/db/Cargo.toml @@ -4,10 +4,10 @@ version = "0.1.0" edition = "2021" [dependencies] +serde = { workspace = true } tokio = { version = "1"} -# TODO: enable tls -sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] } - +sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" , "uuid", "bigdecimal"] } +tracing = { version = "0.1", features = ["log"] } [[bin]] name = "migrate" diff --git a/aggregation_mode/db/src/lib.rs b/aggregation_mode/db/src/lib.rs index cd408564e..a87f9d707 100644 --- a/aggregation_mode/db/src/lib.rs +++ b/aggregation_mode/db/src/lib.rs @@ -1 +1,3 @@ +pub mod orchestrator; +pub mod retry; pub mod types; diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs new file mode 100644 index 000000000..54b0ae523 --- /dev/null +++ b/aggregation_mode/db/src/orchestrator.rs @@ -0,0 +1,253 @@ +use std::{ + future::Future, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; + +use crate::retry::{RetryConfig, RetryError}; + +#[derive(Debug, Clone, Copy)] +enum Operation { + Read, + Write, +} + +/// A single DB node: connection pool plus shared health flags (used to prioritize nodes). +#[derive(Debug)] +struct DbNode { + pool: Pool, + last_read_failed: AtomicBool, + last_write_failed: AtomicBool, +} + +/// Database orchestrator for running reads/writes across multiple PostgreSQL nodes with retry/backoff. +/// +/// `DbOrchestartor` holds a list of database nodes (connection pools) and will: +/// - try nodes in a preferred order (healthy nodes first, then recently-failed nodes), +/// - mark nodes as failed on connection-type errors, +/// - retry transient failures with exponential backoff based on `retry_config`, +/// +/// ## Thread-safe `Clone` +/// This type is cheap and thread-safe to clone: +/// - `nodes` is `Vec>`, so cloning only increments `Arc` ref-counts and shares the same pools/nodes, +/// - `sqlx::Pool` is internally reference-counted and designed to be cloned and used concurrently, +/// - the node health flags are `AtomicBool`, so updates are safe from multiple threads/tasks. +/// +/// Clones share health state (the atomics) and the underlying pools, so all clones observe and influence +/// the same “preferred node” ordering decisions. 
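+///
+/// ## Example
+///
+/// An illustrative sketch (not part of this change): the URLs and the closure body are
+/// placeholders, and the closure style mirrors the call sites in the gateway and poller crates.
+///
+/// ```ignore
+/// let orchestrator = DbOrchestartor::try_new(
+///     &[
+///         "postgres://postgres:postgres@localhost:5435/".to_string(),
+///         "postgres://postgres:postgres@localhost:5436/".to_string(),
+///     ],
+///     RetryConfig {
+///         min_delay_millis: 500,
+///         factor: 2.0,
+///         max_times: 4,
+///         max_delay_seconds: 10,
+///     },
+/// )?;
+///
+/// // Reads/writes receive the pool of whichever node is currently preferred; connection-type
+/// // errors mark that node as failed and the next node is tried before any backoff sleep.
+/// let (pending,): (i64,) = orchestrator
+///     .read(async |pool| {
+///         sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM tasks WHERE status = 'pending'")
+///             .fetch_one(&pool)
+///             .await
+///     })
+///     .await?;
+/// ```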
+#[derive(Debug, Clone)]
+pub struct DbOrchestartor {
+    nodes: Vec<Arc<DbNode>>,
+    retry_config: RetryConfig,
+}
+
+#[derive(Debug)]
+pub enum DbOrchestartorError {
+    InvalidNumberOfConnectionUrls,
+    Sqlx(sqlx::Error),
+}
+
+impl std::fmt::Display for DbOrchestartorError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::InvalidNumberOfConnectionUrls => {
+                write!(f, "invalid number of connection URLs")
+            }
+            Self::Sqlx(e) => write!(f, "{e}"),
+        }
+    }
+}
+
+impl DbOrchestartor {
+    pub fn try_new(
+        connection_urls: &[String],
+        retry_config: RetryConfig,
+    ) -> Result<Self, DbOrchestartorError> {
+        if connection_urls.is_empty() {
+            return Err(DbOrchestartorError::InvalidNumberOfConnectionUrls);
+        }
+
+        let nodes = connection_urls
+            .iter()
+            .map(|url| {
+                let pool = PgPoolOptions::new().max_connections(5).connect_lazy(url)?;
+
+                Ok(Arc::new(DbNode {
+                    pool,
+                    last_read_failed: AtomicBool::new(false),
+                    last_write_failed: AtomicBool::new(false),
+                }))
+            })
+            .collect::<Result<Vec<_>, sqlx::Error>>()
+            .map_err(DbOrchestartorError::Sqlx)?;
+
+        Ok(Self {
+            nodes,
+            retry_config,
+        })
+    }
+
+    pub async fn write<T, Q, Fut>(&self, query: Q) -> Result<T, sqlx::Error>
+    where
+        Q: Fn(Pool<Postgres>) -> Fut,
+        Fut: Future<Output = Result<T, sqlx::Error>>,
+    {
+        self.query::<T, Q, Fut>(query, Operation::Write).await
+    }
+
+    pub async fn read<T, Q, Fut>(&self, query: Q) -> Result<T, sqlx::Error>
+    where
+        Q: Fn(Pool<Postgres>) -> Fut,
+        Fut: Future<Output = Result<T, sqlx::Error>>,
+    {
+        self.query::<T, Q, Fut>(query, Operation::Read).await
+    }
+
+    async fn query<T, Q, Fut>(&self, query_fn: Q, operation: Operation) -> Result<T, sqlx::Error>
+    where
+        Q: Fn(Pool<Postgres>) -> Fut,
+        Fut: Future<Output = Result<T, sqlx::Error>>,
+    {
+        let mut attempts = 0;
+        let mut delay = Duration::from_millis(self.retry_config.min_delay_millis);
+
+        loop {
+            match self.execute_once(&query_fn, operation).await {
+                Ok(value) => return Ok(value),
+                Err(RetryError::Permanent(err)) => return Err(err),
+                Err(RetryError::Transient(err)) => {
+                    if attempts >= self.retry_config.max_times {
+                        return Err(err);
+                    }
+
+                    tracing::warn!(attempt = attempts, delay_millis = delay.as_millis(), error = ?err, "retrying after backoff");
+                    tokio::time::sleep(delay).await;
+                    delay = self.next_backoff_delay(delay);
+                    attempts += 1;
+                }
+            }
+        }
+    }
+
+    // Exponential backoff with a hard cap.
+    //
+    // Each retry multiplies the previous delay by `retry_config.factor`,
+    // then clamps it to `max_delay_seconds`. This yields:
+    //
+    //   d_{n+1} = min(max, d_n * factor)  =>  d_n = min(max, d_initial * factor^n)
+    //
+    // Example starting at 500ms with factor = 2.0 (no jitter):
+    //   retry 0: 0.5s
+    //   retry 1: 1.0s
+    //   retry 2: 2.0s
+    //   retry 3: 4.0s
+    //   retry 4: 8.0s
+    //   ...
+    // until the delay reaches `max_delay_seconds`, after which it stays at that max.
+    // see reference: https://en.wikipedia.org/wiki/Exponential_backoff
+    // and here: https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/retry-backoff.html
+    fn next_backoff_delay(&self, current_delay: Duration) -> Duration {
+        let max: Duration = Duration::from_secs(self.retry_config.max_delay_seconds);
+        // Defensive: factor should be >= 1.0 for backoff; we clamp it to avoid shrinking/NaN.
+        let factor = f64::from(self.retry_config.factor).max(1.0);
+
+        let scaled_secs = current_delay.as_secs_f64() * factor;
+        let scaled_secs = if scaled_secs.is_finite() {
+            scaled_secs
+        } else {
+            max.as_secs_f64()
+        };
+
+        let scaled = Duration::from_secs_f64(scaled_secs);
+        if scaled > max {
+            max
+        } else {
+            scaled
+        }
+    }
+
+    async fn execute_once<T, Q, Fut>(
+        &self,
+        query_fn: &Q,
+        operation: Operation,
+    ) -> Result<T, RetryError<sqlx::Error>>
+    where
+        Q: Fn(Pool<Postgres>) -> Fut,
+        Fut: Future<Output = Result<T, sqlx::Error>>,
+    {
+        let mut last_error = None;
+
+        for idx in self.preferred_order(operation) {
+            let node = &self.nodes[idx];
+            let pool = node.pool.clone();
+
+            match query_fn(pool).await {
+                Ok(res) => {
+                    match operation {
+                        Operation::Read => node.last_read_failed.store(false, Ordering::Relaxed),
+                        Operation::Write => node.last_write_failed.store(false, Ordering::Relaxed),
+                    };
+                    return Ok(res);
+                }
+                Err(err) => {
+                    if Self::is_connection_error(&err) {
+                        tracing::warn!(node_index = idx, error = ?err, "database query failed");
+                        match operation {
+                            Operation::Read => node.last_read_failed.store(true, Ordering::Relaxed),
+                            Operation::Write => {
+                                node.last_write_failed.store(true, Ordering::Relaxed)
+                            }
+                        };
+                        last_error = Some(err);
+                    } else {
+                        return Err(RetryError::Permanent(err));
+                    }
+                }
+            };
+        }
+
+        Err(RetryError::Transient(
+            last_error.expect("write_op attempted without database nodes"),
+        ))
+    }
+
+    fn preferred_order(&self, operation: Operation) -> Vec<usize> {
+        let mut preferred = Vec::with_capacity(self.nodes.len());
+        let mut fallback = Vec::new();
+
+        for (idx, node) in self.nodes.iter().enumerate() {
+            let failed = match operation {
+                Operation::Read => node.last_read_failed.load(Ordering::Relaxed),
+                Operation::Write => node.last_write_failed.load(Ordering::Relaxed),
+            };
+
+            if failed {
+                fallback.push(idx);
+            } else {
+                preferred.push(idx);
+            }
+        }
+
+        preferred.extend(fallback);
+        preferred
+    }
+
+    fn is_connection_error(error: &sqlx::Error) -> bool {
+        matches!(
+            error,
+            sqlx::Error::Io(_)
+                | sqlx::Error::Tls(_)
+                | sqlx::Error::Protocol(_)
+                | sqlx::Error::PoolTimedOut
+                | sqlx::Error::PoolClosed
+                | sqlx::Error::WorkerCrashed
+                | sqlx::Error::BeginFailed
+                | sqlx::Error::Database(_)
+        )
+    }
+}
diff --git a/aggregation_mode/db/src/retry.rs b/aggregation_mode/db/src/retry.rs
new file mode 100644
index 000000000..adec8433d
--- /dev/null
+++ b/aggregation_mode/db/src/retry.rs
@@ -0,0 +1,28 @@
+#[derive(Debug)]
+pub(super) enum RetryError<E> {
+    Transient(E),
+    Permanent(E),
+}
+
+impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match self {
+            RetryError::Transient(e) => write!(f, "{e}"),
+            RetryError::Permanent(e) => write!(f, "{e}"),
+        }
+    }
+}
+
+impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}
+
+#[derive(Debug, Clone)]
+pub struct RetryConfig {
+    /// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds)
+    pub min_delay_millis: u64,
+    /// * `factor` - Exponential backoff multiplier for retry delays
+    pub factor: f32,
+    /// * `max_times` - Maximum number of retry attempts
+    pub max_times: usize,
+    /// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds)
+    pub max_delay_seconds: u64,
+}
diff --git a/aggregation_mode/db/src/types.rs b/aggregation_mode/db/src/types.rs
index 676a780a4..6b186e6cf 100644
--- a/aggregation_mode/db/src/types.rs
+++ b/aggregation_mode/db/src/types.rs
@@ -4,7 +4,7 @@ use sqlx::{
     Type,
 };
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Type)]
+#[derive(Debug, Clone, Copy, PartialEq,
Eq, Type, serde::Serialize)] #[sqlx(type_name = "task_status", rename_all = "lowercase")] pub enum TaskStatus { Pending, diff --git a/aggregation_mode/gateway/Cargo.toml b/aggregation_mode/gateway/Cargo.toml index 79ef53351..927b11bc9 100644 --- a/aggregation_mode/gateway/Cargo.toml +++ b/aggregation_mode/gateway/Cargo.toml @@ -10,6 +10,7 @@ serde_yaml = { workspace = true } agg_mode_sdk = { path = "../sdk"} aligned-sdk = { workspace = true } sp1-sdk = { workspace = true } +db = { workspace = true } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } bincode = "1.3.3" diff --git a/aggregation_mode/gateway/src/config.rs b/aggregation_mode/gateway/src/config.rs index 444882ab5..8c3976671 100644 --- a/aggregation_mode/gateway/src/config.rs +++ b/aggregation_mode/gateway/src/config.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Config { pub port: u16, - pub db_connection_url: String, + pub db_connection_urls: Vec, pub network: String, pub max_daily_proofs_per_user: i64, } diff --git a/aggregation_mode/gateway/src/db.rs b/aggregation_mode/gateway/src/db.rs index 94809f234..816110d84 100644 --- a/aggregation_mode/gateway/src/db.rs +++ b/aggregation_mode/gateway/src/db.rs @@ -1,12 +1,40 @@ -use sqlx::{ - postgres::PgPoolOptions, - types::{BigDecimal, Uuid}, - Pool, Postgres, -}; +use crate::types::Receipt; +use db::{orchestrator::DbOrchestartor, retry::RetryConfig}; +use sqlx::types::{BigDecimal, Uuid}; + +// Retry/backoff behavior summary (see +// aggregation_mode/db/src/orchestrator.rs:next_back_off_delay for implementation) +// +// NOTE: These retry limits are intentionally lower than in other crates. +// This code runs in an HTTP server; in the worst case the request fails fast +// and the client can retry the request. Prolonged blocking retries here are +// less critical than in background or batch processing jobs. +// +// 1) Max wait time between failures if all retries fail: +// The sleep between retries is capped at 10 seconds (RETRY_MAX_DELAY_SECONDS). +// +// 2) Wait before each retry attempt with the current config +// (start = 500ms, factor = 2.0, max retries = 4): +// +// retry 1: 0.5s +// retry 2: 1.0s +// retry 3: 2.0s +// retry 4: 4.0s +// +// Worst-case total sleep time across all retries: 7.5 seconds, +// plus the execution time of each DB attempt. 
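+//
+// Illustrative sketch (not part of this change): the delay table above follows directly from
+// the constants below, using the orchestrator's clamped exponential backoff:
+//
+//     let mut delay = Duration::from_millis(RETRY_MIN_DELAY_MILLIS);
+//     for _retry in 0..RETRY_MAX_TIMES {
+//         // sleep(delay) happens here: 0.5s, 1s, 2s, 4s -> 7.5s worst-case total
+//         delay = delay
+//             .mul_f64(f64::from(RETRY_FACTOR))
+//             .min(Duration::from_secs(RETRY_MAX_DELAY_SECONDS));
+//     }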
+/// Initial delay before first retry attempt (in milliseconds) +const RETRY_MIN_DELAY_MILLIS: u64 = 500; +/// Exponential backoff multiplier for retry delays +const RETRY_FACTOR: f32 = 2.0; +/// Maximum number of retry attempts +const RETRY_MAX_TIMES: usize = 4; +/// Maximum delay between retry attempts (in seconds) +const RETRY_MAX_DELAY_SECONDS: u64 = 10; #[derive(Clone, Debug)] pub struct Db { - pool: Pool, + orchestrator: DbOrchestartor, } #[derive(Debug, Clone)] @@ -14,52 +42,51 @@ pub enum DbError { ConnectError(String), } -#[derive(Debug, Clone, sqlx::Type, serde::Serialize)] -#[sqlx(type_name = "task_status")] -#[sqlx(rename_all = "lowercase")] -pub enum TaskStatus { - Pending, - Processing, - Verified, -} - -#[derive(Debug, Clone, sqlx::FromRow, sqlx::Type, serde::Serialize)] -pub struct Receipt { - pub status: TaskStatus, - pub merkle_path: Option>, - pub nonce: i64, - pub address: String, -} - impl Db { - pub async fn try_new(connection_url: &str) -> Result { - let pool = PgPoolOptions::new() - .max_connections(5) - .connect(connection_url) - .await - .map_err(|e| DbError::ConnectError(e.to_string()))?; + pub async fn try_new(connection_urls: &[String]) -> Result { + let orchestrator = DbOrchestartor::try_new( + connection_urls, + RetryConfig { + min_delay_millis: RETRY_MIN_DELAY_MILLIS, + factor: RETRY_FACTOR, + max_times: RETRY_MAX_TIMES, + max_delay_seconds: RETRY_MAX_DELAY_SECONDS, + }, + ) + .map_err(|e| DbError::ConnectError(e.to_string()))?; - Ok(Self { pool }) + Ok(Self { orchestrator }) } pub async fn count_tasks_by_address(&self, address: &str) -> Result { - let (count,) = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM tasks WHERE address = $1") - .bind(address.to_lowercase()) - .fetch_one(&self.pool) - .await?; + self.orchestrator + .read(async |pool| { + let (count,) = + sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM tasks WHERE address = $1") + .bind(address.to_lowercase()) + .fetch_one(&pool) + .await?; - Ok(count) + Ok(count) + }) + .await } pub async fn get_merkle_path_by_task_id( &self, task_id: Uuid, ) -> Result>, sqlx::Error> { - sqlx::query_scalar::<_, Option>>("SELECT merkle_path FROM tasks WHERE task_id = $1") - .bind(task_id) - .fetch_optional(&self.pool) + self.orchestrator + .read(async |pool| { + sqlx::query_scalar::<_, Option>>( + "SELECT merkle_path FROM tasks WHERE task_id = $1", + ) + .bind(task_id) + .fetch_optional(&pool) + .await + .map(|res| res.flatten()) + }) .await - .map(|res| res.flatten()) } pub async fn get_tasks_by_address_and_nonce( @@ -67,16 +94,20 @@ impl Db { address: &str, nonce: i64, ) -> Result, sqlx::Error> { - sqlx::query_as::<_, Receipt>( - "SELECT status,merkle_path,nonce,address FROM tasks - WHERE address = $1 - AND nonce = $2 - ORDER BY nonce DESC", - ) - .bind(address.to_lowercase()) - .bind(nonce) - .fetch_all(&self.pool) - .await + self.orchestrator + .read(async |pool| { + sqlx::query_as::<_, Receipt>( + "SELECT status,merkle_path,nonce,address FROM tasks + WHERE address = $1 + AND nonce = $2 + ORDER BY nonce DESC", + ) + .bind(address.to_lowercase()) + .bind(nonce) + .fetch_all(&pool) + .await + }) + .await } pub async fn get_tasks_by_address_with_limit( @@ -84,28 +115,36 @@ impl Db { address: &str, limit: i64, ) -> Result, sqlx::Error> { - sqlx::query_as::<_, Receipt>( - "SELECT status,merkle_path,nonce,address FROM tasks - WHERE address = $1 - ORDER BY nonce DESC - LIMIT $2", - ) - .bind(address.to_lowercase()) - .bind(limit) - .fetch_all(&self.pool) - .await + self.orchestrator + .read(async |pool| { + 
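+                // Newest receipts first for this address, capped at the caller-provided limit.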
sqlx::query_as::<_, Receipt>( + "SELECT status,merkle_path,nonce,address FROM tasks + WHERE address = $1 + ORDER BY nonce DESC + LIMIT $2", + ) + .bind(address.to_lowercase()) + .bind(limit) + .fetch_all(&pool) + .await + }) + .await } pub async fn get_daily_tasks_by_address(&self, address: &str) -> Result { - sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) - FROM tasks - WHERE address = $1 - AND inserted_at::date = CURRENT_DATE", - ) - .bind(address.to_lowercase()) - .fetch_one(&self.pool) - .await + self.orchestrator + .read(async |pool| { + sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) + FROM tasks + WHERE address = $1 + AND inserted_at::date = CURRENT_DATE", + ) + .bind(address.to_lowercase()) + .fetch_one(&pool) + .await + }) + .await } pub async fn insert_task( @@ -117,25 +156,29 @@ impl Db { merkle_path: Option<&[u8]>, nonce: i64, ) -> Result { - sqlx::query_scalar::<_, Uuid>( - "INSERT INTO tasks ( - address, - proving_system_id, - proof, - program_commitment, - merkle_path, - nonce - ) VALUES ($1, $2, $3, $4, $5, $6) - RETURNING task_id", - ) - .bind(address.to_lowercase()) - .bind(proving_system_id) - .bind(proof) - .bind(program_commitment) - .bind(merkle_path) - .bind(nonce) - .fetch_one(&self.pool) - .await + self.orchestrator + .write(async |pool| { + sqlx::query_scalar::<_, Uuid>( + "INSERT INTO tasks ( + address, + proving_system_id, + proof, + program_commitment, + merkle_path, + nonce + ) VALUES ($1, $2, $3, $4, $5, $6) + RETURNING task_id", + ) + .bind(address.to_lowercase()) + .bind(proving_system_id) + .bind(proof) + .bind(program_commitment) + .bind(merkle_path) + .bind(nonce) + .fetch_one(&pool) + .await + }) + .await } pub async fn has_active_payment_event( @@ -143,15 +186,19 @@ impl Db { address: &str, epoch: BigDecimal, ) -> Result { - sqlx::query_scalar::<_, bool>( - "SELECT EXISTS ( - SELECT 1 FROM payment_events - WHERE address = $1 AND started_at < $2 AND $2 < valid_until - )", - ) - .bind(address.to_lowercase()) - .bind(epoch) - .fetch_one(&self.pool) - .await + self.orchestrator + .read(async |pool| { + sqlx::query_scalar::<_, bool>( + "SELECT EXISTS ( + SELECT 1 FROM payment_events + WHERE address = $1 AND started_at < $2 AND $2 < valid_until + )", + ) + .bind(address.to_lowercase()) + .bind(&epoch) + .fetch_one(&pool) + .await + }) + .await } } diff --git a/aggregation_mode/gateway/src/main.rs b/aggregation_mode/gateway/src/main.rs index 19e6ce05f..b017936ae 100644 --- a/aggregation_mode/gateway/src/main.rs +++ b/aggregation_mode/gateway/src/main.rs @@ -26,7 +26,7 @@ async fn main() { let config = Config::from_file(&config_file_path).expect("Config is valid"); tracing::info!("Config loaded"); - let db = Db::try_new(&config.db_connection_url) + let db = Db::try_new(config.db_connection_urls.as_slice()) .await .expect("db to start"); diff --git a/aggregation_mode/gateway/src/types.rs b/aggregation_mode/gateway/src/types.rs index 7826645ab..d89966f3e 100644 --- a/aggregation_mode/gateway/src/types.rs +++ b/aggregation_mode/gateway/src/types.rs @@ -1,9 +1,8 @@ use actix_multipart::form::{tempfile::TempFile, text::Text, MultipartForm}; +use db::types::TaskStatus; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::db::TaskStatus; - #[derive(Serialize, Deserialize)] pub(super) struct AppResponse { status: u16, @@ -62,3 +61,11 @@ pub struct GetReceiptsResponse { pub nonce: i64, pub address: String, } + +#[derive(Debug, Clone, sqlx::FromRow, sqlx::Type, serde::Serialize)] +pub struct Receipt { + pub status: TaskStatus, + pub merkle_path: 
Option>, + pub nonce: i64, + pub address: String, +} diff --git a/aggregation_mode/payments_poller/Cargo.toml b/aggregation_mode/payments_poller/Cargo.toml index c145fd02e..7ed53ff80 100644 --- a/aggregation_mode/payments_poller/Cargo.toml +++ b/aggregation_mode/payments_poller/Cargo.toml @@ -8,6 +8,7 @@ serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } aligned-sdk = { workspace = true } +db = { workspace = true } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } actix-web = "4" diff --git a/aggregation_mode/payments_poller/src/config.rs b/aggregation_mode/payments_poller/src/config.rs index 5ed4a26d3..1fd15ed98 100644 --- a/aggregation_mode/payments_poller/src/config.rs +++ b/aggregation_mode/payments_poller/src/config.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Config { - pub db_connection_url: String, + pub db_connection_urls: Vec, pub eth_rpc_url: String, pub payment_service_address: String, } diff --git a/aggregation_mode/payments_poller/src/db.rs b/aggregation_mode/payments_poller/src/db.rs index 5403b1b23..518fbac0c 100644 --- a/aggregation_mode/payments_poller/src/db.rs +++ b/aggregation_mode/payments_poller/src/db.rs @@ -1,8 +1,35 @@ -use sqlx::{postgres::PgPoolOptions, types::BigDecimal, Pool, Postgres}; +use db::{orchestrator::DbOrchestartor, retry::RetryConfig}; +use sqlx::types::BigDecimal; + +// Retry/backoff behavior summary for DB queries (see +// aggregation_mode/db/src/orchestrator.rs:next_back_off_delay for implementation) +// +// 1) Max wait time between failures if all retries fail: +// The sleep between retries is capped at 30 seconds (RETRY_MAX_DELAY_SECONDS). +// +// 2) Wait before each retry attempt with the current config +// (start = 500ms, factor = 4.0, max retries = 5): +// +// retry 1: 0.5s +// retry 2: 2.0s +// retry 3: 8.0s +// retry 4: 30s (capped; 32s would have been next) +// retry 5: 30s +// +// Worst-case total sleep time across all retries: 70.5 seconds -> 5 blocks of ethereum waiting, +// plus the execution time of each DB attempt. 
+/// Initial delay before first retry attempt (in milliseconds) +const RETRY_MIN_DELAY_MILLIS: u64 = 500; +/// Exponential backoff multiplier for retry delays +const RETRY_FACTOR: f32 = 4.0; +/// Maximum number of retry attempts +const RETRY_MAX_TIMES: usize = 5; +/// Maximum delay between retry attempts (in seconds) +const RETRY_MAX_DELAY_SECONDS: u64 = 30; #[derive(Clone, Debug)] pub struct Db { - pool: Pool, + orchestartor: DbOrchestartor, } #[derive(Debug, Clone)] @@ -11,14 +38,19 @@ pub enum DbError { } impl Db { - pub async fn try_new(connection_url: &str) -> Result { - let pool = PgPoolOptions::new() - .max_connections(5) - .connect(connection_url) - .await - .map_err(|e| DbError::ConnectError(e.to_string()))?; + pub async fn try_new(connection_urls: &[String]) -> Result { + let orchestartor = DbOrchestartor::try_new( + connection_urls, + RetryConfig { + min_delay_millis: RETRY_MIN_DELAY_MILLIS, + factor: RETRY_FACTOR, + max_times: RETRY_MAX_TIMES, + max_delay_seconds: RETRY_MAX_DELAY_SECONDS, + }, + ) + .map_err(|e| DbError::ConnectError(e.to_string()))?; - Ok(Self { pool }) + Ok(Self { orchestartor }) } pub async fn insert_payment_event( @@ -29,18 +61,23 @@ impl Db { valid_until: &BigDecimal, tx_hash: &str, ) -> Result<(), sqlx::Error> { - sqlx::query( - "INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (tx_hash) DO NOTHING", - ) - .bind(address.to_lowercase()) - .bind(started_at) - .bind(amount) - .bind(valid_until) - .bind(tx_hash) - .execute(&self.pool) - .await - .map(|_| ()) + self.orchestartor + .write(async |pool| { + sqlx::query( + "INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (tx_hash) DO NOTHING", + ) + .bind(address.to_lowercase()) + .bind(started_at) + .bind(amount) + .bind(valid_until) + .bind(tx_hash) + .execute(&pool) + .await?; + + Ok(()) + }) + .await } } diff --git a/aggregation_mode/payments_poller/src/main.rs b/aggregation_mode/payments_poller/src/main.rs index 2bbfea697..a6f21d0c6 100644 --- a/aggregation_mode/payments_poller/src/main.rs +++ b/aggregation_mode/payments_poller/src/main.rs @@ -26,7 +26,7 @@ async fn main() { let config = Config::from_file(&config_file_path).expect("Config is valid"); tracing::info!("Config loaded"); - let db = Db::try_new(&config.db_connection_url) + let db = Db::try_new(config.db_connection_urls.as_slice()) .await .expect("db to start"); diff --git a/aggregation_mode/proof_aggregator/src/backend/config.rs b/aggregation_mode/proof_aggregator/src/backend/config.rs index 8126db06c..41081afc7 100644 --- a/aggregation_mode/proof_aggregator/src/backend/config.rs +++ b/aggregation_mode/proof_aggregator/src/backend/config.rs @@ -20,7 +20,7 @@ pub struct Config { pub risc0_chunk_aggregator_image_id: String, pub sp1_chunk_aggregator_vk_hash: String, pub monthly_budget_eth: f64, - pub db_connection_url: String, + pub db_connection_urls: Vec, } impl Config { diff --git a/aggregation_mode/proof_aggregator/src/backend/db.rs b/aggregation_mode/proof_aggregator/src/backend/db.rs index feeaa0db2..25f73a5bc 100644 --- a/aggregation_mode/proof_aggregator/src/backend/db.rs +++ b/aggregation_mode/proof_aggregator/src/backend/db.rs @@ -1,82 +1,118 @@ -use db::types::Task; -use sqlx::{postgres::PgPoolOptions, types::Uuid, Pool, Postgres}; +use db::{orchestrator::DbOrchestartor, retry::RetryConfig, types::Task}; +use sqlx::types::Uuid; -#[derive(Clone, Debug)] +// Retry/backoff behavior summary (see 
+// aggregation_mode/db/src/orchestrator.rs:next_back_off_delay for implementation) +// +// 1) Max wait time between failures if all retries fail: +// The sleep between retries is capped at 30 seconds (RETRY_MAX_DELAY_SECONDS). +// +// 2) Wait before each retry attempt with the current config +// (start = 500ms, factor = 5.0, max retries = 10): +// +// retry 1: 0.5s +// retry 2: 2.5s +// retry 3: 12.5s +// retry 4: 30s (capped) +// retry 5–10: 30s each +// +// Worst-case total sleep time across all retries: ~3m 48s, +// plus the execution time of each DB attempt. +/// Initial delay before first retry attempt (in milliseconds) +const RETRY_MIN_DELAY_MILLIS: u64 = 500; +/// Exponential backoff multiplier for retry delays +const RETRY_FACTOR: f32 = 5.0; +/// Maximum number of retry attempts +const RETRY_MAX_TIMES: usize = 10; +/// Maximum delay between retry attempts (in seconds) +const RETRY_MAX_DELAY_SECONDS: u64 = 30; + +#[derive(Debug, Clone)] pub struct Db { - pool: Pool, + orchestrator: DbOrchestartor, } #[derive(Debug, Clone)] pub enum DbError { - ConnectError(String), + Creation(String), Query(String), } impl Db { - pub async fn try_new(connection_url: &str) -> Result { - let pool = PgPoolOptions::new() - .max_connections(5) - .connect(connection_url) - .await - .map_err(|e| DbError::ConnectError(e.to_string()))?; + pub async fn try_new(connection_urls: &[String]) -> Result { + let orchestrator = DbOrchestartor::try_new( + connection_urls, + RetryConfig { + min_delay_millis: RETRY_MIN_DELAY_MILLIS, + factor: RETRY_FACTOR, + max_times: RETRY_MAX_TIMES, + max_delay_seconds: RETRY_MAX_DELAY_SECONDS, + }, + ) + .map_err(|e| DbError::Creation(e.to_string()))?; - Ok(Self { pool }) + Ok(Self { orchestrator }) } pub async fn get_pending_tasks_and_mark_them_as_processing( - &self, + &mut self, proving_system_id: i32, limit: i64, ) -> Result, DbError> { - sqlx::query_as::<_, Task>( - "WITH selected AS ( - SELECT task_id - FROM tasks - WHERE proving_system_id = $1 AND status = 'pending' - LIMIT $2 - FOR UPDATE SKIP LOCKED + self.orchestrator + .write(async |pool| { + sqlx::query_as::<_, Task>( + "WITH selected AS ( + SELECT task_id + FROM tasks + WHERE proving_system_id = $1 AND status = 'pending' + LIMIT $2 + FOR UPDATE SKIP LOCKED + ) + UPDATE tasks t + SET status = 'processing' + FROM selected s + WHERE t.task_id = s.task_id + RETURNING t.*;", ) - UPDATE tasks t - SET status = 'processing' - FROM selected s - WHERE t.task_id = s.task_id - RETURNING t.*;", - ) - .bind(proving_system_id) - .bind(limit) - .fetch_all(&self.pool) - .await - .map_err(|e| DbError::Query(e.to_string())) + .bind(proving_system_id) + .bind(limit) + .fetch_all(&pool) + .await + }) + .await + .map_err(|e| DbError::Query(e.to_string())) } pub async fn insert_tasks_merkle_path_and_mark_them_as_verified( - &self, + &mut self, updates: Vec<(Uuid, Vec)>, ) -> Result<(), DbError> { - let mut tx = self - .pool - .begin() - .await - .map_err(|e| DbError::Query(e.to_string()))?; + let updates_ref = &updates; - for (task_id, merkle_path) in updates { - if let Err(e) = sqlx::query( - "UPDATE tasks SET merkle_path = $1, status = 'verified', proof = NULL WHERE task_id = $2", - ) - .bind(merkle_path) - .bind(task_id) - .execute(&mut *tx) - .await - { - tx.rollback() - .await - .map_err(|e| DbError::Query(e.to_string()))?; - tracing::error!("Error while updating task merkle path and status {}", e); - return Err(DbError::Query(e.to_string())); - } - } + self.orchestrator + .write(|pool| { + let updates = updates_ref; + async move { + 
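+                    // The closure captures only a shared reference to `updates`, so the orchestrator
+                    // can re-run it against another node if this attempt fails; all row updates share
+                    // one transaction and commit (or roll back) together.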
let mut tx = pool.begin().await?; + + for (task_id, merkle_path) in updates.iter() { + if let Err(e) = sqlx::query( + "UPDATE tasks SET merkle_path = $1, status = 'verified', proof = NULL WHERE task_id = $2", + ) + .bind(merkle_path.as_slice()) + .bind(*task_id) + .execute(&mut *tx) + .await { + tracing::error!("Error while updating task merkle path and status {}", e); + return Err(e); + }; + } - tx.commit() + tx.commit().await?; + Ok(()) + } + }) .await .map_err(|e| DbError::Query(e.to_string()))?; diff --git a/aggregation_mode/proof_aggregator/src/backend/fetcher.rs b/aggregation_mode/proof_aggregator/src/backend/fetcher.rs index 5ed1df6de..9bb0ff787 100644 --- a/aggregation_mode/proof_aggregator/src/backend/fetcher.rs +++ b/aggregation_mode/proof_aggregator/src/backend/fetcher.rs @@ -24,7 +24,7 @@ impl ProofsFetcher { } pub async fn fetch_pending_proofs( - &self, + &mut self, engine: ZKVMEngine, limit: i64, ) -> Result<(Vec, Vec), ProofsFetcherError> { diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index 9dee2164c..8b0e28d4f 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -85,7 +85,7 @@ impl ProofAggregator { let engine = ZKVMEngine::from_env().expect("AGGREGATOR env variable to be set to one of sp1|risc0"); - let db = Db::try_new(&config.db_connection_url) + let db = Db::try_new(config.db_connection_urls.as_slice()) .await .expect("To connect to db"); diff --git a/config-files/config-agg-mode-gateway-ethereum-package.yaml b/config-files/config-agg-mode-gateway-ethereum-package.yaml index 21a843826..caeae01b1 100644 --- a/config-files/config-agg-mode-gateway-ethereum-package.yaml +++ b/config-files/config-agg-mode-gateway-ethereum-package.yaml @@ -1,5 +1,6 @@ port: 8089 -db_connection_url: "postgres://postgres:postgres@localhost:5435/" +db_connection_urls: + - "postgres://postgres:postgres@localhost:5435/" eth_rpc_url: "http://localhost:8545" payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe" network: "devnet" diff --git a/config-files/config-agg-mode-gateway.yaml b/config-files/config-agg-mode-gateway.yaml index ea30755e2..9b222994d 100644 --- a/config-files/config-agg-mode-gateway.yaml +++ b/config-files/config-agg-mode-gateway.yaml @@ -1,5 +1,6 @@ port: 8089 -db_connection_url: "postgres://postgres:postgres@localhost:5435/" +db_connection_urls: + - "postgres://postgres:postgres@localhost:5435/" eth_rpc_url: "http://localhost:8545" payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe" network: "devnet" diff --git a/config-files/config-proof-aggregator-ethereum-package.yaml b/config-files/config-proof-aggregator-ethereum-package.yaml index a20dad54b..23034743c 100644 --- a/config-files/config-proof-aggregator-ethereum-package.yaml +++ b/config-files/config-proof-aggregator-ethereum-package.yaml @@ -12,7 +12,8 @@ proofs_per_chunk: 512 # Amount of proofs to process per chunk # Since each proof commitments takes 32 bytes hash # We can aggregate as much proofs as 126.976 / 32 = 3968 per blob total_proofs_limit: 3968 -db_connection_url: "postgres://postgres:postgres@localhost:5435/" +db_connection_urls: + - "postgres://postgres:postgres@localhost:5435/" # Monthly ETH budget for on-chain proof verification. This value must be non-negative. # This value will be used to calculate how much gas we can spend per second to send proofs on-chain. 
diff --git a/config-files/config-proof-aggregator.yaml b/config-files/config-proof-aggregator.yaml index 9fc292f04..61a4f982a 100644 --- a/config-files/config-proof-aggregator.yaml +++ b/config-files/config-proof-aggregator.yaml @@ -12,7 +12,8 @@ proofs_per_chunk: 512 # Amount of proofs to process per chunk # Since each proof commitments takes 32 bytes hash # We can aggregate as much proofs as 126.976 / 32 = 3968 per blob total_proofs_limit: 3968 -db_connection_url: "postgres://postgres:postgres@localhost:5435/" +db_connection_urls: + - "postgres://postgres:postgres@localhost:5435/" # Monthly ETH budget for on-chain proof verification. This value must be non-negative. # This value will be used to calculate how much gas we can spend per second to send proofs on-chain.
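A small test sketch to accompany the backoff comments above (illustrative only, not part of this
change; `next_backoff_delay` is private, so a test like this would live inside
aggregation_mode/db/src/orchestrator.rs, and `connect_lazy` means no running database is needed):

    #[cfg(test)]
    mod tests {
        use super::DbOrchestartor;
        use crate::retry::RetryConfig;
        use std::time::Duration;

        #[test]
        fn backoff_delay_is_clamped_to_max_delay_seconds() {
            let orchestrator = DbOrchestartor::try_new(
                &["postgres://postgres:postgres@localhost:5435/".to_string()],
                RetryConfig {
                    min_delay_millis: 500,
                    factor: 2.0,
                    max_times: 4,
                    max_delay_seconds: 10,
                },
            )
            .expect("connect_lazy does not open connections, so construction succeeds");

            // 0.5s doubles towards 1s, 2s, 4s, 8s, 16s... but is clamped at 10s and stays there.
            let mut delay = Duration::from_millis(500);
            for _ in 0..10 {
                delay = orchestrator.next_backoff_delay(delay);
                assert!(delay <= Duration::from_secs(10));
            }
            assert_eq!(delay, Duration::from_secs(10));
        }
    }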