Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ pub enum DeliveryMode {
Push,
}

impl DeliveryMode {
pub fn is_push(self) -> bool {
self == DeliveryMode::Push
}
}

#[derive(PartialEq, Debug, Deserialize, Serialize)]
pub struct Config {
/// The sentry DSN to use for error reporting.
Expand Down Expand Up @@ -222,6 +228,11 @@ pub struct Config {
/// brokers are under load, or there are small networking delays.
pub processing_deadline_grace_sec: u64,

/// The number of additional seconds that claim expirations
/// are extended by. This helps reduce claim expirations when
/// brokers are under load, or there are small networking delays.
pub claim_expiration_grace_sec: u64,

/// The frequency at which upkeep tasks
/// (discarding, retrying activations, etc.) are executed.
pub upkeep_task_interval_ms: u64,
Expand Down Expand Up @@ -299,7 +310,7 @@ pub struct Config {
/// Maximum time in milliseconds for a single push RPC to the worker service. This should be greater than the worker's internal timeout.
pub push_timeout_ms: u64,

/// Update statuses from the gRPC server in batches?
/// Update statuses from the gRPC server in batches? Only applies in PUSH mode.
pub batch_status_updates: bool,

/// The size of a batch of status updates.
Expand All @@ -308,10 +319,19 @@ pub struct Config {
/// Maximum milliseconds to wait before flushing a batch of status updates.
pub status_update_interval_ms: u64,

/// The hostname used to construct `callback_url` for task push requests.
/// Update claimed → processing (dispatch) updates in batches? Only applies in PUSH mode.
pub batch_push_updates: bool,

/// The size of a batch of dispatch updates.
pub push_update_batch_size: usize,

/// Maximum milliseconds to wait before flushing a batch of dispatch updates.
pub push_update_interval_ms: u32,

/// (DEPRECATED) The hostname used to construct `callback_url` for task push requests.
pub callback_addr: String,

/// The port used to construct `callback_url` for task push requests.
/// (DEPRECATED) The port used to construct `callback_url` for task push requests.
pub callback_port: u32,

/// Maps every application to its worker endpoint, both represented as strings.
Expand Down Expand Up @@ -397,6 +417,7 @@ impl Default for Config {
max_processing_count: 2048,
max_processing_attempts: 5,
processing_deadline_grace_sec: 3,
claim_expiration_grace_sec: 3,
upkeep_task_interval_ms: 1000,
upkeep_unhealthy_interval_ms: 5000,
health_check_killswitched: false,
Expand All @@ -421,6 +442,9 @@ impl Default for Config {
batch_status_updates: false,
status_update_batch_size: 1,
status_update_interval_ms: 100,
batch_push_updates: false,
push_update_batch_size: 1,
push_update_interval_ms: 100,
callback_addr: "0.0.0.0".into(),
callback_port: 50051,
worker_map: [("sentry".into(), "http://127.0.0.1:50052".into())].into(),
Expand Down
56 changes: 11 additions & 45 deletions src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ use async_backtrace::framed;
use chrono::Utc;
use elegant_departure::get_shutdown_guard;
use tokio::time::sleep;
use tonic::async_trait;
use tracing::{debug, info, warn};

use crate::config::Config;
use crate::push::{PushError, PushPool};
use crate::store::activation::InflightActivation;
use crate::push::TaskPusher;
use crate::store::traits::InflightActivationStore;
use crate::store::types::BucketRange;

Expand Down Expand Up @@ -48,29 +46,6 @@ pub fn bucket_range_for_fetch_thread(thread_index: usize, fetch_threads: usize)
(low, high)
}

/// Thin interface for the push pool. It mostly serves to enable proper unit testing, but it also decouples fetch logic from push logic even further.
#[async_trait]
pub trait TaskPusher {
/// Submit a single task to the push pool.
async fn submit_task(
&self,
activation: InflightActivation,
time: Instant,
) -> Result<(), PushError>;
}

#[async_trait]
impl TaskPusher for PushPool {
#[framed]
async fn submit_task(
&self,
activation: InflightActivation,
time: Instant,
) -> Result<(), PushError> {
self.submit(activation, time).await
}
}

/// Wrapper around `config.fetch_threads` asynchronous tasks, each of which fetches batches of pending activations from the store, passes them to the push pool, and repeats.
pub struct FetchPool<T: TaskPusher> {
/// Inflight activation store.
Expand Down Expand Up @@ -122,11 +97,10 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
}

_ = async {
let start = Instant::now();

debug!("Fetching next batch of pending activations...");
metrics::counter!("fetch.loop.count").increment(1);

let start = Instant::now();
let mut backoff = false;

let result = store.claim_activations_for_push(limit, bucket).await;
Expand Down Expand Up @@ -165,13 +139,19 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
)
.record(latency as f64);
} else {
debug!(task_id = %id, namespace = activation.namespace, taskname = activation.taskname, "Activation already processed, skipping received → claimed latency recording");
debug!(
task_id = %id,
namespace = activation.namespace,
taskname = activation.taskname,
"Activation already processed, skipping \
received → claimed latency recording"
);
}

match pusher.submit_task(activation, start).await {
match pusher.push_task(activation, start).await {
Ok(()) => metrics::counter!("fetch.submit", "result" => "ok").increment(1),

Err(PushError::Timeout) => {
Err(_) => {
metrics::counter!("fetch.submit", "result" => "timeout")
.increment(1);

Expand All @@ -184,20 +164,6 @@ impl<T: TaskPusher + Send + Sync + 'static> FetchPool<T> {
// Wait for push queue to empty
backoff = true;
}

Err(PushError::Channel(e)) => {
metrics::counter!("fetch.submit", "result" => "channel_error")
.increment(1);

warn!(
task_id = %id,
error = ?e,
"Submit to push pool failed due to channel error",
);

// Wait before trying again
backoff = true;
}
}
}
}
Expand Down
15 changes: 7 additions & 8 deletions src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use tokio::time::{Duration, sleep};
use tonic::async_trait;

use crate::config::Config;
use crate::push::PushError;
use crate::store::activation::{InflightActivation, InflightActivationStatus};
use crate::store::traits::InflightActivationStore;
use crate::store::types::{BucketRange, FailedTasksForwarder};
Expand Down Expand Up @@ -98,10 +97,14 @@ impl InflightActivationStore for MockStore {
})
}

async fn mark_activation_processing(&self, _id: &str) -> Result<(), Error> {
async fn mark_processing(&self, _id: &str) -> Result<(), Error> {
Ok(())
}

async fn mark_processing_batch(&self, _ids: &[String]) -> Result<u64, Error> {
unimplemented!()
}

async fn pending_activation_max_lag(&self, _now: &DateTime<Utc>) -> f64 {
unimplemented!()
}
Expand Down Expand Up @@ -205,15 +208,11 @@ impl RecordingPusher {

#[async_trait]
impl TaskPusher for RecordingPusher {
async fn submit_task(
&self,
activation: InflightActivation,
_time: Instant,
) -> Result<(), PushError> {
async fn push_task(&self, activation: InflightActivation, _time: Instant) -> Result<()> {
self.pushed_ids.lock().await.push(activation.id.clone());

if self.fail {
return Err(PushError::Timeout);
return Err(anyhow!("timeout"));
}

Ok(())
Expand Down
35 changes: 30 additions & 5 deletions src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::pin::Pin;
use std::time::Duration;

use anyhow::Result;
use elegant_departure::get_shutdown_guard;
use tokio::sync::mpsc::Receiver;
use tracing::debug;

/// Run flusher that receives values of type T from a channel and flushes
/// them using the provided async `flush` function either when the batch is
Expand All @@ -26,10 +28,16 @@ where
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut buffer: Vec<T> = Vec::with_capacity(batch_size);
let guard = get_shutdown_guard();

loop {
tokio::select! {
msg = rx.recv() => {
biased;

// When the buffer is NOT full, try to receive another message
msg = rx.recv(), if buffer.len() < batch_size => {
debug!("Buffer is NOT full, receiving a message...");

match msg {
Some(v) => {
buffer.push(v);
Expand All @@ -39,25 +47,42 @@ where
}

if buffer.len() >= batch_size {
debug!("Flushing full buffer...");
flush(&mut buffer).await;
}
}

None => {
// Channel closed (shutdown), flush remaining and exit
flush(&mut buffer).await;
// Channel closed because all senders were dropped
debug!("Channel closed!");
break;
}
}
}

// Otherwise, try flushing whatever is in the buffer every `interval_ms` milliseconds
_ = interval.tick() => {
if !buffer.is_empty() {
flush(&mut buffer).await;
debug!("Performing periodic flush...");

if rx.is_closed() {
// Channel closed because all senders were dropped
debug!("Channel closed on tick!");
break;
}

flush(&mut buffer).await;
}
}
}

// Drain and flush before exit
while let Ok(update) = rx.try_recv() {
buffer.push(update);
}

// Delay shutdown until we have flushed everything in the buffer
flush(&mut buffer).await;
drop(guard);

Ok(())
}
3 changes: 3 additions & 0 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ impl ConsumerService for TaskbrokerServer {
}

if let Some(ref tx) = self.update_tx {
let depth = tx.max_capacity() - tx.capacity();
metrics::gauge!("grpc_server.update_queue.depth").set(depth as f64);

tx.send((id, status))
.await
.map_err(|_| Status::internal("Status update channel closed"))?;
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod store;
pub mod test_utils;
pub mod tokio;
pub mod upkeep;
pub mod worker;

/// Name of the grpc service.
/// Using the service type to get a name wasn't working across modules.
Expand Down
Loading
Loading