diff --git a/ldk-server/src/api/list_forwarded_payments.rs b/ldk-server/src/api/list_forwarded_payments.rs index f784c457..7baf491d 100644 --- a/ldk-server/src/api/list_forwarded_payments.rs +++ b/ldk-server/src/api/list_forwarded_payments.rs @@ -28,7 +28,7 @@ pub(crate) async fn handle_list_forwarded_payments_request( let page_token = request.page_token.map(|p| (p.token, p.index)); let list_response = context .paginated_kv_store - .list( + .list_values( FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, page_token, @@ -41,21 +41,8 @@ pub(crate) async fn handle_list_forwarded_payments_request( })?; let mut forwarded_payments: Vec = - Vec::with_capacity(list_response.keys.len()); - for key in list_response.keys { - let forwarded_payment_bytes = context - .paginated_kv_store - .read( - FORWARDED_PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - FORWARDED_PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - ) - .map_err(|e| { - LdkServerError::new( - InternalServerError, - format!("Failed to read forwarded payment data: {}", e), - ) - })?; + Vec::with_capacity(list_response.items.len()); + for (_, forwarded_payment_bytes) in list_response.items { let forwarded_payment = ForwardedPayment::decode(Bytes::from(forwarded_payment_bytes)) .map_err(|e| { LdkServerError::new( diff --git a/ldk-server/src/api/list_payments.rs b/ldk-server/src/api/list_payments.rs index 502d7458..781709e8 100644 --- a/ldk-server/src/api/list_payments.rs +++ b/ldk-server/src/api/list_payments.rs @@ -27,7 +27,7 @@ pub(crate) async fn handle_list_payments_request( let page_token = request.page_token.map(|p| (p.token, p.index)); let list_response = context .paginated_kv_store - .list( + .list_values( PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, page_token, @@ -36,21 +36,8 @@ pub(crate) async fn handle_list_payments_request( LdkServerError::new(InternalServerError, format!("Failed to list payments: {}", e)) })?; - let mut payments: Vec = Vec::with_capacity(list_response.keys.len()); - for key in list_response.keys { - let payment_bytes = context - .paginated_kv_store - .read( - PAYMENTS_PERSISTENCE_PRIMARY_NAMESPACE, - PAYMENTS_PERSISTENCE_SECONDARY_NAMESPACE, - &key, - ) - .map_err(|e| { - LdkServerError::new( - InternalServerError, - format!("Failed to read payment data: {}", e), - ) - })?; + let mut payments: Vec = Vec::with_capacity(list_response.items.len()); + for (_, payment_bytes) in list_response.items { let payment = Payment::decode(Bytes::from(payment_bytes)).map_err(|e| { LdkServerError::new(InternalServerError, format!("Failed to decode payment: {}", e)) })?; diff --git a/ldk-server/src/io/persist/paginated_kv_store.rs b/ldk-server/src/io/persist/paginated_kv_store.rs index 7a7f66b8..21dd4441 100644 --- a/ldk-server/src/io/persist/paginated_kv_store.rs +++ b/ldk-server/src/io/persist/paginated_kv_store.rs @@ -42,6 +42,7 @@ pub trait PaginatedKVStore: Send + Sync { /// `primary_namespace` and `secondary_namespace`. /// /// [`ErrorKind::NotFound`]: io::ErrorKind::NotFound + #[cfg_attr(not(test), allow(dead_code))] fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Result, io::Error>; @@ -74,16 +75,27 @@ pub trait PaginatedKVStore: Send + Sync { /// there are no more keys to return. /// /// [`ListResponse`]: struct.ListResponse.html + #[cfg_attr(not(test), allow(dead_code))] fn list( &self, primary_namespace: &str, secondary_namespace: &str, next_page_token: Option<(String, i64)>, ) -> Result; + + /// Returns a paginated list of keys and values that are stored under the given + /// `secondary_namespace` in `primary_namespace`, ordered in descending order of `time`. + /// + /// This follows the same pagination and ordering semantics as [`PaginatedKVStore::list`]. + fn list_values( + &self, primary_namespace: &str, secondary_namespace: &str, + next_page_token: Option<(String, i64)>, + ) -> Result; } /// Represents the response from a paginated `list` operation. /// /// Contains the list of keys and an optional `next_page_token` that can be used to retrieve the /// next set of keys. +#[cfg_attr(not(test), allow(dead_code))] pub struct ListResponse { /// A vector of keys, ordered in descending order of `time`. pub keys: Vec, @@ -91,3 +103,15 @@ pub struct ListResponse { /// A token that can be used to retrieve the next set of keys. pub next_page_token: Option<(String, i64)>, } + +/// Represents the response from a paginated `list_values` operation. +/// +/// Contains the list of keys and values and an optional `next_page_token` that can be used to +/// retrieve the next set of items. +pub struct ListValuesResponse { + /// A vector of key-value pairs, ordered in descending order of `time`. + pub items: Vec<(String, Vec)>, + + /// A token that can be used to retrieve the next set of items. + pub next_page_token: Option<(String, i64)>, +} diff --git a/ldk-server/src/io/persist/sqlite_store/mod.rs b/ldk-server/src/io/persist/sqlite_store/mod.rs index 45e421ed..fa62c5a0 100644 --- a/ldk-server/src/io/persist/sqlite_store/mod.rs +++ b/ldk-server/src/io/persist/sqlite_store/mod.rs @@ -14,7 +14,7 @@ use std::{fs, io}; use ldk_node::lightning::types::string::PrintableString; use rusqlite::{named_params, Connection}; -use crate::io::persist::paginated_kv_store::{ListResponse, PaginatedKVStore}; +use crate::io::persist::paginated_kv_store::{ListResponse, ListValuesResponse, PaginatedKVStore}; use crate::io::utils::check_namespace_key_validity; /// The default database file name. @@ -105,14 +105,19 @@ impl SqliteStore { io::Error::other(msg) })?; - let index_creation_time_sql = format!( - "CREATE INDEX IF NOT EXISTS idx_creation_time ON {} (creation_time);", + let index_paginated_kv_namespace_time_key_sql = format!( + "CREATE INDEX IF NOT EXISTS idx_paginated_kv_namespace_time_key ON {} ( + primary_namespace, + secondary_namespace, + creation_time DESC, + key ASC + );", paginated_kv_table_name ); - connection.execute(&index_creation_time_sql, []).map_err(|e| { + connection.execute(&index_paginated_kv_namespace_time_key_sql, []).map_err(|e| { let msg = format!( - "Failed to create index on creation_time, table {}: {}", + "Failed to create paginated namespace index, table {}: {}", paginated_kv_table_name, e ); io::Error::other(msg) @@ -122,6 +127,7 @@ impl SqliteStore { Ok(Self { connection, paginated_kv_table_name }) } + #[cfg_attr(not(test), allow(dead_code))] fn read_internal( &self, kv_table_name: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { @@ -285,6 +291,74 @@ impl PaginatedKVStore for SqliteStore { Ok(ListResponse { keys, next_page_token }) } + + fn list_values( + &self, primary_namespace: &str, secondary_namespace: &str, + page_token: Option<(String, i64)>, + ) -> io::Result { + check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list_values")?; + + let locked_conn = self.connection.lock().unwrap(); + + // The cursor predicate is paired with this exact ordering: newer creation times first, + // then keys ascending to make rows with the same creation time deterministic. + let sql = format!( + "SELECT key, creation_time, value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace \ + AND ( creation_time < :creation_time_token OR (creation_time = :creation_time_token AND key > :key_token) ) \ + ORDER BY creation_time DESC, key ASC LIMIT :page_size", + self.paginated_kv_table_name + ); + + let mut stmt = locked_conn.prepare_cached(&sql).map_err(|e| { + let msg = format!("Failed to prepare statement: {}", e); + io::Error::other(msg) + })?; + + let mut items: Vec<(String, Vec)> = Vec::new(); + let page_token = page_token.unwrap_or(("".to_string(), i64::MAX)); + + let rows_iter = stmt + .query_map( + named_params! { + ":primary_namespace": primary_namespace, + ":secondary_namespace": secondary_namespace, + ":key_token": page_token.0, + ":creation_time_token": page_token.1, + ":page_size": LIST_KEYS_MAX_PAGE_SIZE, + }, + |row| { + let key: String = row.get(0)?; + let creation_time: i64 = row.get(1)?; + let value: Vec = row.get(2)?; + Ok((key, creation_time, value)) + }, + ) + .map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + io::Error::other(msg) + })?; + + let mut last_creation_time: Option = None; + for r in rows_iter { + let (k, ct, value) = r.map_err(|e| { + let msg = format!("Failed to retrieve queried rows: {}", e); + io::Error::other(msg) + })?; + items.push((k, value)); + last_creation_time = Some(ct); + } + + let last_key = items.last().map(|(key, _)| key.clone()); + // Match list's token behavior: EOF is confirmed by a following empty page, not by + // withholding the token from a short non-empty page. + let next_page_token = if let (Some(k), Some(ct)) = (last_key, last_creation_time) { + Some((k, ct)) + } else { + None + }; + + Ok(ListValuesResponse { items, next_page_token }) + } } #[cfg(test)] @@ -343,6 +417,24 @@ mod tests { all_keys }; + let list_all_items = + |primary_namespace: &str, secondary_namespace: &str| -> Vec<(String, Vec)> { + let mut all_items = Vec::new(); + let mut page_token = None; + loop { + let list_response = kv_store + .list_values(primary_namespace, secondary_namespace, page_token) + .unwrap(); + assert!(list_response.items.len() <= LIST_KEYS_MAX_PAGE_SIZE as usize); + all_items.extend(list_response.items); + if list_response.next_page_token.is_none() { + break; + } + page_token = list_response.next_page_token; + } + all_items + }; + // Test the basic KVStore operations. for i in 0..110 { kv_store @@ -365,6 +457,14 @@ mod tests { assert_eq!(listed_keys.len(), 110); assert_eq!(listed_keys[0], testkey); + let listed_items = list_all_items(primary_namespace, secondary_namespace); + let listed_item_keys: Vec = + listed_items.iter().map(|(key, _)| key.clone()).collect(); + assert_eq!(listed_item_keys, listed_keys); + for (_, value) in listed_items { + assert_eq!(data, &*value); + } + let read_data = kv_store.read(primary_namespace, secondary_namespace, testkey).unwrap(); assert_eq!(data, &*read_data); @@ -378,6 +478,11 @@ mod tests { assert_eq!(listed_keys.len(), 1); assert_eq!(listed_keys[0], max_chars); + let listed_items = list_all_items(&max_chars, &max_chars); + assert_eq!(listed_items.len(), 1); + assert_eq!(listed_items[0].0.as_str(), max_chars.as_str()); + assert_eq!(data, listed_items[0].1.as_slice()); + let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap(); assert_eq!(data, &*read_data); }