Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/store/adapters/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::RwLock;
use std::time::Instant;
Expand Down Expand Up @@ -644,6 +645,60 @@ impl ActivationStore for PostgresStore {
})
}

#[instrument(skip_all)]
#[framed]
async fn count_depths_per_partition(&self) -> Result<HashMap<i32, DepthCounts>, Error> {
let assigned: Vec<i32> = self.partitions.read().unwrap().clone();
if assigned.is_empty() {
return Ok(HashMap::new());
}

let mut query_builder = QueryBuilder::new(
"SELECT partition,
COUNT(*) FILTER (WHERE status = 'Pending'),
COUNT(*) FILTER (WHERE status = 'Delay'),
COUNT(*) FILTER (WHERE status = 'Claimed'),
COUNT(*) FILTER (WHERE status = 'Processing')
FROM inflight_taskactivations WHERE partition IN (",
);
let mut separated = query_builder.separated(", ");
for partition in &assigned {
separated.push_bind(*partition);
}
query_builder.push(") GROUP BY partition");

let rows: Vec<(i32, i64, i64, i64, i64)> = query_builder
.build_query_as()
.fetch_all(&self.read_pool)
.await?;

let mut counts: HashMap<i32, DepthCounts> = rows
.into_iter()
.map(|(partition, pending, delay, claimed, processing)| {
(
partition,
DepthCounts {
pending: pending as usize,
delay: delay as usize,
claimed: claimed as usize,
processing: processing as usize,
},
)
})
.collect();

for partition in &assigned {
counts.entry(*partition).or_insert(DepthCounts {
pending: 0,
delay: 0,
claimed: 0,
processing: 0,
});
}
Comment thread
cursor[bot] marked this conversation as resolved.

Ok(counts)
}

/// Update the status of a specific activation.
/// If max_attempts is provided (for Retry status), also updates the activation's retry_state.
#[instrument(skip_all)]
Expand Down
68 changes: 68 additions & 0 deletions src/store/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,74 @@ async fn test_count_depths(#[case] adapter: &str) {
store.remove_db().await.unwrap();
}

#[tokio::test]
async fn test_count_depths_per_partition_postgres() {
let store = create_test_store("postgres").await;

// Assign three partitions; partition 2 will have no activations and must
// appear in the result with zero counts (zero-fill behavior).
store.assign_partitions(vec![0, 1, 2]).unwrap();

let namespace = generate_unique_namespace();
let now = Utc::now();

let make = |id: &str, partition: i32, offset: i64| {
ActivationBuilder::new()
.id(id.to_string())
.taskname("taskname")
.namespace(namespace.clone())
.partition(partition)
.added_at(now)
.received_at(now)
.offset(offset)
.processing_deadline_duration(10)
.build(TaskActivationBuilder::new())
};

// Partition 0: 3 pending. Partition 1: 1 pending (later flipped to Delay).
let batch = vec![
make("p0_0", 0, 0),
make("p0_1", 0, 1),
make("p0_2", 0, 2),
make("p1_0", 1, 3),
];
assert!(store.store(batch).await.is_ok());

store
.set_status("p0_0", ActivationStatus::Processing, None, None)
.await
.unwrap();
store
.set_status("p1_0", ActivationStatus::Delay, None, None)
.await
.unwrap();

let depths = store.count_depths_per_partition().await.unwrap();

let p0 = depths.get(&0).expect("partition 0 missing");
assert_eq!(p0.pending, 2, "partition 0 pending");
assert_eq!(p0.processing, 1, "partition 0 processing");
assert_eq!(p0.delay, 0, "partition 0 delay");
assert_eq!(p0.claimed, 0, "partition 0 claimed");

let p1 = depths.get(&1).expect("partition 1 missing");
assert_eq!(p1.pending, 0, "partition 1 pending");
assert_eq!(p1.delay, 1, "partition 1 delay");
assert_eq!(p1.processing, 0, "partition 1 processing");
assert_eq!(p1.claimed, 0, "partition 1 claimed");

// Zero-fill: partition 2 is assigned but has no rows.
let p2 = depths
.get(&2)
.expect("partition 2 missing (zero-fill failed)");
assert_eq!(p2.pending, 0, "partition 2 pending");
assert_eq!(p2.delay, 0, "partition 2 delay");
assert_eq!(p2.processing, 0, "partition 2 processing");
assert_eq!(p2.claimed, 0, "partition 2 claimed");

store.remove_db().await.unwrap();
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
Expand Down
10 changes: 10 additions & 0 deletions src/store/traits.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

use anyhow::{Error, anyhow};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -125,6 +127,14 @@ pub trait ActivationStore: Send + Sync {
})
}

/// Queue depths grouped by partition for upkeep gauges. The default
/// implementation returns the flat total under sentinel partition -1 for
/// stores that aren't partition-aware.
async fn count_depths_per_partition(&self) -> Result<HashMap<i32, DepthCounts>, Error> {
let total = self.count_depths().await?;
Ok(HashMap::from([(-1, total)]))
}

