From f147b707478e1f7e684edb7cc6939d3249e8cc71 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Wed, 1 Jul 2026 23:06:14 -0500 Subject: [PATCH] Add paginated payment listing API Add `DataStore::list_page`, returning one page of stored objects in reverse creation order together with a token to fetch the next page. Reads within a page are spawned concurrently, and keys removed between listing and reading are skipped rather than failing the page. Expose this via a new `Node::list_payments_paginated` method along with bindings support for `PageToken` and `PaymentDetailsPage`, allowing users to retrieve payments page-by-page instead of copying the entire payment history across the FFI boundary at once. Co-Authored-By: Claude Fable 5 --- CHANGELOG.md | 2 + bindings/ldk_node.udl | 7 +++ src/config.rs | 4 ++ src/data_store.rs | 141 +++++++++++++++++++++++++++++++++++++++++- src/ffi/types.rs | 12 +++- src/lib.rs | 25 ++++++++ 6 files changed, 188 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7e012a14..3bb83e7e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ - `EsploraSyncConfig` and `ElectrumSyncConfig` now support `force_wallet_full_scan`. When set, the on-chain wallet keeps using BDK `full_scan` instead of incremental sync until a full scan succeeds, allowing restored wallets to rediscover funds sent to previously-unknown addresses. +- A new `Node::list_payments_paginated` method allows retrieving payments page-by-page, ordered + from most recently created to least recently created, instead of all at once. ## Bug Fixes and Improvements - Building a fresh node against a Bitcoin Core RPC or REST chain source that fails to return the diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 7c0edc535..3ee987d40 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -137,6 +137,8 @@ interface Node { void remove_payment([ByRef]PaymentId payment_id); BalanceDetails list_balances(); sequence list_payments(); + [Throws=NodeError] + PaymentDetailsPage list_payments_paginated(PageToken? page_token); sequence list_peers(); sequence list_channels(); NetworkGraph network_graph(); @@ -261,6 +263,8 @@ enum PaymentFailureReason { typedef dictionary PaymentDetails; +typedef dictionary PaymentDetailsPage; + [Remote] dictionary RouteParametersConfig { u64? max_total_routing_fee_msat; @@ -376,6 +380,9 @@ typedef string Address; [Custom] typedef string OfferId; +[Custom] +typedef string PageToken; + [Custom] typedef string PaymentId; diff --git a/src/config.rs b/src/config.rs index ad1b91181..55bcba53d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -64,6 +64,10 @@ pub(crate) const BDK_CLIENT_STOP_GAP: usize = 20; // The number of concurrent requests made against the API provider. pub(crate) const BDK_CLIENT_CONCURRENCY: usize = 4; +// The maximum number of object reads we keep in flight at once when reading a page from the +// data store. +pub(crate) const DATA_STORE_READ_CONCURRENCY_LIMIT: usize = 10; + // The timeout after which we abandon retrying failed payments. pub(crate) const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10); diff --git a/src/data_store.rs b/src/data_store.rs index 13afeca7e..38c158566 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -5,13 +5,14 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::ops::Deref; use std::sync::{Arc, Mutex}; -use lightning::util::persist::KVStore; +use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore}; use lightning::util::ser::{Readable, Writeable}; +use crate::config::DATA_STORE_READ_CONCURRENCY_LIMIT; use crate::logger::{log_error, LdkLogger}; use crate::types::DynStore; use crate::Error; @@ -179,6 +180,104 @@ where self.objects.lock().expect("lock").values().filter(f).cloned().collect::>() } + /// Returns a page of objects read from the underlying store, ordered from most recently + /// created to least recently created, together with a token that can be passed to a + /// subsequent call to retrieve the next page. + pub(crate) async fn list_page( + &self, page_token: Option, + ) -> Result<(Vec, Option), Error> { + let response = PaginatedKVStore::list_paginated( + &*self.kv_store, + &self.primary_namespace, + &self.secondary_namespace, + page_token, + ) + .await + .map_err(|e| { + log_error!( + self.logger, + "Listing object data under {}/{} failed due to: {}", + &self.primary_namespace, + &self.secondary_namespace, + e + ); + Error::PersistenceFailed + })?; + + // Read the page's objects through a sliding window: keep up to + // `DATA_STORE_READ_CONCURRENCY_LIMIT` reads in flight and await them in key order to + // preserve the page's newest-first ordering. The page size is implementation-defined, + // so we can't rely on pages being small. + let mut spawn_iter = response.keys.iter(); + let mut read_handles = VecDeque::with_capacity(DATA_STORE_READ_CONCURRENCY_LIMIT); + for key in spawn_iter.by_ref().take(DATA_STORE_READ_CONCURRENCY_LIMIT) { + read_handles.push_back(tokio::spawn(KVStore::read( + &*self.kv_store, + &self.primary_namespace, + &self.secondary_namespace, + key, + ))); + } + + let mut objects = Vec::with_capacity(response.keys.len()); + for key in response.keys.iter() { + let handle = read_handles.pop_front().expect("read spawned for each key"); + + // Refill the window for every read we await, if keys remain. + if let Some(next_key) = spawn_iter.next() { + read_handles.push_back(tokio::spawn(KVStore::read( + &*self.kv_store, + &self.primary_namespace, + &self.secondary_namespace, + next_key, + ))); + } + + let read_res = handle.await.map_err(|e| { + log_error!( + self.logger, + "Reading object data for key {}/{}/{} failed due to: {}", + &self.primary_namespace, + &self.secondary_namespace, + key, + e + ); + Error::PersistenceFailed + })?; + let data = match read_res { + Ok(data) => data, + // The object was removed after we listed the page's keys. Skip it rather than + // failing the page. + Err(e) if e.kind() == lightning::io::ErrorKind::NotFound => continue, + Err(e) => { + log_error!( + self.logger, + "Reading object data for key {}/{}/{} failed due to: {}", + &self.primary_namespace, + &self.secondary_namespace, + key, + e + ); + return Err(Error::PersistenceFailed); + }, + }; + let object = SO::read(&mut &data[..]).map_err(|e| { + log_error!( + self.logger, + "Failed to deserialize object data for key {}/{}/{}: {}", + &self.primary_namespace, + &self.secondary_namespace, + key, + e + ); + Error::PersistenceFailed + })?; + objects.push(object); + } + + Ok((objects, response.next_page_token)) + } + async fn persist(&self, object: &SO) -> Result<(), Error> { let (store_key, data) = Self::encode_object(object); self.persist_encoded(store_key, data).await @@ -338,6 +437,44 @@ mod tests { ) } + #[tokio::test] + async fn list_page_paginates_in_reverse_creation_order() { + let store: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); + let logger = Arc::new(TestLogger::new()); + let data_store: DataStore> = DataStore::new( + Vec::new(), + "datastore_test_primary".to_string(), + "datastore_test_secondary".to_string(), + Arc::clone(&store), + logger, + ); + + // Insert more objects than fit in a single page to exercise the pagination loop. + let num_objects = 120u32; + for i in 0..num_objects { + let id = TestObjectId { id: i.to_be_bytes() }; + data_store.insert(TestObject { id, data: [7u8; 3] }).await.unwrap(); + } + + let mut listed = Vec::with_capacity(num_objects as usize); + let mut page_token = None; + loop { + let (page, next_page_token) = data_store.list_page(page_token).await.unwrap(); + assert!(!page.is_empty()); + listed.extend(page); + page_token = next_page_token; + if page_token.is_none() { + break; + } + } + + let expected: Vec = (0..num_objects) + .rev() + .map(|i| TestObject { id: TestObjectId { id: i.to_be_bytes() }, data: [7u8; 3] }) + .collect(); + assert_eq!(listed, expected); + } + #[tokio::test] async fn data_is_persisted() { let store: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 9bb03bb07..fb5fb5ab0 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -149,7 +149,7 @@ pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; use crate::error::Error; pub use crate::liquidity::LSPS1OrderStatus; pub use crate::logger::{LogLevel, LogRecord, LogWriter}; -use crate::{hex_utils, SocketAddress, UserChannelId}; +use crate::{hex_utils, PageToken, SocketAddress, UserChannelId}; uniffi::custom_type!(PublicKey, String, { remote, @@ -893,6 +893,16 @@ uniffi::custom_type!(OfferId, String, { }, }); +uniffi::custom_type!(PageToken, String, { + remote, + try_lift: |val| { + Ok(PageToken::new(val)) + }, + lower: |obj| { + obj.to_string() + }, +}); + uniffi::custom_type!(PaymentId, String, { remote, try_lift: |val| { diff --git a/src/lib.rs b/src/lib.rs index c97e16fe6..106d0ee2b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -156,6 +156,7 @@ use lightning::ln::peer_handler::CustomMessageHandler; use lightning::routing::gossip::NodeAlias; use lightning::sign::EntropySource; use lightning::util::persist::KVStore; +pub use lightning::util::persist::PageToken; use lightning::util::wallet_utils::{Input, Wallet as LdkWallet}; use lightning_background_processor::process_events_async; pub use lightning_invoice; @@ -254,6 +255,16 @@ pub struct Node { _leak_checker: LeakChecker, } +/// A page of payments returned from a paginated listing. +#[derive(Clone, Debug, PartialEq, Eq)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct PaymentDetailsPage { + /// Payments in this page, ordered from most recently created to least recently created. + pub payments: Vec, + /// Token to pass to the next call to continue listing, if another page exists. + pub next_page_token: Option, +} + impl Node { /// Starts the necessary background tasks, such as handling events coming from user input, /// LDK/BDK, and the peer-to-peer network. @@ -2135,6 +2146,20 @@ impl Node { self.payment_store.list_filter(|_| true) } + /// Retrieves a page of payments from the underlying paginated store, ordered from most + /// recently created to least recently created. + /// + /// Pass `None` to start listing from the most recently created payment. If the returned + /// [`PaymentDetailsPage::next_page_token`] is `Some`, pass it to a subsequent call to + /// retrieve the next page. + pub fn list_payments_paginated( + &self, page_token: Option, + ) -> Result { + let (payments, next_page_token) = + self.runtime.block_on(self.payment_store.list_page(page_token))?; + Ok(PaymentDetailsPage { payments, next_page_token }) + } + /// Retrieves a list of known peers. pub fn list_peers(&self) -> Vec { let mut peers = Vec::new();