-
Notifications
You must be signed in to change notification settings - Fork 153
Add paginated payment listing API #959
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,13 +5,14 @@ | |
| // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in | ||
| // accordance with one or both of these licenses. | ||
|
|
||
| use std::collections::HashMap; | ||
| use std::collections::{HashMap, VecDeque}; | ||
| use std::ops::Deref; | ||
| use std::sync::{Arc, Mutex}; | ||
|
|
||
| use lightning::util::persist::KVStore; | ||
| use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore}; | ||
| use lightning::util::ser::{Readable, Writeable}; | ||
|
|
||
| use crate::config::DATA_STORE_READ_CONCURRENCY_LIMIT; | ||
| use crate::logger::{log_error, LdkLogger}; | ||
| use crate::types::DynStore; | ||
| use crate::Error; | ||
|
|
@@ -179,6 +180,104 @@ where | |
| self.objects.lock().expect("lock").values().filter(f).cloned().collect::<Vec<SO>>() | ||
| } | ||
|
|
||
| /// Returns a page of objects read from the underlying store, ordered from most recently | ||
| /// created to least recently created, together with a token that can be passed to a | ||
| /// subsequent call to retrieve the next page. | ||
| pub(crate) async fn list_page( | ||
| &self, page_token: Option<PageToken>, | ||
| ) -> Result<(Vec<SO>, Option<PageToken>), Error> { | ||
| let response = PaginatedKVStore::list_paginated( | ||
| &*self.kv_store, | ||
| &self.primary_namespace, | ||
| &self.secondary_namespace, | ||
| page_token, | ||
| ) | ||
| .await | ||
| .map_err(|e| { | ||
| log_error!( | ||
| self.logger, | ||
| "Listing object data under {}/{} failed due to: {}", | ||
| &self.primary_namespace, | ||
| &self.secondary_namespace, | ||
| e | ||
| ); | ||
| Error::PersistenceFailed | ||
| })?; | ||
|
|
||
| // Read the page's objects through a sliding window: keep up to | ||
| // `DATA_STORE_READ_CONCURRENCY_LIMIT` reads in flight and await them in key order to | ||
| // preserve the page's newest-first ordering. The page size is implementation-defined, | ||
| // so we can't rely on pages being small. | ||
| let mut spawn_iter = response.keys.iter(); | ||
| let mut read_handles = VecDeque::with_capacity(DATA_STORE_READ_CONCURRENCY_LIMIT); | ||
| for key in spawn_iter.by_ref().take(DATA_STORE_READ_CONCURRENCY_LIMIT) { | ||
| read_handles.push_back(tokio::spawn(KVStore::read( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Drive-by question: are there plans for something like
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like a follow-up optimization that could eventually make sense (though, maybe even worth benchmarking if necessary for the concrete use cases). Mind opening an issue for it so we don't forget to revisit?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| &*self.kv_store, | ||
| &self.primary_namespace, | ||
| &self.secondary_namespace, | ||
| key, | ||
| ))); | ||
| } | ||
|
|
||
| let mut objects = Vec::with_capacity(response.keys.len()); | ||
| for key in response.keys.iter() { | ||
| let handle = read_handles.pop_front().expect("read spawned for each key"); | ||
|
|
||
| // Refill the window for every read we await, if keys remain. | ||
| if let Some(next_key) = spawn_iter.next() { | ||
| read_handles.push_back(tokio::spawn(KVStore::read( | ||
| &*self.kv_store, | ||
| &self.primary_namespace, | ||
| &self.secondary_namespace, | ||
| next_key, | ||
| ))); | ||
| } | ||
|
|
||
| let read_res = handle.await.map_err(|e| { | ||
| log_error!( | ||
| self.logger, | ||
| "Reading object data for key {}/{}/{} failed due to: {}", | ||
| &self.primary_namespace, | ||
| &self.secondary_namespace, | ||
| key, | ||
| e | ||
| ); | ||
| Error::PersistenceFailed | ||
| })?; | ||
| let data = match read_res { | ||
| Ok(data) => data, | ||
| // The object was removed after we listed the page's keys. Skip it rather than | ||
| // failing the page. | ||
| Err(e) if e.kind() == lightning::io::ErrorKind::NotFound => continue, | ||
| Err(e) => { | ||
| log_error!( | ||
| self.logger, | ||
| "Reading object data for key {}/{}/{} failed due to: {}", | ||
| &self.primary_namespace, | ||
| &self.secondary_namespace, | ||
| key, | ||
| e | ||
| ); | ||
| return Err(Error::PersistenceFailed); | ||
| }, | ||
| }; | ||
| let object = SO::read(&mut &data[..]).map_err(|e| { | ||
| log_error!( | ||
| self.logger, | ||
| "Failed to deserialize object data for key {}/{}/{}: {}", | ||
| &self.primary_namespace, | ||
| &self.secondary_namespace, | ||
| key, | ||
| e | ||
| ); | ||
| Error::PersistenceFailed | ||
| })?; | ||
| objects.push(object); | ||
| } | ||
|
|
||
| Ok((objects, response.next_page_token)) | ||
| } | ||
|
|
||
| async fn persist(&self, object: &SO) -> Result<(), Error> { | ||
| let (store_key, data) = Self::encode_object(object); | ||
| self.persist_encoded(store_key, data).await | ||
|
|
@@ -338,6 +437,44 @@ mod tests { | |
| ) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn list_page_paginates_in_reverse_creation_order() { | ||
| let store: Arc<DynStore> = Arc::new(DynStoreWrapper(InMemoryStore::new())); | ||
| let logger = Arc::new(TestLogger::new()); | ||
| let data_store: DataStore<TestObject, Arc<TestLogger>> = DataStore::new( | ||
| Vec::new(), | ||
| "datastore_test_primary".to_string(), | ||
| "datastore_test_secondary".to_string(), | ||
| Arc::clone(&store), | ||
| logger, | ||
| ); | ||
|
|
||
| // Insert more objects than fit in a single page to exercise the pagination loop. | ||
| let num_objects = 120u32; | ||
| for i in 0..num_objects { | ||
| let id = TestObjectId { id: i.to_be_bytes() }; | ||
| data_store.insert(TestObject { id, data: [7u8; 3] }).await.unwrap(); | ||
| } | ||
|
|
||
| let mut listed = Vec::with_capacity(num_objects as usize); | ||
| let mut page_token = None; | ||
| loop { | ||
| let (page, next_page_token) = data_store.list_page(page_token).await.unwrap(); | ||
| assert!(!page.is_empty()); | ||
| listed.extend(page); | ||
| page_token = next_page_token; | ||
| if page_token.is_none() { | ||
| break; | ||
| } | ||
| } | ||
|
|
||
| let expected: Vec<TestObject> = (0..num_objects) | ||
| .rev() | ||
| .map(|i| TestObject { id: TestObjectId { id: i.to_be_bytes() }, data: [7u8; 3] }) | ||
| .collect(); | ||
| assert_eq!(listed, expected); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn data_is_persisted() { | ||
| let store: Arc<DynStore> = Arc::new(DynStoreWrapper(InMemoryStore::new())); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -149,7 +149,7 @@ pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; | |
| use crate::error::Error; | ||
| pub use crate::liquidity::LSPS1OrderStatus; | ||
| pub use crate::logger::{LogLevel, LogRecord, LogWriter}; | ||
| use crate::{hex_utils, SocketAddress, UserChannelId}; | ||
| use crate::{hex_utils, PageToken, SocketAddress, UserChannelId}; | ||
|
|
||
| uniffi::custom_type!(PublicKey, String, { | ||
| remote, | ||
|
|
@@ -893,6 +893,16 @@ uniffi::custom_type!(OfferId, String, { | |
| }, | ||
| }); | ||
|
|
||
| uniffi::custom_type!(PageToken, String, { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use the remote type rather than adding a custom type (as we try to get rid of the latter over time)? |
||
| remote, | ||
| try_lift: |val| { | ||
| Ok(PageToken::new(val)) | ||
| }, | ||
| lower: |obj| { | ||
| obj.to_string() | ||
| }, | ||
| }); | ||
|
|
||
| uniffi::custom_type!(PaymentId, String, { | ||
| remote, | ||
| try_lift: |val| { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -156,6 +156,7 @@ use lightning::ln::peer_handler::CustomMessageHandler; | |
| use lightning::routing::gossip::NodeAlias; | ||
| use lightning::sign::EntropySource; | ||
| use lightning::util::persist::KVStore; | ||
| pub use lightning::util::persist::PageToken; | ||
| use lightning::util::wallet_utils::{Input, Wallet as LdkWallet}; | ||
| use lightning_background_processor::process_events_async; | ||
| pub use lightning_invoice; | ||
|
|
@@ -254,6 +255,16 @@ pub struct Node { | |
| _leak_checker: LeakChecker, | ||
| } | ||
|
|
||
| /// A page of payments returned from a paginated listing. | ||
| #[derive(Clone, Debug, PartialEq, Eq)] | ||
| #[cfg_attr(feature = "uniffi", derive(uniffi::Record))] | ||
| pub struct PaymentDetailsPage { | ||
| /// Payments in this page, ordered from most recently created to least recently created. | ||
| pub payments: Vec<PaymentDetails>, | ||
| /// Token to pass to the next call to continue listing, if another page exists. | ||
| pub next_page_token: Option<PageToken>, | ||
| } | ||
|
|
||
| impl Node { | ||
| /// Starts the necessary background tasks, such as handling events coming from user input, | ||
| /// LDK/BDK, and the peer-to-peer network. | ||
|
|
@@ -2135,6 +2146,20 @@ impl Node { | |
| self.payment_store.list_filter(|_| true) | ||
| } | ||
|
|
||
| /// Retrieves a page of payments from the underlying paginated store, ordered from most | ||
| /// recently created to least recently created. | ||
| /// | ||
| /// Pass `None` to start listing from the most recently created payment. If the returned | ||
| /// [`PaymentDetailsPage::next_page_token`] is `Some`, pass it to a subsequent call to | ||
| /// retrieve the next page. | ||
| pub fn list_payments_paginated( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, do we really want an additional method? At some point (hopefully for v0.9) we'll stop keeping all entries in memory and at that point at the latest the current shape of |
||
| &self, page_token: Option<PageToken>, | ||
| ) -> Result<PaymentDetailsPage, Error> { | ||
| let (payments, next_page_token) = | ||
| self.runtime.block_on(self.payment_store.list_page(page_token))?; | ||
| Ok(PaymentDetailsPage { payments, next_page_token }) | ||
| } | ||
|
|
||
| /// Retrieves a list of known peers. | ||
| pub fn list_peers(&self) -> Vec<PeerDetails> { | ||
| let mut peers = Vec::new(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, we're still leaning on all
DataStoreentries being kept in memory everywhere else (which we should change soon as mentioned above). So right now it's a bit odd to have the paginated version being the only one calling through toKVStore, and not even using any cache (so it's likely pretty slow).So, should this just use the in-memory entries? If not, we could consider already making the jump to not keep all entries in memory, but then we'd need some (presumably LRU?) caching logic for
DataStorefirst, before we add pagination support/switch it to use pagination?