Skip to content

Commit c4c3bf7

Browse files
authored
Append commit instead of individual transactions to commitlog (#4140)
Changes the commitlog (and durability) write API, such that the caller decides how many transactions are in a single commit, and has to supply the transaction offsets. This simplifies commitlog-side buffering logic to essentially a `BufWriter` (which, of course, we must not forget to flush). This will help throughput, but offers less opportunity to retry failed writes. This is probably a good thing, as disks can fail in erratic ways, and we should rather crash and re-verify the commitlog (suffix) than continue writing. To that end, this patch liberally raises panics when there is a chance that internal state could be "poisoned" by partial writes, which may be debatable. # Motivation The main motivation is to avoid maintaining the transaction offset in two places in such a way that they could diverge. As ordering commits is the responsibility of the datastore, we make it authoritative on this matter -- the commitlog will still check that offsets are contiguous, and refuse to commit if that's not the case. A secondary, related motivation is the following: A "commit" is an atomic unit of storage, meaning that a torn (partial) write of a commit will render the entire commit corrupt. There hasn't been a compelling case where we would want this, and have always configured the server to write exactly one transaction per commit. The code to handle buffering of transactions is, however, rather complex, as it tries hard to allow the caller to retry writes at commit boundaries. An unfortunate consequence of this is that we'd flush to the OS very often, leaving throughput performance on the table. So, if there is a compelling case for batching multiple transactions in a commit, it should be the datastore's responsibility. # API and ABI breaking changes Breaks internal APIs only. # Expected complexity level and risk 5 - Mostly for the risk # Testing Existing tests.
1 parent c044d96 commit c4c3bf7

15 files changed

Lines changed: 462 additions & 607 deletions

File tree

crates/commitlog/src/commitlog.rs

Lines changed: 154 additions & 118 deletions
Large diffs are not rendered by default.

crates/commitlog/src/lib.rs

Lines changed: 47 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
io,
3-
num::{NonZeroU16, NonZeroU64},
3+
num::NonZeroU64,
44
ops::RangeBounds,
55
sync::{Arc, RwLock},
66
};
@@ -17,10 +17,11 @@ pub mod segment;
1717
mod varchar;
1818
mod varint;
1919

20+
use crate::segment::Committed;
2021
pub use crate::{
2122
commit::{Commit, StoredCommit},
2223
payload::{Decoder, Encode},
23-
repo::fs::SizeOnDisk,
24+
repo::{fs::SizeOnDisk, TxOffset},
2425
segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
2526
varchar::Varchar,
2627
};
@@ -57,14 +58,6 @@ pub struct Options {
5758
/// Default: 1GiB
5859
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))]
5960
pub max_segment_size: u64,
60-
/// The maximum number of records in a commit.
61-
///
62-
/// If this number is exceeded, the commit is flushed to disk even without
63-
/// explicitly calling [`Commitlog::flush`].
64-
///
65-
/// Default: 1
66-
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))]
67-
pub max_records_in_commit: NonZeroU16,
6861
/// Whenever at least this many bytes have been written to the currently
6962
/// active segment, an entry is added to its offset index.
7063
///
@@ -96,6 +89,12 @@ pub struct Options {
9689
/// Has no effect if the `fallocate` feature is not enabled.
9790
#[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))]
9891
pub preallocate_segments: bool,
92+
/// Size in bytes of the memory buffer holding commit data before flushing
93+
/// to storage.
94+
///
95+
/// Default: 4KiB
96+
#[cfg_attr(feature = "serde", serde(default = "Options::default_write_buffer_size"))]
97+
pub write_buffer_size: usize,
9998
}
10099

