Skip to content

Commit 7cd72b8

Browse files
authored
commitlog: Resumption of sealed commitlog (#4650)
The commitlog so far assumed that the latest segment is never compressed and can be opened for writing (if it is intact). However, restoring the entire commitlog from cold storage results in all segments being compressed. Make it so the resumption logic reads the metadata from the potentially compressed last segment, and starts a new segment for writing if the latest one was indeed compressed.

# Expected complexity level and risk

1.5

# Testing

Added a test.
1 parent 68022eb commit 7cd72b8

6 files changed

Lines changed: 203 additions & 47 deletions

File tree

crates/commitlog/src/repo/fs.rs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tempfile::NamedTempFile;
1010

1111
use crate::segment::FileLike;
1212

13-
use super::{Repo, SegmentLen, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
13+
use super::{Repo, SegmentLen, SegmentReader, TxOffset, TxOffsetIndex, TxOffsetIndexMut};
1414

1515
const SEGMENT_FILE_EXT: &str = ".stdb.log";
1616

@@ -154,9 +154,52 @@ impl FileLike for NamedTempFile {
154154
}
155155
}
156156

157+
/// A file-backed, read-only segment.
158+
///
159+
/// Transparently handles reading compressed segments.
160+
/// [Self::sealed] returns `true` if the segment is compressed.
161+
pub struct ReadOnlySegment {
162+
inner: CompressReader,
163+
}
164+
165+
impl SegmentReader for ReadOnlySegment {
166+
#[inline]
167+
fn sealed(&self) -> bool {
168+
self.inner.is_compressed()
169+
}
170+
}
171+
172+
impl io::Read for ReadOnlySegment {
173+
#[inline]
174+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
175+
self.inner.read(buf)
176+
}
177+
}
178+
179+
impl io::BufRead for ReadOnlySegment {
180+
#[inline]
181+
fn fill_buf(&mut self) -> io::Result<&[u8]> {
182+
self.inner.fill_buf()
183+
}
184+
185+
#[inline]
186+
fn consume(&mut self, amount: usize) {
187+
self.inner.consume(amount);
188+
}
189+
}
190+
191+
impl io::Seek for ReadOnlySegment {
192+
#[inline]
193+
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
194+
self.inner.seek(pos)
195+
}
196+
}
197+
198+
impl SegmentLen for ReadOnlySegment {}
199+
157200
impl Repo for Fs {
158201
type SegmentWriter = File;
159-
type SegmentReader = CompressReader;
202+
type SegmentReader = ReadOnlySegment;
160203

161204
fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
162205
File::options()
@@ -198,8 +241,9 @@ impl Repo for Fs {
198241
}
199242

200243
fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader> {
244+
debug!("fs: open segment at {}", self.segment_path(offset).display());
201245
let file = File::open(self.segment_path(offset))?;
202-
CompressReader::new(file)
246+
CompressReader::new(file).map(|inner| ReadOnlySegment { inner })
203247
}
204248

205249
fn remove_segment(&self, offset: u64) -> io::Result<()> {
@@ -215,7 +259,7 @@ impl Repo for Fs {
215259
fn compress_segment(&self, offset: u64) -> io::Result<()> {
216260
let src = self.open_segment_reader(offset)?;
217261
// if it's already compressed, leave it be
218-
let CompressReader::None(mut src) = src else {
262+
let CompressReader::None(mut src) = src.inner else {
219263
return Ok(());
220264
};
221265

crates/commitlog/src/repo/mem.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::repo::{
1010
};
1111

1212
mod segment;
13-
pub use segment::Segment;
13+
pub use segment::{ReadOnlySegment, Segment};
1414

1515
pub const PAGE_SIZE: usize = 4096;
1616

@@ -52,7 +52,7 @@ impl fmt::Display for Memory {
5252

5353
impl Repo for Memory {
5454
type SegmentWriter = Segment;
55-
type SegmentReader = io::BufReader<Segment>;
55+
type SegmentReader = ReadOnlySegment;
5656

5757
fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
5858
let mut inner = self.segments.write().unwrap();
@@ -88,7 +88,7 @@ impl Repo for Memory {
8888
}
8989

9090
fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader> {
91-
self.open_segment_writer(offset).map(io::BufReader::new)
91+
self.open_segment_writer(offset).map(Into::into)
9292
}
9393

9494
fn remove_segment(&self, offset: u64) -> io::Result<()> {

crates/commitlog/src/repo/mem/segment.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
use crate::{
88
repo::{
99
mem::{SpaceOnDevice, PAGE_SIZE},
10-
SegmentLen,
10+
SegmentLen, SegmentReader,
1111
},
1212
segment::FileLike,
1313
};
@@ -318,3 +318,50 @@ mod async_impls {
318318
}
319319
}
320320
}
321+
322+
pub struct ReadOnlySegment {
323+
inner: io::BufReader<Segment>,
324+
}
325+
326+
impl From<Segment> for ReadOnlySegment {
327+
fn from(inner: Segment) -> Self {
328+
Self {
329+
inner: io::BufReader::new(inner),
330+
}
331+
}
332+
}
333+
334+
impl SegmentReader for ReadOnlySegment {
335+
/// Memory segments don't support compression, so they are never sealed.
336+
fn sealed(&self) -> bool {
337+
false
338+
}
339+
}
340+
341+
impl io::Read for ReadOnlySegment {
342+
#[inline]
343+
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
344+
self.inner.read(buf)
345+
}
346+
}
347+
348+
impl io::BufRead for ReadOnlySegment {
349+
#[inline]
350+
fn fill_buf(&mut self) -> io::Result<&[u8]> {
351+
self.inner.fill_buf()
352+
}
353+
354+
#[inline]
355+
fn consume(&mut self, amount: usize) {
356+
self.inner.consume(amount);
357+
}
358+
}
359+
360+
impl io::Seek for ReadOnlySegment {
361+
#[inline]
362+
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
363+
self.inner.seek(pos)
364+
}
365+
}
366+
367+
impl SegmentLen for ReadOnlySegment {}

crates/commitlog/src/repo/mod.rs

Lines changed: 52 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::{fmt, io};
1+
use std::{
2+
fmt,
3+
io::{self, Seek},
4+
};
25

36
use log::{debug, warn};
47

@@ -52,8 +55,14 @@ pub trait SegmentLen: io::Seek {
5255
}
5356
}
5457

55-
pub trait SegmentReader: io::BufRead + SegmentLen + Send + Sync {}
56-
impl<T: io::BufRead + SegmentLen + Send + Sync> SegmentReader for T {}
58+
pub trait SegmentReader: io::BufRead + SegmentLen + Send + Sync {
59+
/// Whether the segment is considered immutable.
60+
///
61+
/// Currently, this is true when the segment is compressed.
62+
/// [resume_segment_writer] uses this method to indicate that a new segment
63+
/// should be created when opening a commitlog.
64+
fn sealed(&self) -> bool;
65+
}
5766

5867
pub trait SegmentWriter: FileLike + io::Read + io::Write + SegmentLen + Send + Sync {}
5968
impl<T: FileLike + io::Read + io::Write + SegmentLen + Send + Sync> SegmentWriter for T {}
@@ -243,21 +252,9 @@ pub fn resume_segment_writer<R: Repo>(
243252
opts: Options,
244253
offset: u64,
245254
) -> io::Result<Result<Writer<R::SegmentWriter>, Metadata>> {
246-
let mut storage = repo.open_segment_writer(offset)?;
247-
// Ensure we have enough space for this segment.
248-
// The segment could have been created without the `fallocate` feature
249-
// enabled, so we call this here again to ensure writes can't fail due to
250-
// ENOSPC.
251-
fallocate(&mut storage, &opts)?;
255+
let mut reader = repo.open_segment_reader(offset)?;
252256
let offset_index = repo.get_offset_index(offset).ok();
253-
let Metadata {
254-
header,
255-
tx_range,
256-
size_in_bytes,
257-
max_epoch,
258-
max_commit_offset: _,
259-
max_commit: _,
260-
} = match Metadata::extract(offset, &mut storage, offset_index.as_ref()) {
257+
let meta = match Metadata::extract(offset, &mut reader, offset_index.as_ref()) {
261258
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
262259
warn!("invalid commit in segment {offset}: {source}");
263260
debug!("sofar={sofar:?}");
@@ -266,34 +263,55 @@ pub fn resume_segment_writer<R: Repo>(
266263
Err(error::SegmentMetadata::Io(e)) => return Err(e),
267264
Ok(meta) => meta,
268265
};
269-
header
266+
meta.header
270267
.ensure_compatible(opts.log_format_version, Commit::CHECKSUM_ALGORITHM)
271268
.map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
272269
// When resuming, the log format version must be equal.
273-
if header.log_format_version != opts.log_format_version {
270+
if meta.header.log_format_version != opts.log_format_version {
274271
return Err(io::Error::new(
275272
io::ErrorKind::InvalidData,
276273
format!(
277274
"log format version mismatch: current={} segment={}",
278-
opts.log_format_version, header.log_format_version
275+
opts.log_format_version, meta.header.log_format_version
279276
),
280277
));
281278
}
282279

283-
Ok(Ok(Writer {
284-
commit: Commit {
285-
min_tx_offset: tx_range.end,
286-
n: 0,
287-
records: Vec::new(),
288-
epoch: max_epoch,
289-
},
290-
inner: io::BufWriter::new(storage),
291-
292-
min_tx_offset: tx_range.start,
293-
bytes_written: size_in_bytes,
294-
295-
offset_index_head: create_offset_index_writer(repo, offset, opts),
296-
}))
280+
if reader.sealed() {
281+
Ok(Err(meta))
282+
} else {
283+
let Metadata {
284+
header: _,
285+
tx_range,
286+
size_in_bytes,
287+
max_epoch,
288+
max_commit_offset: _,
289+
max_commit: _,
290+
} = meta;
291+
let mut writer = repo.open_segment_writer(offset)?;
292+
// Ensure we have enough space for this segment.
293+
// The segment could have been created without the `fallocate` feature
294+
// enabled, so we call this here again to ensure writes can't fail due
295+
// to ENOSPC.
296+
fallocate(&mut writer, &opts)?;
297+
// We use `O_APPEND`, but make the file offset consistent regardless.
298+
writer.seek(io::SeekFrom::End(0))?;
299+
300+
Ok(Ok(Writer {
301+
commit: Commit {
302+
min_tx_offset: tx_range.end,
303+
n: 0,
304+
records: Vec::new(),
305+
epoch: max_epoch,
306+
},
307+
inner: io::BufWriter::new(writer),
308+
309+
min_tx_offset: tx_range.start,
310+
bytes_written: size_in_bytes,
311+
312+
offset_index_head: create_offset_index_writer(repo, offset, opts),
313+
}))
314+
}
297315
}
298316

299317
/// Open the existing segment at `offset` for reading.

crates/commitlog/src/tests/partial.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ impl fmt::Display for ShortMem {
271271

272272
impl Repo for ShortMem {
273273
type SegmentWriter = ShortSegment;
274-
type SegmentReader = io::BufReader<repo::mem::Segment>;
274+
type SegmentReader = repo::mem::ReadOnlySegment;
275275

276276
fn create_segment(&self, offset: u64) -> io::Result<Self::SegmentWriter> {
277277
self.inner.create_segment(offset).map(|inner| ShortSegment {

crates/commitlog/tests/random_payload/mod.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use log::info;
2+
use spacetimedb_commitlog::repo::Repo;
23
use spacetimedb_commitlog::tests::helpers::enable_logging;
3-
use spacetimedb_commitlog::{payload, Commitlog, Options};
4+
use spacetimedb_commitlog::{commitlog, payload, repo, Commitlog, Options};
45
use spacetimedb_paths::server::CommitLogDir;
56
use spacetimedb_paths::FromPathUnchecked;
67
use tempfile::tempdir;
@@ -75,6 +76,12 @@ fn resets() {
7576
}
7677
}
7778

79+
/// Try to generate commitlogs that will be amenable to compression -
80+
/// random data doesn't compress well, so try and have there be repetition
81+
fn compressible_payloads() -> impl Iterator<Item = [u8; 256]> {
82+
(0..4).map(|_| gen_payload()).cycle()
83+
}
84+
7885
#[test]
7986
fn compression() {
8087
enable_logging();
@@ -90,9 +97,7 @@ fn compression() {
9097
)
9198
.unwrap();
9299

93-
// try to generate commitlogs that will be amenable to compression -
94-
// random data doesn't compress well, so try and have there be repetition
95-
let payloads = (0..4).map(|_| gen_payload()).cycle().take(1024).collect::<Vec<_>>();
100+
let payloads = compressible_payloads().take(1024).collect::<Vec<_>>();
96101
for (i, payload) in payloads.iter().enumerate() {
97102
clog.commit([(i as u64, *payload)]).unwrap();
98103
}
@@ -114,3 +119,45 @@ fn compression() {
114119
.enumerate()
115120
.all(|(i, x)| x.offset == i as u64 && x.txdata == payloads[i]));
116121
}
122+
123+
/// When restoring an archived commitlog, all segments are compressed and should
124+
/// remain immutable.
125+
///
126+
/// Tests that this is upheld, i.e. a fresh segment is created when resuming
127+
/// writes.
128+
#[test]
129+
fn all_segments_sealed() {
130+
enable_logging();
131+
132+
let root = tempdir().unwrap();
133+
let path = CommitLogDir::from_path_unchecked(root.path());
134+
let opts = Options {
135+
max_segment_size: 64 * 1024,
136+
..<_>::default()
137+
};
138+
let num_commits = 1024;
139+
let repo = repo::Fs::new(path, None).unwrap();
140+
{
141+
let mut clog = commitlog::Generic::open(&repo, opts).unwrap();
142+
for (i, payload) in compressible_payloads().take(num_commits).enumerate() {
143+
clog.commit([(i as u64, payload)]).unwrap();
144+
}
145+
clog.flush().unwrap();
146+
clog.sync();
147+
}
148+
149+
let segments = repo.existing_offsets().unwrap();
150+
let num_segments = segments.len();
151+
152+
// Compress all segments via the `repo`,
153+
// to not trigger the assert that the head segment cannot be compressed.
154+
for segment in segments {
155+
repo.compress_segment(segment).unwrap();
156+
}
157+
158+
// Re-opening the commitlog should create a fresh segment at offset `num_commits`.
159+
let _ = commitlog::Generic::<_, [u8; 256]>::open(&repo, opts).unwrap();
160+
let segments = repo.existing_offsets().unwrap();
161+
assert_eq!(num_segments + 1, segments.len());
162+
assert_eq!(segments.last().copied(), Some(num_commits as u64));
163+
}

0 commit comments

Comments
 (0)