From 5794215b2f982a8b6b7b658613ea4945773eb9de Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Thu, 18 Dec 2025 14:26:04 -0300 Subject: [PATCH 01/14] feat: db orchestrator design --- aggregation_mode/Cargo.lock | 1 + aggregation_mode/db/Cargo.toml | 3 +- aggregation_mode/db/src/lib.rs | 2 + aggregation_mode/db/src/orchestrator.rs | 86 +++++++++++++++++++++++++ aggregation_mode/db/src/retry.rs | 67 +++++++++++++++++++ 5 files changed, 157 insertions(+), 2 deletions(-) create mode 100644 aggregation_mode/db/src/orchestrator.rs create mode 100644 aggregation_mode/db/src/retry.rs diff --git a/aggregation_mode/Cargo.lock b/aggregation_mode/Cargo.lock index 6e424297a6..8991a7536a 100644 --- a/aggregation_mode/Cargo.lock +++ b/aggregation_mode/Cargo.lock @@ -3262,6 +3262,7 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" name = "db" version = "0.1.0" dependencies = [ + "backon", "sqlx", "tokio", ] diff --git a/aggregation_mode/db/Cargo.toml b/aggregation_mode/db/Cargo.toml index 47b3b89e31..b31012fde1 100644 --- a/aggregation_mode/db/Cargo.toml +++ b/aggregation_mode/db/Cargo.toml @@ -5,9 +5,8 @@ edition = "2021" [dependencies] tokio = { version = "1"} -# TODO: enable tls sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] } - +backon = "1.2.0" [[bin]] name = "migrate" diff --git a/aggregation_mode/db/src/lib.rs b/aggregation_mode/db/src/lib.rs index cd408564ea..776b06c163 100644 --- a/aggregation_mode/db/src/lib.rs +++ b/aggregation_mode/db/src/lib.rs @@ -1 +1,3 @@ +pub mod orchestrator; +mod retry; pub mod types; diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs new file mode 100644 index 0000000000..63c085aa9b --- /dev/null +++ b/aggregation_mode/db/src/orchestrator.rs @@ -0,0 +1,86 @@ +use std::{future::Future, time::Duration}; + +use backon::{ExponentialBuilder, Retryable}; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; + +use crate::retry::RetryError; + +#[derive(Clone, Debug)] +struct DbNode { + pool: Pool, +} + +pub struct DbOrchestartor { + nodes: Vec, +} + +impl DbOrchestartor { + pub fn try_new(connection_urls: Vec) -> Result { + // TODO: validate at least one connection url + let nodes = connection_urls + .into_iter() + .map(|url| { + let pool = PgPoolOptions::new().max_connections(5).connect_lazy(&url)?; + + Ok(DbNode { pool }) + }) + .collect::, sqlx::Error>>()?; + + Ok(Self { nodes }) + } + + fn backoff_builder(&self) -> ExponentialBuilder { + ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(0)) + .with_max_times(0) + .with_factor(0.0) + .with_max_delay(Duration::from_secs(0)) + } + + pub async fn query(&self, query_fn: Q) -> Result + where + Q: Fn(Pool) -> Fut, + Fut: Future>, + { + let func = async || { + let mut last_error = None; + + for idx in 0..self.nodes.len() { + let pool = self.nodes[idx].pool.clone(); + match query_fn(pool).await { + Ok(res) => return Ok(res), + Err(err) => { + if Self::is_connection_error(&err) { + last_error = Some(err); + } else { + return Err(RetryError::Permanent(err)); + } + } + }; + } + + Err(RetryError::Transient( + last_error.expect("write_op attempted without database nodes"), + )) + }; + + func.retry(self.backoff_builder()) + .sleep(tokio::time::sleep) + .when(|e| matches!(e, RetryError::Transient(_))) + .await + .map_err(|e| e.inner()) + } + + fn is_connection_error(error: &sqlx::Error) -> bool { + matches!( + error, + sqlx::Error::Io(_) + | sqlx::Error::Tls(_) + | sqlx::Error::Protocol(_) + | 
sqlx::Error::PoolTimedOut + | sqlx::Error::PoolClosed + | sqlx::Error::WorkerCrashed + | sqlx::Error::BeginFailed + ) + } +} diff --git a/aggregation_mode/db/src/retry.rs b/aggregation_mode/db/src/retry.rs new file mode 100644 index 0000000000..7d76414870 --- /dev/null +++ b/aggregation_mode/db/src/retry.rs @@ -0,0 +1,67 @@ +use backon::ExponentialBuilder; +use backon::Retryable; +use std::{future::Future, time::Duration}; + +#[derive(Debug)] +pub enum RetryError { + Transient(E), + Permanent(E), +} + +impl std::fmt::Display for RetryError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + RetryError::Transient(e) => write!(f, "{e}"), + RetryError::Permanent(e) => write!(f, "{e}"), + } + } +} + +impl RetryError { + pub fn inner(self) -> E { + match self { + RetryError::Transient(e) => e, + RetryError::Permanent(e) => e, + } + } +} + +impl std::error::Error for RetryError where E: std::fmt::Debug {} + +/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function +/// Runs with `jitter: false`. +/// +/// # Parameters +/// * `function` - The async function to retry +/// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds) +/// * `factor` - Exponential backoff multiplier for retry delays +/// * `max_times` - Maximum number of retry attempts +/// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds) +pub async fn retry_function( + mut function: FutureFn, + min_delay_millis: u64, + factor: f32, + max_times: usize, + max_delay_seconds: u64, +) -> Result> +where + Fut: Future>>, + FutureFn: FnMut() -> Fut, +{ + let backoff = ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(min_delay_millis)) + .with_max_times(max_times) + .with_factor(factor) + .with_max_delay(Duration::from_secs(max_delay_seconds)); + + let func = async || match function().await { + Ok(res) => Ok(res), + Err(e) => Err(e), + }; + + function + .retry(backoff) + .sleep(tokio::time::sleep) + .when(|e| matches!(e, RetryError::Transient(_))) + .await +} From 5340365e1f0ca3a53fc166de451596c6e3861658 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Thu, 18 Dec 2025 16:02:52 -0300 Subject: [PATCH 02/14] feat: receive config via params and log errors --- aggregation_mode/Cargo.lock | 1 + aggregation_mode/db/Cargo.toml | 1 + aggregation_mode/db/src/orchestrator.rs | 36 +++++++++++++----- aggregation_mode/db/src/retry.rs | 50 +++++-------------------- 4 files changed, 39 insertions(+), 49 deletions(-) diff --git a/aggregation_mode/Cargo.lock b/aggregation_mode/Cargo.lock index 8991a7536a..69d3b81a41 100644 --- a/aggregation_mode/Cargo.lock +++ b/aggregation_mode/Cargo.lock @@ -3265,6 +3265,7 @@ dependencies = [ "backon", "sqlx", "tokio", + "tracing", ] [[package]] diff --git a/aggregation_mode/db/Cargo.toml b/aggregation_mode/db/Cargo.toml index b31012fde1..539a368bf0 100644 --- a/aggregation_mode/db/Cargo.toml +++ b/aggregation_mode/db/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" tokio = { version = "1"} sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] } backon = "1.2.0" +tracing = { version = "0.1", features = ["log"] } [[bin]] name = "migrate" diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs index 63c085aa9b..5caa7d0196 100644 --- a/aggregation_mode/db/src/orchestrator.rs +++ b/aggregation_mode/db/src/orchestrator.rs @@ -3,7 +3,7 @@ use std::{future::Future, time::Duration}; use backon::{ExponentialBuilder, 
Retryable}; use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; -use crate::retry::RetryError; +use crate::retry::{RetryConfig, RetryError}; #[derive(Clone, Debug)] struct DbNode { @@ -12,11 +12,23 @@ struct DbNode { pub struct DbOrchestartor { nodes: Vec, + retry_config: RetryConfig, +} + +pub enum DbOrchestartorError { + InvalidNumberOfConnectionUrls, + Sqlx(sqlx::Error), } impl DbOrchestartor { - pub fn try_new(connection_urls: Vec) -> Result { - // TODO: validate at least one connection url + pub fn try_new( + connection_urls: Vec, + retry_config: RetryConfig, + ) -> Result { + if connection_urls.is_empty() { + return Err(DbOrchestartorError::InvalidNumberOfConnectionUrls); + } + let nodes = connection_urls .into_iter() .map(|url| { @@ -24,17 +36,21 @@ impl DbOrchestartor { Ok(DbNode { pool }) }) - .collect::, sqlx::Error>>()?; + .collect::, sqlx::Error>>() + .map_err(|e| DbOrchestartorError::Sqlx(e))?; - Ok(Self { nodes }) + Ok(Self { + nodes, + retry_config, + }) } fn backoff_builder(&self) -> ExponentialBuilder { ExponentialBuilder::default() - .with_min_delay(Duration::from_millis(0)) - .with_max_times(0) - .with_factor(0.0) - .with_max_delay(Duration::from_secs(0)) + .with_min_delay(Duration::from_millis(self.retry_config.min_delay_millis)) + .with_max_times(self.retry_config.max_times) + .with_factor(self.retry_config.factor) + .with_max_delay(Duration::from_secs(self.retry_config.max_delay_seconds)) } pub async fn query(&self, query_fn: Q) -> Result @@ -51,6 +67,7 @@ impl DbOrchestartor { Ok(res) => return Ok(res), Err(err) => { if Self::is_connection_error(&err) { + tracing::warn!(node_index = idx, error = ?err, "database query failed; retrying"); last_error = Some(err); } else { return Err(RetryError::Permanent(err)); @@ -81,6 +98,7 @@ impl DbOrchestartor { | sqlx::Error::PoolClosed | sqlx::Error::WorkerCrashed | sqlx::Error::BeginFailed + | sqlx::Error::Database(_) ) } } diff --git a/aggregation_mode/db/src/retry.rs b/aggregation_mode/db/src/retry.rs index 7d76414870..957419f277 100644 --- a/aggregation_mode/db/src/retry.rs +++ b/aggregation_mode/db/src/retry.rs @@ -1,7 +1,3 @@ -use backon::ExponentialBuilder; -use backon::Retryable; -use std::{future::Future, time::Duration}; - #[derive(Debug)] pub enum RetryError { Transient(E), @@ -28,40 +24,14 @@ impl RetryError { impl std::error::Error for RetryError where E: std::fmt::Debug {} -/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function -/// Runs with `jitter: false`. 
-/// -/// # Parameters -/// * `function` - The async function to retry -/// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds) -/// * `factor` - Exponential backoff multiplier for retry delays -/// * `max_times` - Maximum number of retry attempts -/// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds) -pub async fn retry_function( - mut function: FutureFn, - min_delay_millis: u64, - factor: f32, - max_times: usize, - max_delay_seconds: u64, -) -> Result> -where - Fut: Future>>, - FutureFn: FnMut() -> Fut, -{ - let backoff = ExponentialBuilder::default() - .with_min_delay(Duration::from_millis(min_delay_millis)) - .with_max_times(max_times) - .with_factor(factor) - .with_max_delay(Duration::from_secs(max_delay_seconds)); - - let func = async || match function().await { - Ok(res) => Ok(res), - Err(e) => Err(e), - }; - - function - .retry(backoff) - .sleep(tokio::time::sleep) - .when(|e| matches!(e, RetryError::Transient(_))) - .await +#[derive(Debug)] +pub struct RetryConfig { + /// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds) + pub min_delay_millis: u64, + /// * `factor` - Exponential backoff multiplier for retry delays + pub factor: f32, + /// * `max_times` - Maximum number of retry attempts + pub max_times: usize, + /// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds) + pub max_delay_seconds: u64, } From 5e325255cd612b9fcf939ab1d791c908fec332d9 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Thu, 18 Dec 2025 16:49:19 -0300 Subject: [PATCH 03/14] feat: orchestrator write/read failure and preferred order by last successfull --- aggregation_mode/db/src/orchestrator.rs | 157 +++++++++++++++++++----- aggregation_mode/db/src/retry.rs | 9 -- 2 files changed, 123 insertions(+), 43 deletions(-) diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs index 5caa7d0196..d7ad30c9e8 100644 --- a/aggregation_mode/db/src/orchestrator.rs +++ b/aggregation_mode/db/src/orchestrator.rs @@ -1,15 +1,23 @@ use std::{future::Future, time::Duration}; -use backon::{ExponentialBuilder, Retryable}; use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; use crate::retry::{RetryConfig, RetryError}; -#[derive(Clone, Debug)] +#[derive(Debug, Clone, Copy)] +enum Operation { + Read, + Write, +} + +#[derive(Debug)] struct DbNode { pool: Pool, + last_read_failed: bool, + last_write_failed: bool, } +#[derive(Debug)] pub struct DbOrchestartor { nodes: Vec, retry_config: RetryConfig, @@ -34,7 +42,11 @@ impl DbOrchestartor { .map(|url| { let pool = PgPoolOptions::new().max_connections(5).connect_lazy(&url)?; - Ok(DbNode { pool }) + Ok(DbNode { + pool, + last_read_failed: false, + last_write_failed: false, + }) }) .collect::, sqlx::Error>>() .map_err(|e| DbOrchestartorError::Sqlx(e))?; @@ -45,47 +57,124 @@ impl DbOrchestartor { }) } - fn backoff_builder(&self) -> ExponentialBuilder { - ExponentialBuilder::default() - .with_min_delay(Duration::from_millis(self.retry_config.min_delay_millis)) - .with_max_times(self.retry_config.max_times) - .with_factor(self.retry_config.factor) - .with_max_delay(Duration::from_secs(self.retry_config.max_delay_seconds)) + pub async fn write(&mut self, query: Q) -> Result + where + Q: Fn(Pool) -> Fut, + Fut: Future>, + { + self.query::(query, Operation::Write).await } - pub async fn query(&self, query_fn: Q) -> Result + pub async fn read(&mut self, query: Q) -> Result where Q: Fn(Pool) -> Fut, Fut: Future>, { - let func = async || { - let mut 
last_error = None;
-
-            for idx in 0..self.nodes.len() {
-                let pool = self.nodes[idx].pool.clone();
-                match query_fn(pool).await {
-                    Ok(res) => return Ok(res),
-                    Err(err) => {
-                        if Self::is_connection_error(&err) {
-                            tracing::warn!(node_index = idx, error = ?err, "database query failed; retrying");
-                            last_error = Some(err);
-                        } else {
-                            return Err(RetryError::Permanent(err));
-                        }
+        self.query::<T, Q, Fut>(query, Operation::Read).await
+    }
+
+    async fn query<T, Q, Fut>(
+        &mut self,
+        query_fn: Q,
+        operation: Operation,
+    ) -> Result<T, sqlx::Error>
+    where
+        Q: Fn(Pool<Postgres>) -> Fut,
+        Fut: Future<Output = Result<T, sqlx::Error>>,
+    {
+        let mut attempts = 0;
+        let mut delay = Duration::from_millis(self.retry_config.min_delay_millis);
+
+        loop {
+            match self.execute_once(&query_fn, operation).await {
+                Ok(value) => return Ok(value),
+                Err(RetryError::Permanent(err)) => return Err(err),
+                Err(RetryError::Transient(err)) => {
+                    if attempts >= self.retry_config.max_times {
+                        return Err(err);
                     }
-                };
+
+                    tracing::warn!(attempt = attempts, delay_milis = delay.as_millis(), error = ?err, "retrying after backoff");
+                    tokio::time::sleep(delay).await;
+                    delay = self.backoff_delay(delay);
+                    attempts += 1;
+                }
             }
+        }
+    }
+
+    fn backoff_delay(&self, current: Duration) -> Duration {
+        let max = Duration::from_secs(self.retry_config.max_delay_seconds);
+        let scaled_secs = current.as_secs_f64() * f64::from(self.retry_config.factor);
+        let scaled = Duration::from_secs_f64(scaled_secs);
+        if scaled > max {
+            max
+        } else {
+            scaled
+        }
+    }
-            Err(RetryError::Transient(
-                last_error.expect("write_op attempted without database nodes"),
-            ))
-        };
+    async fn execute_once<T, Q, Fut>(
+        &mut self,
+        query_fn: &Q,
+        operation: Operation,
+    ) -> Result<T, RetryError<sqlx::Error>>
+    where
+        Q: Fn(Pool<Postgres>) -> Fut,
+        Fut: Future<Output = Result<T, sqlx::Error>>,
+    {
+        let mut last_error = None;
+
+        for idx in self.preferred_order(operation) {
+            let pool = self.nodes[idx].pool.clone();
+
+            match query_fn(pool).await {
+                Ok(res) => {
+                    match operation {
+                        Operation::Read => self.nodes[idx].last_read_failed = false,
+                        Operation::Write => self.nodes[idx].last_write_failed = false,
+                    };
+                    return Ok(res);
+                }
+                Err(err) => {
+                    if Self::is_connection_error(&err) {
+                        tracing::warn!(node_index = idx, error = ?err, "database query failed");
+                        match operation {
+                            Operation::Read => self.nodes[idx].last_read_failed = true,
+                            Operation::Write => self.nodes[idx].last_write_failed = true,
+                        };
+                        last_error = Some(err);
+                    } else {
+                        return Err(RetryError::Permanent(err));
+                    }
+                }
+            };
+        }
+
+        Err(RetryError::Transient(
+            last_error.expect("write_op attempted without database nodes"),
+        ))
+    }
+
+    fn preferred_order(&self, operation: Operation) -> Vec<usize> {
+        let mut preferred = Vec::with_capacity(self.nodes.len());
+        let mut fallback = Vec::new();
+
+        for (idx, node) in self.nodes.iter().enumerate() {
+            let failed = match operation {
+                Operation::Read => node.last_read_failed,
+                Operation::Write => node.last_write_failed,
+            };
+
+            if failed {
+                fallback.push(idx);
+            } else {
+                preferred.push(idx);
+            }
+        }
-        func.retry(self.backoff_builder())
-            .sleep(tokio::time::sleep)
-            .when(|e| matches!(e, RetryError::Transient(_)))
-            .await
-            .map_err(|e| e.inner())
+        preferred.extend(fallback);
+        preferred
     }

     fn is_connection_error(error: &sqlx::Error) -> bool {
diff --git a/aggregation_mode/db/src/retry.rs b/aggregation_mode/db/src/retry.rs
index 957419f277..e08c9ce7d9 100644
--- a/aggregation_mode/db/src/retry.rs
+++ b/aggregation_mode/db/src/retry.rs
@@ -13,15 +13,6 @@ impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
     }
 }

-impl<E> RetryError<E> {
-    pub fn inner(self) -> E {
-        match self {
-            RetryError::Transient(e) => e,
-            RetryError::Permanent(e) => e,
-        }
-    }
-}
-
 impl<E> std::error::Error for RetryError<E> where E: std::fmt::Debug {}

 #[derive(Debug)]

From 5a2d233328d4218ecef0b4b95ba2495bca6343c6 Mon Sep 17 00:00:00 2001
From: Marcos Nicolau
Date: Thu, 18 Dec 2025 17:33:28 -0300
Subject: [PATCH 04/14] feat: make orchestrator clone safe between threads

---
 aggregation_mode/db/src/lib.rs          |  2 +-
 aggregation_mode/db/src/orchestrator.rs | 75 +++++++++++++++++++------
 aggregation_mode/db/src/retry.rs        |  4 +-
 3 files changed, 60 insertions(+), 21 deletions(-)

diff --git a/aggregation_mode/db/src/lib.rs b/aggregation_mode/db/src/lib.rs
index 776b06c163..a87f9d7077 100644
--- a/aggregation_mode/db/src/lib.rs
+++ b/aggregation_mode/db/src/lib.rs
@@ -1,3 +1,3 @@
 pub mod orchestrator;
-mod retry;
+pub mod retry;
 pub mod types;
diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs
index d7ad30c9e8..93cddb6fef 100644
--- a/aggregation_mode/db/src/orchestrator.rs
+++ b/aggregation_mode/db/src/orchestrator.rs
@@ -1,4 +1,11 @@
-use std::{future::Future, time::Duration};
+use std::{
+    future::Future,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+    time::Duration,
+};

 use sqlx::{postgres::PgPoolOptions, Pool, Postgres};

@@ -10,27 +17,56 @@ enum Operation {
     Write,
 }

+/// A single DB node: connection pool plus shared health flags (used to prioritize nodes).
+
 #[derive(Debug)]
 struct DbNode {
     pool: Pool<Postgres>,
-    last_read_failed: bool,
-    last_write_failed: bool,
+    last_read_failed: AtomicBool,
+    last_write_failed: AtomicBool,
 }

+/// Database orchestrator for running reads/writes across multiple PostgreSQL nodes with retry/backoff.
+///
+/// `DbOrchestartor` holds a list of database nodes (connection pools) and will:
+/// - try nodes in a preferred order (healthy nodes first, then recently-failed nodes),
+/// - mark nodes as failed on connection-type errors,
+/// - retry transient failures with exponential backoff based on `retry_config`.
+///
+/// ## Thread-safe `Clone`
+/// This type is cheap and thread-safe to clone:
+/// - `nodes` is `Vec<Arc<DbNode>>`, so cloning only increments `Arc` ref-counts and shares the same pools/nodes,
+/// - `sqlx::Pool` is internally reference-counted and designed to be cloned and used concurrently,
+/// - the node health flags are `AtomicBool`, so updates are safe from multiple threads/tasks.
+///
+/// Clones share health state (the atomics) and the underlying pools, so all clones observe and influence
+/// the same “preferred node” ordering decisions.
+#[derive(Debug, Clone)] pub struct DbOrchestartor { - nodes: Vec, + nodes: Vec>, retry_config: RetryConfig, } +#[derive(Debug)] pub enum DbOrchestartorError { InvalidNumberOfConnectionUrls, Sqlx(sqlx::Error), } +impl std::fmt::Display for DbOrchestartorError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::InvalidNumberOfConnectionUrls => { + write!(f, "invalid number of connection URLs") + } + Self::Sqlx(e) => write!(f, "{e}"), + } + } +} + impl DbOrchestartor { pub fn try_new( - connection_urls: Vec, + connection_urls: &[&str], retry_config: RetryConfig, ) -> Result { if connection_urls.is_empty() { @@ -40,13 +76,13 @@ impl DbOrchestartor { let nodes = connection_urls .into_iter() .map(|url| { - let pool = PgPoolOptions::new().max_connections(5).connect_lazy(&url)?; + let pool = PgPoolOptions::new().max_connections(5).connect_lazy(url)?; - Ok(DbNode { + Ok(Arc::new(DbNode { pool, - last_read_failed: false, - last_write_failed: false, - }) + last_read_failed: AtomicBool::new(false), + last_write_failed: AtomicBool::new(false), + })) }) .collect::, sqlx::Error>>() .map_err(|e| DbOrchestartorError::Sqlx(e))?; @@ -126,13 +162,14 @@ impl DbOrchestartor { let mut last_error = None; for idx in self.preferred_order(operation) { - let pool = self.nodes[idx].pool.clone(); + let node = &self.nodes[idx]; + let pool = node.pool.clone(); match query_fn(pool).await { Ok(res) => { match operation { - Operation::Read => self.nodes[idx].last_read_failed = false, - Operation::Write => self.nodes[idx].last_write_failed = false, + Operation::Read => node.last_read_failed.store(false, Ordering::Relaxed), + Operation::Write => node.last_write_failed.store(false, Ordering::Relaxed), }; return Ok(res); } @@ -140,8 +177,10 @@ impl DbOrchestartor { if Self::is_connection_error(&err) { tracing::warn!(node_index = idx, error = ?err, "database query failed"); match operation { - Operation::Read => self.nodes[idx].last_read_failed = true, - Operation::Write => self.nodes[idx].last_write_failed = true, + Operation::Read => node.last_read_failed.store(true, Ordering::Relaxed), + Operation::Write => { + node.last_write_failed.store(true, Ordering::Relaxed) + } }; last_error = Some(err); } else { @@ -162,8 +201,8 @@ impl DbOrchestartor { for (idx, node) in self.nodes.iter().enumerate() { let failed = match operation { - Operation::Read => node.last_read_failed, - Operation::Write => node.last_write_failed, + Operation::Read => node.last_read_failed.load(Ordering::Relaxed), + Operation::Write => node.last_write_failed.load(Ordering::Relaxed), }; if failed { diff --git a/aggregation_mode/db/src/retry.rs b/aggregation_mode/db/src/retry.rs index e08c9ce7d9..adec8433d9 100644 --- a/aggregation_mode/db/src/retry.rs +++ b/aggregation_mode/db/src/retry.rs @@ -1,5 +1,5 @@ #[derive(Debug)] -pub enum RetryError { +pub(super) enum RetryError { Transient(E), Permanent(E), } @@ -15,7 +15,7 @@ impl std::fmt::Display for RetryError { impl std::error::Error for RetryError where E: std::fmt::Debug {} -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct RetryConfig { /// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds) pub min_delay_millis: u64, From 4ab27f139d79c37c98231012c54a07ba0384d213 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Thu, 18 Dec 2025 17:34:03 -0300 Subject: [PATCH 05/14] feat: use orchestrator in proof aggregator --- .../proof_aggregator/src/backend/db.rs | 128 ++++++++++-------- .../proof_aggregator/src/backend/fetcher.rs | 2 +- 
.../proof_aggregator/src/backend/mod.rs | 2 +- 3 files changed, 76 insertions(+), 56 deletions(-) diff --git a/aggregation_mode/proof_aggregator/src/backend/db.rs b/aggregation_mode/proof_aggregator/src/backend/db.rs index feeaa0db2a..63d8b19fb0 100644 --- a/aggregation_mode/proof_aggregator/src/backend/db.rs +++ b/aggregation_mode/proof_aggregator/src/backend/db.rs @@ -1,82 +1,102 @@ -use db::types::Task; -use sqlx::{postgres::PgPoolOptions, types::Uuid, Pool, Postgres}; +use db::{orchestrator::DbOrchestartor, retry::RetryConfig, types::Task}; +use sqlx::types::Uuid; -#[derive(Clone, Debug)] +// Retry parameters for Db queries +/// Initial delay before first retry attempt (in milliseconds) +const RETRY_MIN_DELAY_MILLIS: u64 = 500; +/// Exponential backoff multiplier for retry delays +const RETRY_FACTOR: f32 = 2.0; +/// Maximum number of retry attempts +const RETRY_MAX_TIMES: usize = 5; +/// Maximum delay between retry attempts (in seconds) +const RETRY_MAX_DELAY_SECONDS: u64 = 30; + +#[derive(Debug, Clone)] pub struct Db { - pool: Pool, + orchestrator: DbOrchestartor, } #[derive(Debug, Clone)] pub enum DbError { - ConnectError(String), + Creation(String), Query(String), } impl Db { - pub async fn try_new(connection_url: &str) -> Result { - let pool = PgPoolOptions::new() - .max_connections(5) - .connect(connection_url) - .await - .map_err(|e| DbError::ConnectError(e.to_string()))?; + pub async fn try_new(connection_urls: &[&str]) -> Result { + let orchestrator = DbOrchestartor::try_new( + connection_urls, + RetryConfig { + min_delay_millis: RETRY_MIN_DELAY_MILLIS, + factor: RETRY_FACTOR, + max_times: RETRY_MAX_TIMES, + max_delay_seconds: RETRY_MAX_DELAY_SECONDS, + }, + ) + .map_err(|e| DbError::Creation(e.to_string()))?; - Ok(Self { pool }) + Ok(Self { orchestrator }) } pub async fn get_pending_tasks_and_mark_them_as_processing( - &self, + &mut self, proving_system_id: i32, limit: i64, ) -> Result, DbError> { - sqlx::query_as::<_, Task>( - "WITH selected AS ( - SELECT task_id - FROM tasks - WHERE proving_system_id = $1 AND status = 'pending' - LIMIT $2 - FOR UPDATE SKIP LOCKED + self.orchestrator + .write(async |pool| { + sqlx::query_as::<_, Task>( + "WITH selected AS ( + SELECT task_id + FROM tasks + WHERE proving_system_id = $1 AND status = 'pending' + LIMIT $2 + FOR UPDATE SKIP LOCKED + ) + UPDATE tasks t + SET status = 'processing' + FROM selected s + WHERE t.task_id = s.task_id + RETURNING t.*;", ) - UPDATE tasks t - SET status = 'processing' - FROM selected s - WHERE t.task_id = s.task_id - RETURNING t.*;", - ) - .bind(proving_system_id) - .bind(limit) - .fetch_all(&self.pool) - .await - .map_err(|e| DbError::Query(e.to_string())) + .bind(proving_system_id) + .bind(limit) + .fetch_all(&pool) + .await + }) + .await + .map_err(|e| DbError::Query(e.to_string())) } pub async fn insert_tasks_merkle_path_and_mark_them_as_verified( - &self, + &mut self, updates: Vec<(Uuid, Vec)>, ) -> Result<(), DbError> { - let mut tx = self - .pool - .begin() - .await - .map_err(|e| DbError::Query(e.to_string()))?; + let updates_ref = &updates; - for (task_id, merkle_path) in updates { - if let Err(e) = sqlx::query( - "UPDATE tasks SET merkle_path = $1, status = 'verified', proof = NULL WHERE task_id = $2", - ) - .bind(merkle_path) - .bind(task_id) - .execute(&mut *tx) - .await - { - tx.rollback() - .await - .map_err(|e| DbError::Query(e.to_string()))?; - tracing::error!("Error while updating task merkle path and status {}", e); - return Err(DbError::Query(e.to_string())); - } - } + self.orchestrator + 
.write(|pool| { + let updates = updates_ref; + async move { + let mut tx = pool.begin().await?; + + for (task_id, merkle_path) in updates.iter() { + if let Err(e) = sqlx::query( + "UPDATE tasks SET merkle_path = $1, status = 'verified', proof = NULL WHERE task_id = $2", + ) + .bind(merkle_path.as_slice()) + .bind(*task_id) + .execute(&mut *tx) + .await { + tracing::error!("Error while updating task merkle path and status {}", e); + return Err(e); + }; + } - tx.commit() + tx.commit().await?; + Ok(()) + } + }) .await .map_err(|e| DbError::Query(e.to_string()))?; diff --git a/aggregation_mode/proof_aggregator/src/backend/fetcher.rs b/aggregation_mode/proof_aggregator/src/backend/fetcher.rs index 5ed1df6dec..9bb0ff7877 100644 --- a/aggregation_mode/proof_aggregator/src/backend/fetcher.rs +++ b/aggregation_mode/proof_aggregator/src/backend/fetcher.rs @@ -24,7 +24,7 @@ impl ProofsFetcher { } pub async fn fetch_pending_proofs( - &self, + &mut self, engine: ZKVMEngine, limit: i64, ) -> Result<(Vec, Vec), ProofsFetcherError> { diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index 9dee2164c3..44ff356653 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -85,7 +85,7 @@ impl ProofAggregator { let engine = ZKVMEngine::from_env().expect("AGGREGATOR env variable to be set to one of sp1|risc0"); - let db = Db::try_new(&config.db_connection_url) + let db = Db::try_new(&[&config.db_connection_url]) .await .expect("To connect to db"); From 67e8d052b9d463f2937fae50c81e2d38eee5fa68 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Thu, 18 Dec 2025 18:55:05 -0300 Subject: [PATCH 06/14] feat: use db orchestrator in gateway + remove mutable reference to self --- aggregation_mode/Cargo.lock | 2 + aggregation_mode/db/Cargo.toml | 1 + aggregation_mode/db/src/orchestrator.rs | 12 +- aggregation_mode/db/src/types.rs | 2 +- aggregation_mode/gateway/Cargo.toml | 1 + aggregation_mode/gateway/src/db.rs | 212 +++++++++++++----------- aggregation_mode/gateway/src/http.rs | 1 + aggregation_mode/gateway/src/types.rs | 11 +- 8 files changed, 134 insertions(+), 108 deletions(-) diff --git a/aggregation_mode/Cargo.lock b/aggregation_mode/Cargo.lock index 69d3b81a41..5f8a6a0e46 100644 --- a/aggregation_mode/Cargo.lock +++ b/aggregation_mode/Cargo.lock @@ -3263,6 +3263,7 @@ name = "db" version = "0.1.0" dependencies = [ "backon", + "serde", "sqlx", "tokio", "tracing", @@ -4497,6 +4498,7 @@ dependencies = [ "aligned-sdk", "alloy", "bincode", + "db", "hex", "serde", "serde_json", diff --git a/aggregation_mode/db/Cargo.toml b/aggregation_mode/db/Cargo.toml index 539a368bf0..288ce5083e 100644 --- a/aggregation_mode/db/Cargo.toml +++ b/aggregation_mode/db/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +serde = { workspace = true } tokio = { version = "1"} sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] } backon = "1.2.0" diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs index 93cddb6fef..fd8d9819f0 100644 --- a/aggregation_mode/db/src/orchestrator.rs +++ b/aggregation_mode/db/src/orchestrator.rs @@ -93,7 +93,7 @@ impl DbOrchestartor { }) } - pub async fn write(&mut self, query: Q) -> Result + pub async fn write(&self, query: Q) -> Result where Q: Fn(Pool) -> Fut, Fut: Future>, @@ -101,7 +101,7 @@ impl DbOrchestartor { self.query::(query, Operation::Write).await } - pub async fn 
read(&mut self, query: Q) -> Result + pub async fn read(&self, query: Q) -> Result where Q: Fn(Pool) -> Fut, Fut: Future>, @@ -109,11 +109,7 @@ impl DbOrchestartor { self.query::(query, Operation::Read).await } - async fn query( - &mut self, - query_fn: Q, - operation: Operation, - ) -> Result + async fn query(&self, query_fn: Q, operation: Operation) -> Result where Q: Fn(Pool) -> Fut, Fut: Future>, @@ -151,7 +147,7 @@ impl DbOrchestartor { } async fn execute_once( - &mut self, + &self, query_fn: &Q, operation: Operation, ) -> Result> diff --git a/aggregation_mode/db/src/types.rs b/aggregation_mode/db/src/types.rs index 676a780a48..6b186e6cf8 100644 --- a/aggregation_mode/db/src/types.rs +++ b/aggregation_mode/db/src/types.rs @@ -4,7 +4,7 @@ use sqlx::{ Type, }; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Type)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Type, serde::Serialize)] #[sqlx(type_name = "task_status", rename_all = "lowercase")] pub enum TaskStatus { Pending, diff --git a/aggregation_mode/gateway/Cargo.toml b/aggregation_mode/gateway/Cargo.toml index 79ef53351c..927b11bc91 100644 --- a/aggregation_mode/gateway/Cargo.toml +++ b/aggregation_mode/gateway/Cargo.toml @@ -10,6 +10,7 @@ serde_yaml = { workspace = true } agg_mode_sdk = { path = "../sdk"} aligned-sdk = { workspace = true } sp1-sdk = { workspace = true } +db = { workspace = true } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } bincode = "1.3.3" diff --git a/aggregation_mode/gateway/src/db.rs b/aggregation_mode/gateway/src/db.rs index 94809f2342..692c589a32 100644 --- a/aggregation_mode/gateway/src/db.rs +++ b/aggregation_mode/gateway/src/db.rs @@ -1,12 +1,11 @@ -use sqlx::{ - postgres::PgPoolOptions, - types::{BigDecimal, Uuid}, - Pool, Postgres, -}; +use db::{orchestrator::DbOrchestartor, retry::RetryConfig}; +use sqlx::types::{BigDecimal, Uuid}; + +use crate::types::Receipt; #[derive(Clone, Debug)] pub struct Db { - pool: Pool, + orchestrator: DbOrchestartor, } #[derive(Debug, Clone)] @@ -14,52 +13,51 @@ pub enum DbError { ConnectError(String), } -#[derive(Debug, Clone, sqlx::Type, serde::Serialize)] -#[sqlx(type_name = "task_status")] -#[sqlx(rename_all = "lowercase")] -pub enum TaskStatus { - Pending, - Processing, - Verified, -} - -#[derive(Debug, Clone, sqlx::FromRow, sqlx::Type, serde::Serialize)] -pub struct Receipt { - pub status: TaskStatus, - pub merkle_path: Option>, - pub nonce: i64, - pub address: String, -} - impl Db { - pub async fn try_new(connection_url: &str) -> Result { - let pool = PgPoolOptions::new() - .max_connections(5) - .connect(connection_url) - .await - .map_err(|e| DbError::ConnectError(e.to_string()))?; + pub async fn try_new(connection_urls: &[&str]) -> Result { + let orchestrator = DbOrchestartor::try_new( + connection_urls, + RetryConfig { + factor: 0.0, + max_delay_seconds: 0, + max_times: 0, + min_delay_millis: 0, + }, + ) + .map_err(|e| DbError::ConnectError(e.to_string()))?; - Ok(Self { pool }) + Ok(Self { orchestrator }) } pub async fn count_tasks_by_address(&self, address: &str) -> Result { - let (count,) = sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM tasks WHERE address = $1") - .bind(address.to_lowercase()) - .fetch_one(&self.pool) - .await?; + self.orchestrator + .read(async |pool| { + let (count,) = + sqlx::query_as::<_, (i64,)>("SELECT COUNT(*) FROM tasks WHERE address = $1") + .bind(address.to_lowercase()) + .fetch_one(&pool) + .await?; - Ok(count) + Ok(count) + }) + .await } pub async fn 
get_merkle_path_by_task_id( &self, task_id: Uuid, ) -> Result>, sqlx::Error> { - sqlx::query_scalar::<_, Option>>("SELECT merkle_path FROM tasks WHERE task_id = $1") - .bind(task_id) - .fetch_optional(&self.pool) + self.orchestrator + .read(async |pool| { + sqlx::query_scalar::<_, Option>>( + "SELECT merkle_path FROM tasks WHERE task_id = $1", + ) + .bind(task_id) + .fetch_optional(&pool) + .await + .map(|res| res.flatten()) + }) .await - .map(|res| res.flatten()) } pub async fn get_tasks_by_address_and_nonce( @@ -67,16 +65,20 @@ impl Db { address: &str, nonce: i64, ) -> Result, sqlx::Error> { - sqlx::query_as::<_, Receipt>( - "SELECT status,merkle_path,nonce,address FROM tasks - WHERE address = $1 - AND nonce = $2 - ORDER BY nonce DESC", - ) - .bind(address.to_lowercase()) - .bind(nonce) - .fetch_all(&self.pool) - .await + self.orchestrator + .read(async |pool| { + sqlx::query_as::<_, Receipt>( + "SELECT status,merkle_path,nonce,address FROM tasks + WHERE address = $1 + AND nonce = $2 + ORDER BY nonce DESC", + ) + .bind(address.to_lowercase()) + .bind(nonce) + .fetch_all(&pool) + .await + }) + .await } pub async fn get_tasks_by_address_with_limit( @@ -84,28 +86,36 @@ impl Db { address: &str, limit: i64, ) -> Result, sqlx::Error> { - sqlx::query_as::<_, Receipt>( - "SELECT status,merkle_path,nonce,address FROM tasks - WHERE address = $1 - ORDER BY nonce DESC - LIMIT $2", - ) - .bind(address.to_lowercase()) - .bind(limit) - .fetch_all(&self.pool) - .await + self.orchestrator + .read(async |pool| { + sqlx::query_as::<_, Receipt>( + "SELECT status,merkle_path,nonce,address FROM tasks + WHERE address = $1 + ORDER BY nonce DESC + LIMIT $2", + ) + .bind(address.to_lowercase()) + .bind(limit) + .fetch_all(&pool) + .await + }) + .await } pub async fn get_daily_tasks_by_address(&self, address: &str) -> Result { - sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) - FROM tasks - WHERE address = $1 - AND inserted_at::date = CURRENT_DATE", - ) - .bind(address.to_lowercase()) - .fetch_one(&self.pool) - .await + self.orchestrator + .read(async |pool| { + sqlx::query_scalar::<_, i64>( + "SELECT COUNT(*) + FROM tasks + WHERE address = $1 + AND inserted_at::date = CURRENT_DATE", + ) + .bind(address.to_lowercase()) + .fetch_one(&pool) + .await + }) + .await } pub async fn insert_task( @@ -117,25 +127,29 @@ impl Db { merkle_path: Option<&[u8]>, nonce: i64, ) -> Result { - sqlx::query_scalar::<_, Uuid>( - "INSERT INTO tasks ( - address, - proving_system_id, - proof, - program_commitment, - merkle_path, - nonce - ) VALUES ($1, $2, $3, $4, $5, $6) - RETURNING task_id", - ) - .bind(address.to_lowercase()) - .bind(proving_system_id) - .bind(proof) - .bind(program_commitment) - .bind(merkle_path) - .bind(nonce) - .fetch_one(&self.pool) - .await + self.orchestrator + .write(async |pool| { + sqlx::query_scalar::<_, Uuid>( + "INSERT INTO tasks ( + address, + proving_system_id, + proof, + program_commitment, + merkle_path, + nonce + ) VALUES ($1, $2, $3, $4, $5, $6) + RETURNING task_id", + ) + .bind(address.to_lowercase()) + .bind(proving_system_id) + .bind(proof) + .bind(program_commitment) + .bind(merkle_path) + .bind(nonce) + .fetch_one(&pool) + .await + }) + .await } pub async fn has_active_payment_event( @@ -143,15 +157,19 @@ impl Db { address: &str, epoch: BigDecimal, ) -> Result { - sqlx::query_scalar::<_, bool>( - "SELECT EXISTS ( - SELECT 1 FROM payment_events - WHERE address = $1 AND started_at < $2 AND $2 < valid_until - )", - ) - .bind(address.to_lowercase()) - .bind(epoch) - .fetch_one(&self.pool) - .await 
+ self.orchestrator + .read(async |pool| { + sqlx::query_scalar::<_, bool>( + "SELECT EXISTS ( + SELECT 1 FROM payment_events + WHERE address = $1 AND started_at < $2 AND $2 < valid_until + )", + ) + .bind(address.to_lowercase()) + .bind(&epoch) + .fetch_one(&pool) + .await + }) + .await } } diff --git a/aggregation_mode/gateway/src/http.rs b/aggregation_mode/gateway/src/http.rs index 2b63e78697..d72d0d19bc 100644 --- a/aggregation_mode/gateway/src/http.rs +++ b/aggregation_mode/gateway/src/http.rs @@ -84,6 +84,7 @@ impl GatewayServer { .json(AppResponse::new_unsucessfull("Internal server error", 500)); }; + // TODO: how to fix the mutable thing let state = state.get_ref(); match state.db.count_tasks_by_address(&address).await { Ok(count) => HttpResponse::Ok().json(AppResponse::new_sucessfull(serde_json::json!( diff --git a/aggregation_mode/gateway/src/types.rs b/aggregation_mode/gateway/src/types.rs index 7826645ab1..d89966f3e9 100644 --- a/aggregation_mode/gateway/src/types.rs +++ b/aggregation_mode/gateway/src/types.rs @@ -1,9 +1,8 @@ use actix_multipart::form::{tempfile::TempFile, text::Text, MultipartForm}; +use db::types::TaskStatus; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::db::TaskStatus; - #[derive(Serialize, Deserialize)] pub(super) struct AppResponse { status: u16, @@ -62,3 +61,11 @@ pub struct GetReceiptsResponse { pub nonce: i64, pub address: String, } + +#[derive(Debug, Clone, sqlx::FromRow, sqlx::Type, serde::Serialize)] +pub struct Receipt { + pub status: TaskStatus, + pub merkle_path: Option>, + pub nonce: i64, + pub address: String, +} From 440dc33c29395437017ecb75bf2d266df58d7832 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Fri, 19 Dec 2025 10:46:33 -0300 Subject: [PATCH 07/14] fix: build of aggregation-mode gateway --- aggregation_mode/gateway/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aggregation_mode/gateway/src/main.rs b/aggregation_mode/gateway/src/main.rs index 19e6ce05fa..9063aebec2 100644 --- a/aggregation_mode/gateway/src/main.rs +++ b/aggregation_mode/gateway/src/main.rs @@ -26,7 +26,7 @@ async fn main() { let config = Config::from_file(&config_file_path).expect("Config is valid"); tracing::info!("Config loaded"); - let db = Db::try_new(&config.db_connection_url) + let db = Db::try_new(&[config.db_connection_url.as_str()]) .await .expect("db to start"); From d472330d85d07b74c1d7fa1ab65abc130f418b34 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Fri, 19 Dec 2025 11:31:37 -0300 Subject: [PATCH 08/14] feat: orchestrator for payment poller --- aggregation_mode/db/src/orchestrator.rs | 2 +- aggregation_mode/payments_poller/Cargo.toml | 1 + aggregation_mode/payments_poller/src/db.rs | 55 ++++++++++++-------- aggregation_mode/payments_poller/src/main.rs | 2 +- 4 files changed, 36 insertions(+), 24 deletions(-) diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs index fd8d9819f0..117a5d9fa6 100644 --- a/aggregation_mode/db/src/orchestrator.rs +++ b/aggregation_mode/db/src/orchestrator.rs @@ -74,7 +74,7 @@ impl DbOrchestartor { } let nodes = connection_urls - .into_iter() + .iter() .map(|url| { let pool = PgPoolOptions::new().max_connections(5).connect_lazy(url)?; diff --git a/aggregation_mode/payments_poller/Cargo.toml b/aggregation_mode/payments_poller/Cargo.toml index c145fd02e5..7ed53ff805 100644 --- a/aggregation_mode/payments_poller/Cargo.toml +++ b/aggregation_mode/payments_poller/Cargo.toml @@ -8,6 +8,7 @@ serde = { workspace = true } 
serde_json = { workspace = true } serde_yaml = { workspace = true } aligned-sdk = { workspace = true } +db = { workspace = true } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3.0", features = ["env-filter"] } actix-web = "4" diff --git a/aggregation_mode/payments_poller/src/db.rs b/aggregation_mode/payments_poller/src/db.rs index 5403b1b237..d36df71513 100644 --- a/aggregation_mode/payments_poller/src/db.rs +++ b/aggregation_mode/payments_poller/src/db.rs @@ -1,8 +1,9 @@ -use sqlx::{postgres::PgPoolOptions, types::BigDecimal, Pool, Postgres}; +use db::{orchestrator::DbOrchestartor, retry::RetryConfig}; +use sqlx::types::BigDecimal; #[derive(Clone, Debug)] pub struct Db { - pool: Pool, + orchestartor: DbOrchestartor, } #[derive(Debug, Clone)] @@ -11,14 +12,19 @@ pub enum DbError { } impl Db { - pub async fn try_new(connection_url: &str) -> Result { - let pool = PgPoolOptions::new() - .max_connections(5) - .connect(connection_url) - .await - .map_err(|e| DbError::ConnectError(e.to_string()))?; + pub async fn try_new(connection_urls: &[&str]) -> Result { + let orchestartor = DbOrchestartor::try_new( + connection_urls, + RetryConfig { + factor: 0.0, + max_delay_seconds: 0, + max_times: 0, + min_delay_millis: 0, + }, + ) + .map_err(|e| DbError::ConnectError(e.to_string()))?; - Ok(Self { pool }) + Ok(Self { orchestartor }) } pub async fn insert_payment_event( @@ -29,18 +35,23 @@ impl Db { valid_until: &BigDecimal, tx_hash: &str, ) -> Result<(), sqlx::Error> { - sqlx::query( - "INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash) - VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (tx_hash) DO NOTHING", - ) - .bind(address.to_lowercase()) - .bind(started_at) - .bind(amount) - .bind(valid_until) - .bind(tx_hash) - .execute(&self.pool) - .await - .map(|_| ()) + self.orchestartor + .write(async |pool| { + sqlx::query( + "INSERT INTO payment_events (address, started_at, amount, valid_until, tx_hash) + VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (tx_hash) DO NOTHING", + ) + .bind(address.to_lowercase()) + .bind(started_at) + .bind(amount) + .bind(valid_until) + .bind(tx_hash) + .execute(&pool) + .await?; + + Ok(()) + }) + .await } } diff --git a/aggregation_mode/payments_poller/src/main.rs b/aggregation_mode/payments_poller/src/main.rs index 2bbfea6976..6a73655ad8 100644 --- a/aggregation_mode/payments_poller/src/main.rs +++ b/aggregation_mode/payments_poller/src/main.rs @@ -26,7 +26,7 @@ async fn main() { let config = Config::from_file(&config_file_path).expect("Config is valid"); tracing::info!("Config loaded"); - let db = Db::try_new(&config.db_connection_url) + let db = Db::try_new(&[config.db_connection_url.as_str()]) .await .expect("db to start"); From 51085bda817c7269ff72407cdc6df5ee0d1d7824 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Fri, 19 Dec 2025 12:13:53 -0300 Subject: [PATCH 09/14] chore: read connection urls vector from config --- aggregation_mode/Cargo.lock | 1 + aggregation_mode/db/src/orchestrator.rs | 2 +- aggregation_mode/gateway/src/config.rs | 2 +- aggregation_mode/gateway/src/db.rs | 2 +- aggregation_mode/gateway/src/main.rs | 2 +- aggregation_mode/payments_poller/src/config.rs | 2 +- aggregation_mode/payments_poller/src/db.rs | 2 +- aggregation_mode/payments_poller/src/main.rs | 2 +- aggregation_mode/proof_aggregator/src/backend/config.rs | 2 +- aggregation_mode/proof_aggregator/src/backend/db.rs | 2 +- aggregation_mode/proof_aggregator/src/backend/mod.rs | 2 +- 
config-files/config-agg-mode-gateway-ethereum-package.yaml | 3 ++- config-files/config-agg-mode-gateway.yaml | 3 ++- config-files/config-proof-aggregator-ethereum-package.yaml | 3 ++- config-files/config-proof-aggregator.yaml | 3 ++- 15 files changed, 19 insertions(+), 14 deletions(-) diff --git a/aggregation_mode/Cargo.lock b/aggregation_mode/Cargo.lock index 5f8a6a0e46..20fa165a5d 100644 --- a/aggregation_mode/Cargo.lock +++ b/aggregation_mode/Cargo.lock @@ -6804,6 +6804,7 @@ dependencies = [ "actix-web", "aligned-sdk", "alloy", + "db", "hex", "serde", "serde_json", diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs index 117a5d9fa6..a266148a64 100644 --- a/aggregation_mode/db/src/orchestrator.rs +++ b/aggregation_mode/db/src/orchestrator.rs @@ -66,7 +66,7 @@ impl std::fmt::Display for DbOrchestartorError { impl DbOrchestartor { pub fn try_new( - connection_urls: &[&str], + connection_urls: &[String], retry_config: RetryConfig, ) -> Result { if connection_urls.is_empty() { diff --git a/aggregation_mode/gateway/src/config.rs b/aggregation_mode/gateway/src/config.rs index 444882ab56..8c39766712 100644 --- a/aggregation_mode/gateway/src/config.rs +++ b/aggregation_mode/gateway/src/config.rs @@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Config { pub port: u16, - pub db_connection_url: String, + pub db_connection_urls: Vec, pub network: String, pub max_daily_proofs_per_user: i64, } diff --git a/aggregation_mode/gateway/src/db.rs b/aggregation_mode/gateway/src/db.rs index 692c589a32..cca6d6c8a7 100644 --- a/aggregation_mode/gateway/src/db.rs +++ b/aggregation_mode/gateway/src/db.rs @@ -14,7 +14,7 @@ pub enum DbError { } impl Db { - pub async fn try_new(connection_urls: &[&str]) -> Result { + pub async fn try_new(connection_urls: &[String]) -> Result { let orchestrator = DbOrchestartor::try_new( connection_urls, RetryConfig { diff --git a/aggregation_mode/gateway/src/main.rs b/aggregation_mode/gateway/src/main.rs index 9063aebec2..b017936ae5 100644 --- a/aggregation_mode/gateway/src/main.rs +++ b/aggregation_mode/gateway/src/main.rs @@ -26,7 +26,7 @@ async fn main() { let config = Config::from_file(&config_file_path).expect("Config is valid"); tracing::info!("Config loaded"); - let db = Db::try_new(&[config.db_connection_url.as_str()]) + let db = Db::try_new(config.db_connection_urls.as_slice()) .await .expect("db to start"); diff --git a/aggregation_mode/payments_poller/src/config.rs b/aggregation_mode/payments_poller/src/config.rs index 5ed4a26d37..1fd15ed988 100644 --- a/aggregation_mode/payments_poller/src/config.rs +++ b/aggregation_mode/payments_poller/src/config.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Config { - pub db_connection_url: String, + pub db_connection_urls: Vec, pub eth_rpc_url: String, pub payment_service_address: String, } diff --git a/aggregation_mode/payments_poller/src/db.rs b/aggregation_mode/payments_poller/src/db.rs index d36df71513..6c611ba204 100644 --- a/aggregation_mode/payments_poller/src/db.rs +++ b/aggregation_mode/payments_poller/src/db.rs @@ -12,7 +12,7 @@ pub enum DbError { } impl Db { - pub async fn try_new(connection_urls: &[&str]) -> Result { + pub async fn try_new(connection_urls: &[String]) -> Result { let orchestartor = DbOrchestartor::try_new( connection_urls, RetryConfig { diff --git a/aggregation_mode/payments_poller/src/main.rs 
b/aggregation_mode/payments_poller/src/main.rs index 6a73655ad8..d24b26d5b1 100644 --- a/aggregation_mode/payments_poller/src/main.rs +++ b/aggregation_mode/payments_poller/src/main.rs @@ -26,7 +26,7 @@ async fn main() { let config = Config::from_file(&config_file_path).expect("Config is valid"); tracing::info!("Config loaded"); - let db = Db::try_new(&[config.db_connection_url.as_str()]) + let db = Db::try_new(&config.db_connection_urls.as_slice()) .await .expect("db to start"); diff --git a/aggregation_mode/proof_aggregator/src/backend/config.rs b/aggregation_mode/proof_aggregator/src/backend/config.rs index 8126db06cc..41081afc74 100644 --- a/aggregation_mode/proof_aggregator/src/backend/config.rs +++ b/aggregation_mode/proof_aggregator/src/backend/config.rs @@ -20,7 +20,7 @@ pub struct Config { pub risc0_chunk_aggregator_image_id: String, pub sp1_chunk_aggregator_vk_hash: String, pub monthly_budget_eth: f64, - pub db_connection_url: String, + pub db_connection_urls: Vec, } impl Config { diff --git a/aggregation_mode/proof_aggregator/src/backend/db.rs b/aggregation_mode/proof_aggregator/src/backend/db.rs index 63d8b19fb0..0dad14c4fe 100644 --- a/aggregation_mode/proof_aggregator/src/backend/db.rs +++ b/aggregation_mode/proof_aggregator/src/backend/db.rs @@ -23,7 +23,7 @@ pub enum DbError { } impl Db { - pub async fn try_new(connection_urls: &[&str]) -> Result { + pub async fn try_new(connection_urls: &[String]) -> Result { let orchestrator = DbOrchestartor::try_new( connection_urls, RetryConfig { diff --git a/aggregation_mode/proof_aggregator/src/backend/mod.rs b/aggregation_mode/proof_aggregator/src/backend/mod.rs index 44ff356653..8b0e28d4fc 100644 --- a/aggregation_mode/proof_aggregator/src/backend/mod.rs +++ b/aggregation_mode/proof_aggregator/src/backend/mod.rs @@ -85,7 +85,7 @@ impl ProofAggregator { let engine = ZKVMEngine::from_env().expect("AGGREGATOR env variable to be set to one of sp1|risc0"); - let db = Db::try_new(&[&config.db_connection_url]) + let db = Db::try_new(config.db_connection_urls.as_slice()) .await .expect("To connect to db"); diff --git a/config-files/config-agg-mode-gateway-ethereum-package.yaml b/config-files/config-agg-mode-gateway-ethereum-package.yaml index 21a8438268..caeae01b14 100644 --- a/config-files/config-agg-mode-gateway-ethereum-package.yaml +++ b/config-files/config-agg-mode-gateway-ethereum-package.yaml @@ -1,5 +1,6 @@ port: 8089 -db_connection_url: "postgres://postgres:postgres@localhost:5435/" +db_connection_urls: + - "postgres://postgres:postgres@localhost:5435/" eth_rpc_url: "http://localhost:8545" payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe" network: "devnet" diff --git a/config-files/config-agg-mode-gateway.yaml b/config-files/config-agg-mode-gateway.yaml index ea30755e23..9b222994da 100644 --- a/config-files/config-agg-mode-gateway.yaml +++ b/config-files/config-agg-mode-gateway.yaml @@ -1,5 +1,6 @@ port: 8089 -db_connection_url: "postgres://postgres:postgres@localhost:5435/" +db_connection_urls: + - "postgres://postgres:postgres@localhost:5435/" eth_rpc_url: "http://localhost:8545" payment_service_address: "0x922D6956C99E12DFeB3224DEA977D0939758A1Fe" network: "devnet" diff --git a/config-files/config-proof-aggregator-ethereum-package.yaml b/config-files/config-proof-aggregator-ethereum-package.yaml index a20dad54b8..23034743c6 100644 --- a/config-files/config-proof-aggregator-ethereum-package.yaml +++ b/config-files/config-proof-aggregator-ethereum-package.yaml @@ -12,7 +12,8 @@ proofs_per_chunk: 512 # Amount of 
proofs to process per chunk # Since each proof commitments takes 32 bytes hash # We can aggregate as much proofs as 126.976 / 32 = 3968 per blob total_proofs_limit: 3968 -db_connection_url: "postgres://postgres:postgres@localhost:5435/" +db_connection_urls: + - "postgres://postgres:postgres@localhost:5435/" # Monthly ETH budget for on-chain proof verification. This value must be non-negative. # This value will be used to calculate how much gas we can spend per second to send proofs on-chain. diff --git a/config-files/config-proof-aggregator.yaml b/config-files/config-proof-aggregator.yaml index 9fc292f041..61a4f982a4 100644 --- a/config-files/config-proof-aggregator.yaml +++ b/config-files/config-proof-aggregator.yaml @@ -12,7 +12,8 @@ proofs_per_chunk: 512 # Amount of proofs to process per chunk # Since each proof commitments takes 32 bytes hash # We can aggregate as much proofs as 126.976 / 32 = 3968 per blob total_proofs_limit: 3968 -db_connection_url: "postgres://postgres:postgres@localhost:5435/" +db_connection_urls: + - "postgres://postgres:postgres@localhost:5435/" # Monthly ETH budget for on-chain proof verification. This value must be non-negative. # This value will be used to calculate how much gas we can spend per second to send proofs on-chain. From eb7870621f4b8bd4dd0f2f31daff7ee80c42d64c Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Fri, 19 Dec 2025 14:48:12 -0300 Subject: [PATCH 10/14] chore: address clippy warnings --- aggregation_mode/db/src/orchestrator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs index a266148a64..86599e9fc0 100644 --- a/aggregation_mode/db/src/orchestrator.rs +++ b/aggregation_mode/db/src/orchestrator.rs @@ -85,7 +85,7 @@ impl DbOrchestartor { })) }) .collect::, sqlx::Error>>() - .map_err(|e| DbOrchestartorError::Sqlx(e))?; + .map_err(DbOrchestartorError::Sqlx)?; Ok(Self { nodes, From 2d4b876ec7d10dd802970bf264491352b70ae3ce Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Fri, 19 Dec 2025 16:10:12 -0300 Subject: [PATCH 11/14] chore: address more clippy warnings --- aggregation_mode/payments_poller/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aggregation_mode/payments_poller/src/main.rs b/aggregation_mode/payments_poller/src/main.rs index d24b26d5b1..a6f21d0c65 100644 --- a/aggregation_mode/payments_poller/src/main.rs +++ b/aggregation_mode/payments_poller/src/main.rs @@ -26,7 +26,7 @@ async fn main() { let config = Config::from_file(&config_file_path).expect("Config is valid"); tracing::info!("Config loaded"); - let db = Db::try_new(&config.db_connection_urls.as_slice()) + let db = Db::try_new(config.db_connection_urls.as_slice()) .await .expect("db to start"); From 34b2ece2cf96ba97f4c37df3805133536966e134 Mon Sep 17 00:00:00 2001 From: Marcos Nicolau Date: Mon, 22 Dec 2025 13:01:04 -0300 Subject: [PATCH 12/14] refactor: comment backoff delays remove stale todos and set initial retry params --- aggregation_mode/Cargo.lock | 1 - aggregation_mode/db/Cargo.toml | 3 +- aggregation_mode/db/src/orchestrator.rs | 34 ++++++++++++++++++---- aggregation_mode/gateway/src/db.rs | 19 ++++++++---- aggregation_mode/gateway/src/http.rs | 1 - aggregation_mode/payments_poller/src/db.rs | 18 +++++++++--- 6 files changed, 58 insertions(+), 18 deletions(-) diff --git a/aggregation_mode/Cargo.lock b/aggregation_mode/Cargo.lock index 20fa165a5d..f709785cd1 100644 --- a/aggregation_mode/Cargo.lock +++ b/aggregation_mode/Cargo.lock 
@@ -3262,7 +3262,6 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" name = "db" version = "0.1.0" dependencies = [ - "backon", "serde", "sqlx", "tokio", diff --git a/aggregation_mode/db/Cargo.toml b/aggregation_mode/db/Cargo.toml index 288ce5083e..3b5ad05f75 100644 --- a/aggregation_mode/db/Cargo.toml +++ b/aggregation_mode/db/Cargo.toml @@ -6,8 +6,7 @@ edition = "2021" [dependencies] serde = { workspace = true } tokio = { version = "1"} -sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] } -backon = "1.2.0" +sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" , "uuid", "bigdecimal"] } tracing = { version = "0.1", features = ["log"] } [[bin]] diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs index 86599e9fc0..e611982b38 100644 --- a/aggregation_mode/db/src/orchestrator.rs +++ b/aggregation_mode/db/src/orchestrator.rs @@ -18,7 +18,6 @@ enum Operation { } /// A single DB node: connection pool plus shared health flags (used to prioritize nodes). - #[derive(Debug)] struct DbNode { pool: Pool, @@ -128,16 +127,41 @@ impl DbOrchestartor { tracing::warn!(attempt = attempts, delay_milis = delay.as_millis(), error = ?err, "retrying after backoff"); tokio::time::sleep(delay).await; - delay = self.backoff_delay(delay); + delay = self.next_backoff_delay(delay); attempts += 1; } } } } - fn backoff_delay(&self, current: Duration) -> Duration { - let max = Duration::from_secs(self.retry_config.max_delay_seconds); - let scaled_secs = current.as_secs_f64() * f64::from(self.retry_config.factor); + // Exponential backoff with a hard cap. + // + // Each retry multiplies the previous delay by `retry_config.factor`, + // then clamps it to `max_delay_seconds`. This yields: + // + // d_{n+1} = min(max, d_n * factor) => d_n = min(max, d_initial * factor^n) + // + // Example starting at 500ms with factor = 2.0 (no jitter): + // retry 0: 0.5s + // retry 1: 1.0s + // retry 2: 2.0s + // retry 3: 4.0s + // retry 4: 8.0s + // ... + // until the delay reaches `max_delay_seconds`, after which it stays at that max. + // see reference: https://en.wikipedia.org/wiki/Exponential_backoff + fn next_backoff_delay(&self, current_delay: Duration) -> Duration { + let max: Duration = Duration::from_secs(self.retry_config.max_delay_seconds); + // Defensive: factor should be >= 1.0 for backoff, we clamp it to avoid shrinking/NaN. 
+        let factor = f64::from(self.retry_config.factor).max(1.0);
+
+        let scaled_secs = current_delay.as_secs_f64() * factor;
+        let scaled_secs = if scaled_secs.is_finite() {
+            scaled_secs
+        } else {
+            max.as_secs_f64()
+        };
+
         let scaled = Duration::from_secs_f64(scaled_secs);
         if scaled > max {
             max
diff --git a/aggregation_mode/gateway/src/db.rs b/aggregation_mode/gateway/src/db.rs
index cca6d6c8a7..59f315e066 100644
--- a/aggregation_mode/gateway/src/db.rs
+++ b/aggregation_mode/gateway/src/db.rs
@@ -1,7 +1,16 @@
+use crate::types::Receipt;
 use db::{orchestrator::DbOrchestartor, retry::RetryConfig};
 use sqlx::types::{BigDecimal, Uuid};
 
-use crate::types::Receipt;
+// Retry parameters for Db queries
+/// Initial delay before first retry attempt (in milliseconds)
+const RETRY_MIN_DELAY_MILLIS: u64 = 500;
+/// Exponential backoff multiplier for retry delays
+const RETRY_FACTOR: f32 = 2.0;
+/// Maximum number of retry attempts
+const RETRY_MAX_TIMES: usize = 4;
+/// Maximum delay between retry attempts (in seconds)
+const RETRY_MAX_DELAY_SECONDS: u64 = 10;
 
 #[derive(Clone, Debug)]
 pub struct Db {
@@ -18,10 +27,10 @@ impl Db {
         let orchestrator = DbOrchestartor::try_new(
             connection_urls,
             RetryConfig {
-                factor: 0.0,
-                max_delay_seconds: 0,
-                max_times: 0,
-                min_delay_millis: 0,
+                min_delay_millis: RETRY_MIN_DELAY_MILLIS,
+                factor: RETRY_FACTOR,
+                max_times: RETRY_MAX_TIMES,
+                max_delay_seconds: RETRY_MAX_DELAY_SECONDS,
             },
         )
         .map_err(|e| DbError::ConnectError(e.to_string()))?;
diff --git a/aggregation_mode/gateway/src/http.rs b/aggregation_mode/gateway/src/http.rs
index d72d0d19bc..2b63e78697 100644
--- a/aggregation_mode/gateway/src/http.rs
+++ b/aggregation_mode/gateway/src/http.rs
@@ -84,7 +84,6 @@ impl GatewayServer {
                     .json(AppResponse::new_unsucessfull("Internal server error", 500));
             };
 
-            // TODO: how to fix the mutable thing
             let state = state.get_ref();
             match state.db.count_tasks_by_address(&address).await {
                 Ok(count) => HttpResponse::Ok().json(AppResponse::new_sucessfull(serde_json::json!(
diff --git a/aggregation_mode/payments_poller/src/db.rs b/aggregation_mode/payments_poller/src/db.rs
index 6c611ba204..b3511693a9 100644
--- a/aggregation_mode/payments_poller/src/db.rs
+++ b/aggregation_mode/payments_poller/src/db.rs
@@ -1,6 +1,16 @@
 use db::{orchestrator::DbOrchestartor, retry::RetryConfig};
 use sqlx::types::BigDecimal;
 
+// Retry parameters for Db queries
+/// Initial delay before first retry attempt (in milliseconds)
+const RETRY_MIN_DELAY_MILLIS: u64 = 500;
+/// Exponential backoff multiplier for retry delays
+const RETRY_FACTOR: f32 = 2.0;
+/// Maximum number of retry attempts
+const RETRY_MAX_TIMES: usize = 5;
+/// Maximum delay between retry attempts (in seconds)
+const RETRY_MAX_DELAY_SECONDS: u64 = 30;
+
 #[derive(Clone, Debug)]
 pub struct Db {
     orchestartor: DbOrchestartor,
@@ -16,10 +26,10 @@ impl Db {
         let orchestartor = DbOrchestartor::try_new(
             connection_urls,
             RetryConfig {
-                factor: 0.0,
-                max_delay_seconds: 0,
-                max_times: 0,
-                min_delay_millis: 0,
+                min_delay_millis: RETRY_MIN_DELAY_MILLIS,
+                factor: RETRY_FACTOR,
+                max_times: RETRY_MAX_TIMES,
+                max_delay_seconds: RETRY_MAX_DELAY_SECONDS,
             },
         )
         .map_err(|e| DbError::ConnectError(e.to_string()))?;

From 7c6e035581539e7099a6548133e0c0623d28259e Mon Sep 17 00:00:00 2001
From: Marcos Nicolau
Date: Mon, 22 Dec 2025 13:01:43 -0300
Subject: [PATCH 13/14] adjust db retries params + explanation

---
 aggregation_mode/gateway/src/db.rs         | 22 ++++++++++++++++++-
 aggregation_mode/payments_poller/src/db.rs | 20 +++++++++++++++---
 .../proof_aggregator/src/backend/db.rs     | 22 ++++++++++++++++---
 3 files changed, 58 insertions(+), 6 deletions(-)

diff --git a/aggregation_mode/gateway/src/db.rs b/aggregation_mode/gateway/src/db.rs
index 59f315e066..816110d847 100644
--- a/aggregation_mode/gateway/src/db.rs
+++ b/aggregation_mode/gateway/src/db.rs
@@ -2,7 +2,27 @@ use crate::types::Receipt
 use db::{orchestrator::DbOrchestartor, retry::RetryConfig};
 use sqlx::types::{BigDecimal, Uuid};
 
-// Retry parameters for Db queries
+// Retry/backoff behavior summary (see
+// aggregation_mode/db/src/orchestrator.rs:next_backoff_delay for implementation)
+//
+// NOTE: These retry limits are intentionally lower than in other crates.
+// This code runs in an HTTP server; in the worst case the request fails fast
+// and the client can retry the request. Prolonged blocking retries here are
+// less critical than in background or batch processing jobs.
+//
+// 1) Maximum sleep between consecutive retries:
+//    The sleep between retries is capped at 10 seconds (RETRY_MAX_DELAY_SECONDS).
+//
+// 2) Wait before each retry attempt with the current config
+//    (start = 500ms, factor = 2.0, max retries = 4):
+//
+//    retry 1: 0.5s
+//    retry 2: 1.0s
+//    retry 3: 2.0s
+//    retry 4: 4.0s
+//
+// Worst-case total sleep time across all retries: 7.5 seconds,
+// plus the execution time of each DB attempt.
 /// Initial delay before first retry attempt (in milliseconds)
 const RETRY_MIN_DELAY_MILLIS: u64 = 500;
 /// Exponential backoff multiplier for retry delays
diff --git a/aggregation_mode/payments_poller/src/db.rs b/aggregation_mode/payments_poller/src/db.rs
index b3511693a9..518fbac0c1 100644
--- a/aggregation_mode/payments_poller/src/db.rs
+++ b/aggregation_mode/payments_poller/src/db.rs
@@ -1,11 +1,27 @@
 use db::{orchestrator::DbOrchestartor, retry::RetryConfig};
 use sqlx::types::BigDecimal;
 
-// Retry parameters for Db queries
+// Retry/backoff behavior summary for DB queries (see
+// aggregation_mode/db/src/orchestrator.rs:next_backoff_delay for implementation)
+//
+// 1) Maximum sleep between consecutive retries:
+//    The sleep between retries is capped at 30 seconds (RETRY_MAX_DELAY_SECONDS).
+//
+// 2) Wait before each retry attempt with the current config
+//    (start = 500ms, factor = 4.0, max retries = 5):
+//
+//    retry 1: 0.5s
+//    retry 2: 2.0s
+//    retry 3: 8.0s
+//    retry 4: 30s (capped; 32s would have been next)
+//    retry 5: 30s
+//
+// Worst-case total sleep time across all retries: 70.5 seconds -> roughly 6 Ethereum blocks of waiting,
+// plus the execution time of each DB attempt.
 /// Initial delay before first retry attempt (in milliseconds)
 const RETRY_MIN_DELAY_MILLIS: u64 = 500;
 /// Exponential backoff multiplier for retry delays
-const RETRY_FACTOR: f32 = 2.0;
+const RETRY_FACTOR: f32 = 4.0;
 /// Maximum number of retry attempts
 const RETRY_MAX_TIMES: usize = 5;
 /// Maximum delay between retry attempts (in seconds)
diff --git a/aggregation_mode/proof_aggregator/src/backend/db.rs b/aggregation_mode/proof_aggregator/src/backend/db.rs
index 0dad14c4fe..25f73a5bc8 100644
--- a/aggregation_mode/proof_aggregator/src/backend/db.rs
+++ b/aggregation_mode/proof_aggregator/src/backend/db.rs
@@ -1,13 +1,29 @@
 use db::{orchestrator::DbOrchestartor, retry::RetryConfig, types::Task};
 use sqlx::types::Uuid;
 
-// Retry parameters for Db queries
+// Retry/backoff behavior summary (see
+// aggregation_mode/db/src/orchestrator.rs:next_backoff_delay for implementation)
+//
+// 1) Maximum sleep between consecutive retries:
+//    The sleep between retries is capped at 30 seconds (RETRY_MAX_DELAY_SECONDS).
+//
+// 2) Wait before each retry attempt with the current config
+//    (start = 500ms, factor = 5.0, max retries = 10):
+//
+//    retry 1: 0.5s
+//    retry 2: 2.5s
+//    retry 3: 12.5s
+//    retry 4: 30s (capped)
+//    retry 5–10: 30s each
+//
+// Worst-case total sleep time across all retries: ~3m 46s,
+// plus the execution time of each DB attempt.
 /// Initial delay before first retry attempt (in milliseconds)
 const RETRY_MIN_DELAY_MILLIS: u64 = 500;
 /// Exponential backoff multiplier for retry delays
-const RETRY_FACTOR: f32 = 2.0;
+const RETRY_FACTOR: f32 = 5.0;
 /// Maximum number of retry attempts
-const RETRY_MAX_TIMES: usize = 5;
+const RETRY_MAX_TIMES: usize = 10;
 /// Maximum delay between retry attempts (in seconds)
 const RETRY_MAX_DELAY_SECONDS: u64 = 30;
 

From 37e6a8816bec2b06e7ffe0742f0d8ad94c5dd0e0 Mon Sep 17 00:00:00 2001
From: Marcos Nicolau
Date: Mon, 22 Dec 2025 13:04:21 -0300
Subject: [PATCH 14/14] chore: link to aws in backoff alg

---
 aggregation_mode/db/src/orchestrator.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/aggregation_mode/db/src/orchestrator.rs b/aggregation_mode/db/src/orchestrator.rs
index e611982b38..54b0ae5236 100644
--- a/aggregation_mode/db/src/orchestrator.rs
+++ b/aggregation_mode/db/src/orchestrator.rs
@@ -150,6 +150,7 @@ impl DbOrchestartor {
     //   ...
     // until the delay reaches `max_delay_seconds`, after which it stays at that max.
     // see reference: https://en.wikipedia.org/wiki/Exponential_backoff
+    // and here: https://docs.aws.amazon.com/prescriptive-guidance/latest/cloud-design-patterns/retry-backoff.html
     fn next_backoff_delay(&self, current_delay: Duration) -> Duration {
         let max: Duration = Duration::from_secs(self.retry_config.max_delay_seconds);
         // Defensive: factor should be >= 1.0 for backoff, we clamp it to avoid shrinking/NaN.
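For reviewers who want to sanity-check the delay tables quoted in the comments above, the capped exponential backoff implemented by `next_backoff_delay` can be reproduced with a small standalone sketch. This is illustrative only and not part of any patch in this series: the `RetryConfig` struct below is a hypothetical stand-in that mirrors the retry constants (`min_delay_millis`, `factor`, `max_times`, `max_delay_seconds`), and `next_delay` is a local helper rather than the crate's API. The numbers shown match the payments_poller configuration (start = 500ms, factor = 4.0, 5 retries, 30s cap).

```rust
use std::time::Duration;

/// Hypothetical stand-in for the crate's `RetryConfig`; only the fields
/// referenced by the retry constants in the patches are modeled here.
struct RetryConfig {
    min_delay_millis: u64,
    factor: f32,
    max_times: usize,
    max_delay_seconds: u64,
}

/// Capped exponential backoff as described in `next_backoff_delay`:
/// multiply the current delay by `factor` (clamped to >= 1.0), then cap it.
fn next_delay(cfg: &RetryConfig, current: Duration) -> Duration {
    let max = Duration::from_secs(cfg.max_delay_seconds);
    let factor = f64::from(cfg.factor).max(1.0);
    let scaled_secs = current.as_secs_f64() * factor;
    let scaled = Duration::from_secs_f64(if scaled_secs.is_finite() {
        scaled_secs
    } else {
        max.as_secs_f64()
    });
    if scaled > max {
        max
    } else {
        scaled
    }
}

fn main() {
    // payments_poller config from PATCH 13: 500ms start, factor 4.0, 5 retries, 30s cap.
    let cfg = RetryConfig {
        min_delay_millis: 500,
        factor: 4.0,
        max_times: 5,
        max_delay_seconds: 30,
    };

    let mut delay = Duration::from_millis(cfg.min_delay_millis);
    let mut total = Duration::ZERO;
    for retry in 1..=cfg.max_times {
        // Prints 0.5s, 2.0s, 8.0s, 30.0s (capped), 30.0s.
        println!("retry {retry}: sleep {:.1}s", delay.as_secs_f64());
        total += delay;
        delay = next_delay(&cfg, delay);
    }
    // Worst-case total sleep, excluding the time spent on each DB attempt itself.
    println!("worst-case total sleep: {:.1}s", total.as_secs_f64());
}
```

Running it prints the 0.5s / 2.0s / 8.0s / 30s / 30s sequence and the 70.5s worst-case total sleep cited in aggregation_mode/payments_poller/src/db.rs; swapping in the gateway or proof_aggregator constants reproduces their tables the same way.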