Skip to content

Commit bad5335

Browse files
Revert "Append commit instead of individual transactions to commitlog (#4140)" (#4292)
Reverts #4140 per @kim's request — was not ready to merge yet. Co-authored-by: clockwork-labs-bot <clockwork-labs-bot@users.noreply.github.com>
1 parent c4c3bf7 commit bad5335

15 files changed

Lines changed: 607 additions & 462 deletions

File tree

crates/commitlog/src/commitlog.rs

Lines changed: 118 additions & 154 deletions
Large diffs are not rendered by default.

crates/commitlog/src/lib.rs

Lines changed: 58 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
io,
3-
num::NonZeroU64,
3+
num::{NonZeroU16, NonZeroU64},
44
ops::RangeBounds,
55
sync::{Arc, RwLock},
66
};
@@ -17,11 +17,10 @@ pub mod segment;
1717
mod varchar;
1818
mod varint;
1919

20-
use crate::segment::Committed;
2120
pub use crate::{
2221
commit::{Commit, StoredCommit},
2322
payload::{Decoder, Encode},
24-
repo::{fs::SizeOnDisk, TxOffset},
23+
repo::fs::SizeOnDisk,
2524
segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
2625
varchar::Varchar,
2726
};
@@ -58,6 +57,14 @@ pub struct Options {
5857
/// Default: 1GiB
5958
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_segment_size"))]
6059
pub max_segment_size: u64,
60+
/// The maximum number of records in a commit.
61+
///
62+
/// If this number is exceeded, the commit is flushed to disk even without
63+
/// explicitly calling [`Commitlog::flush`].
64+
///
65+
/// Default: 1
66+
#[cfg_attr(feature = "serde", serde(default = "Options::default_max_records_in_commit"))]
67+
pub max_records_in_commit: NonZeroU16,
6168
/// Whenever at least this many bytes have been written to the currently
6269
/// active segment, an entry is added to its offset index.
6370
///
@@ -89,12 +96,6 @@ pub struct Options {
8996
/// Has no effect if the `fallocate` feature is not enabled.
9097
#[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))]
9198
pub preallocate_segments: bool,
92-
/// Size in bytes of the memory buffer holding commit data before flushing
93-
/// to storage.
94-
///
95-
/// Default: 4KiB
96-
#[cfg_attr(feature = "serde", serde(default = "Options::default_write_buffer_size"))]
97-
pub write_buffer_size: usize,
9899
}
99100

100101
impl Default for Options {
@@ -105,18 +106,18 @@ impl Default for Options {
105106

106107
impl Options {
107108
pub const DEFAULT_MAX_SEGMENT_SIZE: u64 = 1024 * 1024 * 1024;
109+
pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::new(1).expect("1 > 0, qed");
108110
pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
109111
pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;
110112
pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false;
111-
pub const DEFAULT_WRITE_BUFFER_SIZE: usize = 4 * 1024;
112113

113114
pub const DEFAULT: Self = Self {
114115
log_format_version: DEFAULT_LOG_FORMAT_VERSION,
115116
max_segment_size: Self::default_max_segment_size(),
117+
max_records_in_commit: Self::default_max_records_in_commit(),
116118
offset_index_interval_bytes: Self::default_offset_index_interval_bytes(),
117119
offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(),
118120
preallocate_segments: Self::default_preallocate_segments(),
119-
write_buffer_size: Self::default_write_buffer_size(),
120121
};
121122

122123
pub const fn default_log_format_version() -> u8 {
@@ -127,6 +128,10 @@ impl Options {
127128
Self::DEFAULT_MAX_SEGMENT_SIZE
128129
}
129130

131+
pub const fn default_max_records_in_commit() -> NonZeroU16 {
132+
Self::DEFAULT_MAX_RECORDS_IN_COMMIT
133+
}
134+
130135
pub const fn default_offset_index_interval_bytes() -> NonZeroU64 {
131136
Self::DEFAULT_OFFSET_INDEX_INTERVAL_BYTES
132137
}
@@ -139,10 +144,6 @@ impl Options {
139144
Self::DEFAULT_PREALLOCATE_SEGMENTS
140145
}
141146

142-
pub const fn default_write_buffer_size() -> usize {
143-
Self::DEFAULT_WRITE_BUFFER_SIZE
144-
}
145-
146147
/// Compute the length in bytes of an offset index based on the settings in
147148
/// `self`.
148149
pub fn offset_index_len(&self) -> u64 {
@@ -261,7 +262,7 @@ impl<T> Commitlog<T> {
261262
pub fn flush(&self) -> io::Result<Option<u64>> {
262263
let mut inner = self.inner.write().unwrap();
263264
trace!("flush commitlog");
264-
inner.flush()?;
265+
inner.commit()?;
265266

266267
Ok(inner.max_committed_offset())
267268
}
@@ -281,7 +282,7 @@ impl<T> Commitlog<T> {
281282
pub fn flush_and_sync(&self) -> io::Result<Option<u64>> {
282283
let mut inner = self.inner.write().unwrap();
283284
trace!("flush and sync commitlog");
284-
inner.flush()?;
285+
inner.commit()?;
285286
inner.sync();
286287

287288
Ok(inner.max_committed_offset())
@@ -382,47 +383,57 @@ impl<T> Commitlog<T> {
382383
}
383384

384385
impl<T: Encode> Commitlog<T> {
385-
/// Write `transactions` to the log.
386-
///
387-
/// This will store all `transactions` as a single [Commit]
388-
/// (note that `transactions` must not yield more than [u16::MAX] elements).
389-
///
390-
/// Data is buffered internally, call [Self::flush] to force flushing to
391-
/// the underlying storage.
392-
///
393-
/// Returns `Ok(None)` if `transactions` was empty, otherwise [Committed],
394-
/// which contains the offset range and checksum of the commit.
395-
///
396-
/// # Errors
386+
/// Append the record `txdata` to the log.
397387
///
398-
/// An `Err` value is returned in the following cases:
388+
/// If the internal buffer exceeds [`Options::max_records_in_commit`], the
389+
/// argument is returned in an `Err`. The caller should [`Self::flush`] the
390+
/// log and try again.
399391
///
400-
/// - if the transaction sequence is invalid, e.g. because the transaction
401-
/// offsets are not contiguous.
392+
/// In case the log is appended to from multiple threads, this may result in
393+
/// a busy loop trying to acquire a slot in the buffer. In such scenarios,
394+
/// [`Self::append_maybe_flush`] is preferable.
395+
pub fn append(&self, txdata: T) -> Result<(), T> {
396+
let mut inner = self.inner.write().unwrap();
397+
inner.append(txdata)
398+
}
399+
400+
/// Append the record `txdata` to the log.
402401
///
403-
/// In this case, **none** of the `transactions` will be written.
402+
/// The `txdata` payload is buffered in memory until either:
404403
///
405-
/// - if creating the new segment fails due to an I/O error.
404+
/// - [`Self::flush`] is called explicitly, or
405+
/// - [`Options::max_records_in_commit`] is exceeded
406406
///
407-
/// # Panics
407+
/// In the latter case, [`Self::append`] flushes implicitly, _before_
408+
/// appending the `txdata` argument.
408409
///
409-
/// The method panics if:
410+
/// I.e. the argument is not guaranteed to be flushed after the method
411+
/// returns. If that is desired, [`Self::flush`] must be called explicitly.
410412
///
411-
/// - `transactions` exceeds [u16::MAX] elements
413+
/// If writing `txdata` to the commitlog results in a new segment file being opened,
414+
/// we will send a message down `on_new_segment`.
415+
/// This will be hooked up to the `request_snapshot` channel of a `SnapshotWorker`.
412416
///
413-
/// - [Self::flush] or writing to the underlying buffered writer fails
417+
/// # Errors
414418
///
415-
/// This is likely caused by some storage issue. As we cannot tell with
416-
/// certainty how much data (if any) has been written, the internal state
417-
/// becomes invalid and thus a panic is raised.
419+
/// If the log needs to be flushed, but an I/O error occurs, ownership of
420+
/// `txdata` is returned to the caller alongside the [`io::Error`].
418421
///
419-
/// - [Self::sync] panics (called when rotating segments)
420-
pub fn commit<U: Into<Transaction<T>>>(
421-
&self,
422-
transactions: impl IntoIterator<Item = U>,
423-
) -> io::Result<Option<Committed>> {
422+
/// The value can then be used to retry appending.
423+
pub fn append_maybe_flush(&self, txdata: T) -> Result<(), error::Append<T>> {
424424
let mut inner = self.inner.write().unwrap();
425-
inner.commit(transactions)
425+
426+
if let Err(txdata) = inner.append(txdata) {
427+
if let Err(source) = inner.commit() {
428+
return Err(error::Append { txdata, source });
429+
}
430+
431+
// `inner.commit.n` must be zero at this point
432+
let res = inner.append(txdata);
433+
debug_assert!(res.is_ok(), "failed to append while holding write lock");
434+
}
435+
436+
Ok(())
426437
}
427438

428439
/// Obtain an iterator which traverses the log from the start, yielding

crates/commitlog/src/repo/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,13 @@ pub fn create_segment_writer<R: Repo>(
214214
records: Vec::new(),
215215
epoch,
216216
},
217-
inner: io::BufWriter::with_capacity(opts.write_buffer_size, storage),
217+
inner: io::BufWriter::new(storage),
218218

219219
min_tx_offset: offset,
220220
bytes_written: Header::LEN as u64,
221221

222+
max_records_in_commit: opts.max_records_in_commit,
223+
222224
offset_index_head: create_offset_index_writer(repo, offset, opts),
223225
})
224226
}
@@ -291,6 +293,8 @@ pub fn resume_segment_writer<R: Repo>(
291293
min_tx_offset: tx_range.start,
292294
bytes_written: size_in_bytes,
293295

296+
max_records_in_commit: opts.max_records_in_commit,
297+
294298
offset_index_head: create_offset_index_writer(repo, offset, opts),
295299
}))
296300
}

0 commit comments

Comments
 (0)