/// Set the processing deadline for a specific activation
async fn set_processing_deadline(
&self,
Expand Down
70 changes: 59 additions & 11 deletions src/upkeep.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -45,6 +46,7 @@ pub async fn upkeep(
let mut last_run = Instant::now();
let mut last_vacuum = Instant::now();
let mut last_backtrace_log = Instant::now();
let mut emitted_partitions: HashSet<i32> = HashSet::new();
loop {
select! {
_ = timer.tick() => {
Expand All @@ -55,6 +57,7 @@ pub async fn upkeep(
startup_time,
runtime_config_manager.clone(),
&mut last_vacuum,
&mut emitted_partitions,
).await;
last_run = check_health(last_run, &config, health_reporter.clone()).await;

Expand Down Expand Up @@ -126,6 +129,7 @@ pub async fn do_upkeep(
startup_time: DateTime<Utc>,
runtime_config_manager: Arc<RuntimeConfigManager>,
last_vacuum: &mut Instant,
emitted_partitions: &mut HashSet<i32>,
) -> UpkeepResults {
let current_time = Utc::now();
let upkeep_start = Instant::now();
Expand Down Expand Up @@ -403,17 +407,20 @@ pub async fn do_upkeep(

let now = Utc::now();
let (depth_counts, max_lag, db_file_meta, wal_file_meta) = join!(
store.count_depths(),
store.count_depths_per_partition(),
store.pending_activation_max_lag(&now),
fs::metadata(config.db_path.clone()),
fs::metadata(config.db_path.clone() + "-wal")
);

if let Ok(depths) = depth_counts {
result_context.pending = depths.pending as u32;
result_context.delay = depths.delay as u32;
result_context.claimed = depths.claimed as u32;
result_context.processing = depths.processing as u32;
let depths_per_partition = depth_counts.ok();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need an Option? Couldn't you use the result directly?

if let Some(ref depths) = depths_per_partition {
for counts in depths.values() {
result_context.pending += counts.pending as u32;
result_context.delay += counts.delay as u32;
result_context.claimed += counts.claimed as u32;
result_context.processing += counts.processing as u32;
}
}

if !result_context.empty() {
Expand Down Expand Up @@ -464,11 +471,37 @@ pub async fn do_upkeep(
// Forwarded tasks
metrics::counter!("upkeep.forwarded_tasks").increment(result_context.forwarded);

// State of activations
metrics::gauge!("upkeep.current_pending_tasks").set(result_context.pending);
metrics::gauge!("upkeep.current_claimed_tasks").set(result_context.claimed);
metrics::gauge!("upkeep.current_processing_tasks").set(result_context.processing);
metrics::gauge!("upkeep.current_delayed_tasks").set(result_context.delay);
// State of activations, tagged per partition. Dashboards aggregating
// without a partition filter still see the global total via tag sum.
// Zero out gauges for partitions we emitted last cycle but no longer own.
if let Some(depths) = depths_per_partition {
let current: HashSet<i32> = depths.keys().copied().collect();

for partition in emitted_partitions.difference(&current) {
let partition = partition.to_string();
metrics::gauge!("upkeep.current_pending_tasks", "partition" => partition.clone())
.set(0.0);
metrics::gauge!("upkeep.current_claimed_tasks", "partition" => partition.clone())
.set(0.0);
metrics::gauge!("upkeep.current_processing_tasks", "partition" => partition.clone())
.set(0.0);
metrics::gauge!("upkeep.current_delayed_tasks", "partition" => partition).set(0.0);
}

for (partition, counts) in &depths {
let partition = partition.to_string();
metrics::gauge!("upkeep.current_pending_tasks", "partition" => partition.clone())
.set(counts.pending as f64);
metrics::gauge!("upkeep.current_claimed_tasks", "partition" => partition.clone())
.set(counts.claimed as f64);
metrics::gauge!("upkeep.current_processing_tasks", "partition" => partition.clone())
.set(counts.processing as f64);
metrics::gauge!("upkeep.current_delayed_tasks", "partition" => partition)
.set(counts.delay as f64);
}

*emitted_partitions = current;
}
Comment thread
sentry[bot] marked this conversation as resolved.
metrics::gauge!("upkeep.pending_activation.max_lag.sec").set(max_lag);

if let Ok(db_file_meta) = db_file_meta {
Expand Down Expand Up @@ -540,6 +573,7 @@ pub async fn check_health(

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -701,6 +735,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -761,6 +796,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -812,6 +848,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -871,6 +908,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -924,6 +962,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -970,6 +1009,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -1032,6 +1072,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -1078,6 +1119,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -1128,6 +1170,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -1210,6 +1253,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;
assert_eq!(result_context.delay_elapsed, 1);
Expand Down Expand Up @@ -1242,6 +1286,7 @@ mod tests {
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;
assert_eq!(result_context.delay_elapsed, 1);
Expand Down Expand Up @@ -1298,6 +1343,7 @@ demoted_namespaces:
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -1359,6 +1405,7 @@ demoted_namespaces:
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down Expand Up @@ -1399,6 +1446,7 @@ demoted_namespaces:
start_time,
runtime_config.clone(),
&mut last_vacuum,
&mut HashSet::new(),
)
.await;

Expand Down
Loading