101100
impl Default for Options {
@@ -106,18 +105,18 @@ impl Default for Options {
106105

107106
impl Options {
108107
pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024;
109-
pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::new(1).expect("1 > 0, qed");
110108
pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
111109
pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;
112110
pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false;
111+
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 4 * 1024;
113112

114113
pub const DEFAULT: Self = Self {
115114
log_format_version: DEFAULT_LOG_FORMAT_VERSION,
116115
max_segment_size: Self::default_max_segment_size(),
117-
max_records_in_commit: Self::default_max_records_in_commit(),
118116
offset_index_interval_bytes: Self::default_offset_index_interval_bytes(),
119117
offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(),
120118
preallocate_segments: Self::default_preallocate_segments(),
119+
write_buffer_size: Self::default_write_buffer_size(),
121120
};
122121

123122
pub const fn default_log_format_version() -> u8 {
@@ -128,10 +127,6 @@ impl Options {
128127
Self::DEFAULT_MAX_SEGMENT_SIZE
129128
}
130129

131-
pub const fn default_max_records_in_commit() -> NonZeroU16 {
132-
Self::DEFAULT_MAX_RECORDS_IN_COMMIT
133-
}
134-
135130
pub const fn default_offset_index_interval_bytes() -> NonZeroU64 {
136131
Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES
137132
}
@@ -144,6 +139,10 @@ impl Options {
144139
Self::DEFAULT_PREALLOCATE_SEGMENTS
145140
}
146141

142+
pub const fn default_write_buffer_size() -> usize {
143+
Self::DEFAULT_WRITE_BUFFER_SIZE
144+
}
145+
147146
/// Compute the length in bytes of an offset index based on the settings in
148147
/// `self`.
149148
pub fn offset_index_len(&self) -> u64 {
@@ -262,7 +261,7 @@ impl<T> Commitlog<T> {
262261
pub fn flush(&self) -> io::Result<Option<u64>> {
263262
let mut inner = self.inner.write().unwrap();
264263
trace!("flush commitlog");
265-
inner.commit()?;
264+
inner.flush()?;
266265

267266
Ok(inner.max_committed_offset())
268267
}
@@ -282,7 +281,7 @@ impl<T> Commitlog<T> {
282281
pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
283282
let mut inner = self.inner.write().unwrap();
284283
trace!("flush and sync commitlog");
285-
inner.commit()?;
284+
inner.flush()?;
286285
inner.sync();
287286

288287
Ok(inner.max_committed_offset())
@@ -383,57 +382,47 @@ impl<T> Commitlog<T> {
383382
}
384383

385384
impl<T: Encode> Commitlog<T> {
386-
/// Append the record `txdata` to the log.
385+
/// Write `transactions` to the log.
387386
///
388-
/// If the internal buffer exceeds [`Options::max_records_in_commit`], the
389-
/// argument is returned in an `Err`. The caller should [`Self::flush`] the
390-
/// log and try again.
387+
/// This will store all `transactions` as a single [Commit]
388+
/// (note that `transactions` must not yield more than [u16::MAX] elements).
391389
///
392-
/// In case the log is appended to from multiple threads, this may result in
393-
/// a busy loop trying to acquire a slot in the buffer. In such scenarios,
394-
/// [`Self::append_maybe_flush`] is preferable.
395-
pub fn append(&self, txdata: T) -> Result<(), T> {
396-
let mut inner = self.inner.write().unwrap();
397-
inner.append(txdata)
398-
}
399-
400-
/// Append the record `txdata` to the log.
390+
/// Data is buffered internally, call [Self::flush] to force flushing to
391+
/// the underlying storage.
392+
///
393+
/// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed],
394+
/// which contains the offset range and checksum of the commit.
401395
///
402-
/// The `txdata` payload is buffered in memory until either:
396+
/// # Errors
403397
///
404-
/// - [`Self::flush`] is called explicitly, or
405-
/// - [`Options::max_records_in_commit`] is exceeded
398+
/// An `Err` value is returned in the following cases:
406399
///
407-
/// In the latter case, [`Self::append`] flushes implicitly, _before_
408-
/// appending the `txdata` argument.
400+
/// - if the transaction sequence is invalid, e.g. because the transaction
401+
/// offsets are not contiguous.
409402
///
410-
/// I.e. the argument is not guaranteed to be flushed after the method
411-
/// returns. If that is desired, [`Self::flush`] must be called explicitly.
403+
/// In this case, **none** of the `transactions` will be written.
412404
///
413-
/// If writing `txdata` to the commitlog results in a new segment file being opened,
414-
/// we will send a message down `on_new_segment`.
415-
/// This will be hooked up to the `request_snapshot` channel of a `SnapshotWorker`.
405+
/// - if creating the new segment fails due to an I/O error.
416406
///
417-
/// # Errors
407+
/// # Panics
408+
///
409+
/// The method panics if:
410+
///
411+
/// - `transactions` exceeds [u16::MAX] elements
412+
///
413+
/// - [Self::flush] or writing to the underlying buffered writer fails
418414
///
419-
/// If the log needs to be flushed, but an I/O error occurs, ownership of
420-
/// `txdata` is returned back to the caller alongside the [`io::Error`].
415+
/// This is likely caused by some storage issue. As we cannot tell with
416+
/// certainty how much data (if any) has been written, the internal state
417+
/// becomes invalid and thus a panic is raised.
421418
///
422-
/// The value can then be used to retry appending.
423-
pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
419+
/// - [Self::sync] panics (called when rotating segments)
420+
pub fn commit<U: Into<Transaction<T>>>(
421+
&self,
422+
transactions: impl IntoIterator<Item = U>,
423+
) -> io::Result<Option<Committed>> {
424424
let mut inner = self.inner.write().unwrap();
425-
426-
if let Err(txdata) = inner.append(txdata) {
427-
if let Err(source) = inner.commit() {
428-
return Err(error::Append { txdata, source });
429-
}
430-
431-
// `inner.commit.n` must be zero at this point
432-
let res = inner.append(txdata);
433-
debug_assert!(res.is_ok(), "failed to append while holding write lock");
434-
}
435-
436-
Ok(())
425+
inner.commit(transactions)
437426
}
438427

439428
/// Obtain an iterator which traverses the log from the start, yielding

crates/commitlog/src/repo/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,13 +214,11 @@ pub fn create_segment_writer<R: Repo>(
214214
records: Vec::new(),
215215
epoch,
216216
},
217-
inner: io::BufWriter::new(storage),
217+
inner: io::BufWriter::with_capacity(opts.write_buffer_size, storage),
218218

219219
min_tx_offset: offset,
220220
bytes_written: Header::LEN as u64,
221221

222-
max_records_in_commit: opts.max_records_in_commit,
223-
224222
offset_index_head: create_offset_index_writer(repo, offset, opts),
225223
})
226224
}
@@ -293,8 +291,6 @@ pub fn resume_segment_writer<R: Repo>(
293291
min_tx_offset: tx_range.start,
294292
bytes_written: size_in_bytes,
295293

296-
max_records_in_commit: opts.max_records_in_commit,
297-
298294
offset_index_head: create_offset_index_writer(repo, offset, opts),
299295
}))
300296
}

0 commit comments

Comments
 (0)