Skip to content

Commit 2432a33

Browse files
authored
Add more info to segment file errors (#4680)
# Description of Changes Add some more context to errors reading commit logs. # Expected complexity level and risk 1
1 parent 0e796e9 commit 2432a33

3 files changed

Lines changed: 56 additions & 10 deletions

File tree

crates/commitlog/src/repo/fs.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,9 +240,14 @@ impl Repo for Fs {
240240
File::options().read(true).append(true).open(self.segment_path(offset))
241241
}
242242

243+
fn segment_file_path(&self, offset: u64) -> Option<String> {
244+
Some(self.segment_path(offset).0.to_string_lossy().into_owned())
245+
}
246+
243247
fn open_segment_reader(&self, offset: u64) -> io::Result<Self::SegmentReader> {
244-
debug!("fs: open segment at {}", self.segment_path(offset).display());
245-
let file = File::open(self.segment_path(offset))?;
248+
let path = self.segment_path(offset);
249+
debug!("fs: open segment at {}", path.display());
250+
let file = File::open(&path)?;
246251
CompressReader::new(file).map(|inner| ReadOnlySegment { inner })
247252
}
248253

crates/commitlog/src/repo/mod.rs

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,14 @@ pub trait Repo: Clone + fmt::Display {
107107
/// will be caught by [`resume_segment_writer`].
108108
fn open_segment_writer(&self, offset: u64) -> io::Result<Self::SegmentWriter>;
109109

110+
/// Return a path-like identifier for debugging logs.
111+
///
112+
/// This is optional and only used to enrich error messages when segment
113+
/// operations fail.
114+
fn segment_file_path(&self, _offset: u64) -> Option<String> {
115+
None
116+
}
117+
110118
/// Remove the segment at the minimum transaction offset `offset`.
111119
///
112120
/// Return [`io::ErrorKind::NotFound`] if no such segment exists.
@@ -252,27 +260,40 @@ pub fn resume_segment_writer<R: Repo>(
252260
opts: Options,
253261
offset: u64,
254262
) -> io::Result<Result<Writer<R::SegmentWriter>, Metadata>> {
255-
let mut reader = repo.open_segment_reader(offset)?;
263+
let mut reader = repo
264+
.open_segment_reader(offset)
265+
.map_err(|source| with_segment_context("opening segment for resume", repo, offset, source))?;
256266
let offset_index = repo.get_offset_index(offset).ok();
257267
let meta = match Metadata::extract(offset, &mut reader, offset_index.as_ref()) {
258268
Err(error::SegmentMetadata::InvalidCommit { sofar, source }) => {
259269
warn!("invalid commit in segment {offset}: {source}");
260270
debug!("sofar={sofar:?}");
261271
return Ok(Err(sofar));
262272
}
263-
Err(error::SegmentMetadata::Io(e)) => return Err(e),
273+
Err(error::SegmentMetadata::Io(e)) => {
274+
return Err(with_segment_context("extracting segment metadata", repo, offset, e));
275+
}
264276
Ok(meta) => meta,
265277
};
266278
meta.header
267279
.ensure_compatible(opts.log_format_version, Commit::CHECKSUM_ALGORITHM)
268-
.map_err(|msg| io::Error::new(io::ErrorKind::InvalidData, msg))?;
280+
.map_err(|msg| {
281+
with_segment_context(
282+
"checking segment compatibility",
283+
repo,
284+
offset,
285+
io::Error::new(io::ErrorKind::InvalidData, msg),
286+
)
287+
})?;
269288
// When resuming, the log format version must be equal.
270289
if meta.header.log_format_version != opts.log_format_version {
271290
return Err(io::Error::new(
272291
io::ErrorKind::InvalidData,
273292
format!(
274-
"log format version mismatch: current={} segment={}",
275-
opts.log_format_version, meta.header.log_format_version
293+
"{}: log format version mismatch: current={} segment={}",
294+
segment_label(repo, offset),
295+
opts.log_format_version,
296+
meta.header.log_format_version
276297
),
277298
));
278299
}
@@ -324,9 +345,25 @@ pub fn open_segment_reader<R: Repo>(
324345
max_log_format_version: u8,
325346
offset: u64,
326347
) -> io::Result<Reader<R::SegmentReader>> {
327-
debug!("open segment reader at {offset}");
328-
let storage = repo.open_segment_reader(offset)?;
348+
let segment = segment_label(repo, offset);
349+
debug!("open segment reader for {segment}");
350+
let storage = repo
351+
.open_segment_reader(offset)
352+
.map_err(|source| with_segment_context("opening segment for read", repo, offset, source))?;
329353
Reader::new(max_log_format_version, offset, storage)
354+
.map_err(|source| with_segment_context("reading segment header", repo, offset, source))
355+
}
356+
357+
fn segment_label<R: Repo>(repo: &R, offset: u64) -> String {
358+
repo.segment_file_path(offset)
359+
.unwrap_or_else(|| format!("offset {offset}"))
360+
}
361+
362+
fn with_segment_context<R: Repo>(context: &'static str, repo: &R, offset: u64, source: io::Error) -> io::Error {
363+
io::Error::new(
364+
source.kind(),
365+
format!("{} [{}]: {}", segment_label(repo, offset), context, source),
366+
)
330367
}
331368

332369
/// Allocate [Options::max_segment_size] of space for [FileLike]

crates/commitlog/src/segment.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ impl Header {
5151
if !buf.starts_with(&MAGIC) {
5252
return Err(io::Error::new(
5353
io::ErrorKind::InvalidData,
54-
"segment header does not start with magic",
54+
format!(
55+
"segment header does not start with magic: expected {:02x?}, got {:02x?}",
56+
MAGIC,
57+
&buf[..MAGIC.len()]
58+
),
5559
));
5660
}
5761

0 commit comments

Comments
 (0)