Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 99 additions & 44 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use lightning::routing::scoring::{
ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters,
};
use lightning::util::persist::{
migrate_kv_store_data_async, KVStore, KVSTORE_NAMESPACE_KEY_ALPHABET,
KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
migrate_kv_store_data_async, KVStore, PaginatedKVStore, PaginatedListResponse,
KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY,
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
Expand Down Expand Up @@ -222,7 +222,8 @@ where
})
}

/// Read all objects of type `T` from the given namespace, spawning reads in parallel.
/// Read all objects of type `T` from the given namespace, listing keys page-by-page and spawning
/// reads in parallel.
pub(crate) async fn read_all_objects<T, L>(
kv_store: &DynStore, primary_namespace: &str, secondary_namespace: &str, logger: L,
) -> Result<Vec<T>, std::io::Error>
Expand All @@ -234,55 +235,72 @@ where
let type_name = std::any::type_name::<T>();
let mut res = Vec::new();

let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?;

const BATCH_SIZE: usize = 50;

let mut set = tokio::task::JoinSet::new();

// Fill JoinSet with tasks if possible
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
let mut page_token = None;

loop {
let PaginatedListResponse { keys, next_page_token } = PaginatedKVStore::list_paginated(
&*kv_store,
primary_namespace,
secondary_namespace,
page_token,
)
.await?;
let mut stored_keys = keys;

// Fill JoinSet with tasks if possible
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
if let Some(next_key) = stored_keys.pop() {
let fut =
KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}
}
}

while let Some(read_res) = set.join_next().await {
// Exit early if we get an IO error.
let reader = read_res
.map_err(|e| {
log_error!(logger, "Failed to read {}: {}", type_name, e);
set.abort_all();
e
})?
.map_err(|e| {
log_error!(logger, "Failed to read {}: {}", type_name, e);
set.abort_all();
e
})?;
while let Some(read_res) = set.join_next().await {
// Exit early if we get an IO error.
let reader = read_res
.map_err(|e| {
log_error!(logger, "Failed to read {}: {}", type_name, e);
set.abort_all();
e
})?
.map_err(|e| {
log_error!(logger, "Failed to read {}: {}", type_name, e);
set.abort_all();
e
})?;

// Refill set for every finished future, if we still have something to do.
if let Some(next_key) = stored_keys.pop() {
let fut =
KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}

// Refill set for every finished future, if we still have something to do.
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
// Handle result.
let object = T::read(&mut &*reader).map_err(|e| {
log_error!(logger, "Failed to deserialize {}: {}", type_name, e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Failed to deserialize {}", type_name),
)
})?;
res.push(object);
}

// Handle result.
let object = T::read(&mut &*reader).map_err(|e| {
log_error!(logger, "Failed to deserialize {}: {}", type_name, e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Failed to deserialize {}", type_name),
)
})?;
res.push(object);
}
debug_assert!(set.is_empty());
debug_assert!(stored_keys.is_empty());

debug_assert!(set.is_empty());
debug_assert!(stored_keys.is_empty());
page_token = next_page_token;
if page_token.is_none() {
break;
}
}

Ok(res)
}
Expand Down Expand Up @@ -718,19 +736,56 @@ fn recover_incomplete_fs_store_migration(storage_dir_path: &Path) -> Result<(),
mod tests {
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use lightning::util::persist::{migrate_kv_store_data_async, KVStore};
use lightning::util::ser::Writeable;
use lightning::util::test_utils::TestLogger;
use lightning_persister::fs_store::v1::FilesystemStore;
use lightning_persister::fs_store::v2::FilesystemStoreV2;

use super::test_utils::random_storage_path;
use super::{open_or_migrate_fs_store, read_or_generate_seed_file};
use super::{open_or_migrate_fs_store, read_all_objects, read_or_generate_seed_file};
use crate::io::test_utils::InMemoryStore;
use crate::types::{DynStore, DynStoreWrapper};

const TEST_PRIMARY_NAMESPACE: &str = "test_primary_namespace";
const TEST_SECONDARY_NAMESPACE: &str = "test_secondary_namespace";
const TEST_KEY: &str = "test_key";
const TEST_VALUE: &[u8] = b"test_value";

#[tokio::test]
async fn read_all_objects_reads_across_pages() {
let store: Arc<DynStore> = Arc::new(DynStoreWrapper(InMemoryStore::new()));
let logger = Arc::new(TestLogger::new());

// Write more objects than fit in a single page to exercise the pagination loop.
let num_objects = 120u64;
for i in 0..num_objects {
let key = format!("key_{:03}", i);
KVStore::write(
&*store,
TEST_PRIMARY_NAMESPACE,
TEST_SECONDARY_NAMESPACE,
&key,
i.encode(),
)
.await
.unwrap();
}

let mut read: Vec<u64> = read_all_objects(
&*store,
TEST_PRIMARY_NAMESPACE,
TEST_SECONDARY_NAMESPACE,
Arc::clone(&logger),
)
.await
.unwrap();
read.sort_unstable();
assert_eq!(read, (0..num_objects).collect::<Vec<u64>>());
}

#[test]
fn generated_seed_is_readable() {
let mut rand_path = random_storage_path();
Expand Down
Loading