From 4aca010c2f58f1fc3eb8fd8b5b24a76ecac723a0 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Thu, 28 May 2026 11:32:08 -0400 Subject: [PATCH 1/3] Add partition aware depth count metric --- src/store/adapters/postgres.rs | 49 ++++++++++++++++++++++++ src/store/tests.rs | 68 ++++++++++++++++++++++++++++++++++ src/store/traits.rs | 10 +++++ src/upkeep.rs | 35 +++++++++++------ 4 files changed, 151 insertions(+), 11 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 02f1aa71..ed0e76d9 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::str::FromStr; use std::sync::RwLock; use std::time::Instant; @@ -644,6 +645,54 @@ impl ActivationStore for PostgresStore { }) } + #[instrument(skip_all)] + #[framed] + async fn count_depths_per_partition(&self) -> Result, Error> { + let mut query_builder = QueryBuilder::new( + "SELECT partition, + COUNT(*) FILTER (WHERE status = 'Pending'), + COUNT(*) FILTER (WHERE status = 'Delay'), + COUNT(*) FILTER (WHERE status = 'Claimed'), + COUNT(*) FILTER (WHERE status = 'Processing') + FROM inflight_taskactivations", + ); + + self.add_partition_condition(&mut query_builder, true); + query_builder.push(" GROUP BY partition"); + + let rows: Vec<(i32, i64, i64, i64, i64)> = query_builder + .build_query_as() + .fetch_all(&self.read_pool) + .await?; + + let mut counts: HashMap = rows + .into_iter() + .map(|(partition, pending, delay, claimed, processing)| { + ( + partition, + DepthCounts { + pending: pending as usize, + delay: delay as usize, + claimed: claimed as usize, + processing: processing as usize, + }, + ) + }) + .collect(); + + let assigned = self.partitions.read().unwrap(); + for partition in assigned.iter() { + counts.entry(*partition).or_insert(DepthCounts { + pending: 0, + delay: 0, + claimed: 0, + processing: 0, + }); + } + + Ok(counts) + } + /// Update the status of a specific activation. /// If max_attempts is provided (for Retry status), also updates the activation's retry_state. #[instrument(skip_all)] diff --git a/src/store/tests.rs b/src/store/tests.rs index 7df2c3a8..4754ece6 100644 --- a/src/store/tests.rs +++ b/src/store/tests.rs @@ -167,6 +167,74 @@ async fn test_count_depths(#[case] adapter: &str) { store.remove_db().await.unwrap(); } +#[tokio::test] +async fn test_count_depths_per_partition_postgres() { + let store = create_test_store("postgres").await; + + // Assign three partitions; partition 2 will have no activations and must + // appear in the result with zero counts (zero-fill behavior). + store.assign_partitions(vec![0, 1, 2]).unwrap(); + + let namespace = generate_unique_namespace(); + let now = Utc::now(); + + let make = |id: &str, partition: i32, offset: i64| { + ActivationBuilder::new() + .id(id.to_string()) + .taskname("taskname") + .namespace(namespace.clone()) + .partition(partition) + .added_at(now) + .received_at(now) + .offset(offset) + .processing_deadline_duration(10) + .build(TaskActivationBuilder::new()) + }; + + // Partition 0: 3 pending. Partition 1: 1 pending (later flipped to Delay). + let batch = vec![ + make("p0_0", 0, 0), + make("p0_1", 0, 1), + make("p0_2", 0, 2), + make("p1_0", 1, 3), + ]; + assert!(store.store(batch).await.is_ok()); + + store + .set_status("p0_0", ActivationStatus::Processing, None, None) + .await + .unwrap(); + store + .set_status("p1_0", ActivationStatus::Delay, None, None) + .await + .unwrap(); + + let depths = store.count_depths_per_partition().await.unwrap(); + + let p0 = depths.get(&0).expect("partition 0 missing"); + assert_eq!(p0.pending, 2, "partition 0 pending"); + assert_eq!(p0.processing, 1, "partition 0 processing"); + assert_eq!(p0.delay, 0, "partition 0 delay"); + assert_eq!(p0.claimed, 0, "partition 0 claimed"); + + let p1 = depths.get(&1).expect("partition 1 missing"); + assert_eq!(p1.pending, 0, "partition 1 pending"); + assert_eq!(p1.delay, 1, "partition 1 delay"); + assert_eq!(p1.processing, 0, "partition 1 processing"); + assert_eq!(p1.claimed, 0, "partition 1 claimed"); + + // Zero-fill: partition 2 is assigned but has no rows. + let p2 = depths + .get(&2) + .expect("partition 2 missing (zero-fill failed)"); + assert_eq!(p2.pending, 0, "partition 2 pending"); + assert_eq!(p2.delay, 0, "partition 2 delay"); + assert_eq!(p2.processing, 0, "partition 2 processing"); + assert_eq!(p2.claimed, 0, "partition 2 claimed"); + + store.remove_db().await.unwrap(); +} + #[tokio::test] #[rstest] #[case::sqlite("sqlite")] diff --git a/src/store/traits.rs b/src/store/traits.rs index 14a5425a..9040acb2 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use anyhow::{Error, anyhow}; use async_trait::async_trait; use chrono::{DateTime, Utc}; @@ -125,6 +127,14 @@ pub trait ActivationStore: Send + Sync { }) } + /// Queue depths grouped by partition for upkeep gauges. The default + /// implementation returns the flat total under sentinel partition -1 for + /// stores that aren't partition-aware. + async fn count_depths_per_partition(&self) -> Result, Error> { + let total = self.count_depths().await?; + Ok(HashMap::from([(-1, total)])) + } + /// Set the processing deadline for a specific activation async fn set_processing_deadline( &self, diff --git a/src/upkeep.rs b/src/upkeep.rs index 4c83fa8f..b37feeb9 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -403,17 +403,20 @@ pub async fn do_upkeep( let now = Utc::now(); let (depth_counts, max_lag, db_file_meta, wal_file_meta) = join!( - store.count_depths(), + store.count_depths_per_partition(), store.pending_activation_max_lag(&now), fs::metadata(config.db_path.clone()), fs::metadata(config.db_path.clone() + "-wal") ); - if let Ok(depths) = depth_counts { - result_context.pending = depths.pending as u32; - result_context.delay = depths.delay as u32; - result_context.claimed = depths.claimed as u32; - result_context.processing = depths.processing as u32; + let depths_per_partition = depth_counts.ok(); + if let Some(ref depths) = depths_per_partition { + for counts in depths.values() { + result_context.pending += counts.pending as u32; + result_context.delay += counts.delay as u32; + result_context.claimed += counts.claimed as u32; + result_context.processing += counts.processing as u32; + } } if !result_context.empty() { @@ -464,11 +467,21 @@ pub async fn do_upkeep( // Forwarded tasks metrics::counter!("upkeep.forwarded_tasks").increment(result_context.forwarded); - // State of activations - metrics::gauge!("upkeep.current_pending_tasks").set(result_context.pending); - metrics::gauge!("upkeep.current_claimed_tasks").set(result_context.claimed); - metrics::gauge!("upkeep.current_processing_tasks").set(result_context.processing); - metrics::gauge!("upkeep.current_delayed_tasks").set(result_context.delay); + // State of activations, tagged per partition. Dashboards aggregating + // without a partition filter still see the global total via tag sum. + if let Some(depths) = depths_per_partition { + for (partition, counts) in depths { + let partition = partition.to_string(); + metrics::gauge!("upkeep.current_pending_tasks", "partition" => partition.clone()) + .set(counts.pending as f64); + metrics::gauge!("upkeep.current_claimed_tasks", "partition" => partition.clone()) + .set(counts.claimed as f64); + metrics::gauge!("upkeep.current_processing_tasks", "partition" => partition.clone()) + .set(counts.processing as f64); + metrics::gauge!("upkeep.current_delayed_tasks", "partition" => partition) + .set(counts.delay as f64); + } + } metrics::gauge!("upkeep.pending_activation.max_lag.sec").set(max_lag); if let Ok(db_file_meta) = db_file_meta { From 9ab18831f707da33580f26d9e055571a1fe64ce0 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Thu, 28 May 2026 14:19:45 -0400 Subject: [PATCH 2/3] Reset gauge on rebalance --- src/upkeep.rs | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/src/upkeep.rs b/src/upkeep.rs index b37feeb9..0a18c548 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -45,6 +46,7 @@ pub async fn upkeep( let mut last_run = Instant::now(); let mut last_vacuum = Instant::now(); let mut last_backtrace_log = Instant::now(); + let mut emitted_partitions: HashSet = HashSet::new(); loop { select! { _ = timer.tick() => { @@ -55,6 +57,7 @@ pub async fn upkeep( startup_time, runtime_config_manager.clone(), &mut last_vacuum, + &mut emitted_partitions, ).await; last_run = check_health(last_run, &config, health_reporter.clone()).await; @@ -126,6 +129,7 @@ pub async fn do_upkeep( startup_time: DateTime, runtime_config_manager: Arc, last_vacuum: &mut Instant, + emitted_partitions: &mut HashSet, ) -> UpkeepResults { let current_time = Utc::now(); let upkeep_start = Instant::now(); @@ -469,8 +473,22 @@ pub async fn do_upkeep( // State of activations, tagged per partition. Dashboards aggregating // without a partition filter still see the global total via tag sum. + // Zero out gauges for partitions we emitted last cycle but no longer own. if let Some(depths) = depths_per_partition { - for (partition, counts) in depths { + let current: HashSet = depths.keys().copied().collect(); + + for partition in emitted_partitions.difference(¤t) { + let partition = partition.to_string(); + metrics::gauge!("upkeep.current_pending_tasks", "partition" => partition.clone()) + .set(0.0); + metrics::gauge!("upkeep.current_claimed_tasks", "partition" => partition.clone()) + .set(0.0); + metrics::gauge!("upkeep.current_processing_tasks", "partition" => partition.clone()) + .set(0.0); + metrics::gauge!("upkeep.current_delayed_tasks", "partition" => partition).set(0.0); + } + + for (partition, counts) in &depths { let partition = partition.to_string(); metrics::gauge!("upkeep.current_pending_tasks", "partition" => partition.clone()) .set(counts.pending as f64); @@ -481,6 +499,8 @@ pub async fn do_upkeep( metrics::gauge!("upkeep.current_delayed_tasks", "partition" => partition) .set(counts.delay as f64); } + + *emitted_partitions = current; } metrics::gauge!("upkeep.pending_activation.max_lag.sec").set(max_lag); @@ -553,6 +573,7 @@ pub async fn check_health( #[cfg(test)] mod tests { + use std::collections::HashSet; use std::io::Write; use std::sync::Arc; use std::time::Duration; @@ -714,6 +735,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -774,6 +796,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -825,6 +848,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -884,6 +908,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -937,6 +962,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -983,6 +1009,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -1045,6 +1072,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -1091,6 +1119,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -1141,6 +1170,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -1223,6 +1253,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; assert_eq!(result_context.delay_elapsed, 1); @@ -1255,6 +1286,7 @@ mod tests { start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; assert_eq!(result_context.delay_elapsed, 1); @@ -1311,6 +1343,7 @@ demoted_namespaces: start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -1372,6 +1405,7 @@ demoted_namespaces: start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; @@ -1412,6 +1446,7 @@ demoted_namespaces: start_time, runtime_config.clone(), &mut last_vacuum, + &mut HashSet::new(), ) .await; From 99d6603b05f87ff94a0d09ce4530925267d2e4c7 Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Thu, 28 May 2026 14:45:45 -0400 Subject: [PATCH 3/3] Fix partition race condition --- src/store/adapters/postgres.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index ed0e76d9..1314e3b8 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -648,17 +648,24 @@ impl ActivationStore for PostgresStore { #[instrument(skip_all)] #[framed] async fn count_depths_per_partition(&self) -> Result, Error> { + let assigned: Vec = self.partitions.read().unwrap().clone(); + if assigned.is_empty() { + return Ok(HashMap::new()); + } + let mut query_builder = QueryBuilder::new( "SELECT partition, COUNT(*) FILTER (WHERE status = 'Pending'), COUNT(*) FILTER (WHERE status = 'Delay'), COUNT(*) FILTER (WHERE status = 'Claimed'), COUNT(*) FILTER (WHERE status = 'Processing') - FROM inflight_taskactivations", + FROM inflight_taskactivations WHERE partition IN (", ); - - self.add_partition_condition(&mut query_builder, true); - query_builder.push(" GROUP BY partition"); + let mut separated = query_builder.separated(", "); + for partition in &assigned { + separated.push_bind(*partition); + } + query_builder.push(") GROUP BY partition"); let rows: Vec<(i32, i64, i64, i64, i64)> = query_builder .build_query_as() @@ -680,8 +687,7 @@ impl ActivationStore for PostgresStore { }) .collect(); - let assigned = self.partitions.read().unwrap(); - for partition in assigned.iter() { + for partition in &assigned { counts.entry(*partition).or_insert(DepthCounts { pending: 0, delay: 0,