Commit ad70ef5

Add more context around some errors (#4702)
# Description of Changes

Updates some errors to include the thing we were trying to read.

# Expected complexity level and risk

1
1 parent e33cefb commit ad70ef5
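
Note: the diff below adds a `read_object_error` helper and threads it through `fetch_blob` and `fetch_page`. The `SnapshotError::ReadObject` variant it constructs is defined elsewhere in the crate and is not part of this diff; judging from the fields the helper fills in, its shape is plausibly along these lines. Every type definition here is an inferred stand-in, not the crate's actual code:

```rust
use std::io;
use std::path::PathBuf;

// Stand-ins for types defined elsewhere in the crate; shapes are inferred
// from how the diff uses them. `ty` is passed by value at several call
// sites below, which suggests ObjectType is Copy.
#[derive(Debug, Clone, Copy)]
pub struct BlobHash {
    pub data: [u8; 32],
}

#[derive(Debug, Clone, Copy)]
pub enum ObjectType {
    Blob(BlobHash),
    Page(blake3::Hash),
}

#[derive(Debug)]
pub enum SnapshotError {
    /// Reading the bytes of an object out of `source_repo` failed.
    ReadObject {
        ty: ObjectType,
        source_repo: PathBuf,
        cause: io::Error,
    },
    /// The fetched bytes hash to something other than the name we fetched
    /// them under (fields visible in the diff's HashMismatch arm).
    HashMismatch {
        ty: ObjectType,
        expected: [u8; 32],
        computed: [u8; 32],
        source_repo: PathBuf,
    },
}
```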

File tree

1 file changed: +53 -22 lines changed

crates/snapshot/src/remote.rs

Lines changed: 53 additions & 22 deletions
@@ -244,6 +244,14 @@ pub struct SnapshotFetcher<P> {
 }
 
 impl<P: BlobProvider> SnapshotFetcher<P> {
+    fn read_object_error(&self, ty: ObjectType, cause: io::Error) -> SnapshotError {
+        SnapshotError::ReadObject {
+            ty,
+            source_repo: self.dir.0.clone(),
+            cause,
+        }
+    }
+
     pub fn create(
         provider: P,
         snapshots_dir: SnapshotsPath,
@@ -371,16 +379,22 @@ impl<P: BlobProvider> SnapshotFetcher<P> {
     }
 
     async fn fetch_blob(&self, hash: blake3::Hash) -> Result<()> {
-        let Some(dst_path) = self
-            .object_file_path(ObjectType::Blob(BlobHash { data: *hash.as_bytes() }))
-            .await?
-        else {
+        let ty = ObjectType::Blob(BlobHash { data: *hash.as_bytes() });
+        let Some(dst_path) = self.object_file_path(ty).await? else {
             return Ok(());
         };
         atomically((!self.dry_run).then_some(dst_path), |out| async move {
             let mut out = BufWriter::new(out);
-            let mut src = self.provider.blob_reader(hash).await?;
-            let compressed = src.fill_buf().await?.starts_with(&ZSTD_MAGIC_BYTES);
+            let mut src = self
+                .provider
+                .blob_reader(hash)
+                .await
+                .map_err(|e| self.read_object_error(ty, e))?;
+            let compressed = src
+                .fill_buf()
+                .await
+                .map_err(|e| self.read_object_error(ty, e))?
+                .starts_with(&ZSTD_MAGIC_BYTES);
 
             // Consume the blob reader,
             // write its contents to `out`,
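
Note: both fetch paths sniff the compression format by peeking at the stream rather than consuming it: `fill_buf` exposes the reader's internal buffer without advancing the reader, so the subsequent copy still sees every byte. A minimal standalone sketch of the idiom; the constant's value is the zstd frame magic from the zstd spec, and the crate's own `ZSTD_MAGIC_BYTES` is presumably the same:

```rust
use tokio::io::{AsyncBufRead, AsyncBufReadExt};

// Little-endian encoding of the zstd frame magic number 0xFD2FB528.
const ZSTD_MAGIC_BYTES: [u8; 4] = [0x28, 0xB5, 0x2F, 0xFD];

/// Report whether `src` looks like a zstd frame, without consuming bytes.
/// Caveat: if the first read returns fewer than 4 bytes this says "no";
/// a file- or buffer-backed source will normally return at least 4.
async fn looks_like_zstd<R: AsyncBufRead + Unpin>(src: &mut R) -> std::io::Result<bool> {
    Ok(src.fill_buf().await?.starts_with(&ZSTD_MAGIC_BYTES))
}
```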
@@ -391,15 +405,17 @@ impl<P: BlobProvider> SnapshotFetcher<P> {
             let mut writer = InspectWriter::new(&mut out, |chunk| {
                 hasher.update(chunk);
             });
-            tokio::io::copy_buf(&mut src, &mut writer).await?;
-            writer.flush().await?;
+            tokio::io::copy_buf(&mut src, &mut writer)
+                .await
+                .map_err(|e| self.read_object_error(ty, e))?;
+            writer.flush().await.map_err(|e| self.read_object_error(ty, e))?;
 
             hasher.hash()
         } else {
             // If the input is compressed, send a copy of all received
             // chunks to a separate task that decompresses the stream and
             // computes the hash from the decompressed bytes.
-            let (mut zstd, tx) = zstd_reader()?;
+            let (mut zstd, tx) = zstd_reader().map_err(|e| self.read_object_error(ty, e))?;
             let decompressor = tokio::spawn(async move {
                 tokio::io::copy_buf(&mut zstd, &mut hasher).await?;
                 Ok::<_, io::Error>(hasher.hash())
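
Note: in the uncompressed branch, the blob is hashed as a side effect of writing it: `tokio_util::io::InspectWriter` invokes a closure with every chunk it forwards, so one `copy_buf` both persists the bytes and feeds the hasher. A self-contained sketch of that pattern; the diff's `hasher.hash()` is presumably a thin wrapper over something like `finalize`:

```rust
use tokio::io::{self, AsyncBufRead, AsyncWrite, AsyncWriteExt};
use tokio_util::io::InspectWriter;

/// Copy `src` into `out`, returning the blake3 hash of the bytes written.
async fn copy_and_hash(
    src: &mut (impl AsyncBufRead + Unpin),
    out: &mut (impl AsyncWrite + Unpin),
) -> io::Result<blake3::Hash> {
    let mut hasher = blake3::Hasher::new();
    let mut writer = InspectWriter::new(out, |chunk: &[u8]| {
        // Called with each chunk as it is written, so the hasher sees
        // exactly the bytes that reach `out`.
        hasher.update(chunk);
    });
    io::copy_buf(src, &mut writer).await?;
    writer.flush().await?;
    drop(writer); // releases the closure's mutable borrow of `hasher`
    Ok(hasher.finalize())
}
```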
@@ -410,15 +426,17 @@ impl<P: BlobProvider> SnapshotFetcher<P> {
                 buf.extend_from_slice(chunk);
                 tx.send(buf.split().freeze()).ok();
             });
-            tokio::io::copy(&mut src, &mut out).await?;
-            out.flush().await?;
+            tokio::io::copy(&mut src, &mut out)
+                .await
+                .map_err(|e| self.read_object_error(ty, e))?;
+            out.flush().await.map_err(|e| self.read_object_error(ty, e))?;
 
             drop(tx);
-            decompressor.await.unwrap()?
+            decompressor.await.unwrap().map_err(|e| self.read_object_error(ty, e))?
         };
         if computed_hash != hash {
             return Err(SnapshotError::HashMismatch {
-                ty: ObjectType::Blob(BlobHash { data: *hash.as_bytes() }),
+                ty,
                 expected: *hash.as_bytes(),
                 computed: *computed_hash.as_bytes(),
                 source_repo: self.dir.0.clone(),
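
Note: `zstd_reader()` is also not part of this diff. From its call sites (it returns an async reader plus a channel sender that the `InspectWriter` closure feeds compressed chunks into), one plausible reconstruction, assuming the `async-compression`, `tokio-util`, and `tokio-stream` crates:

```rust
use std::io;

use bytes::Bytes;
use tokio::io::{AsyncBufRead, BufReader};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
use tokio_util::io::StreamReader;

/// Chunks sent on the returned sender come out of the returned reader
/// zstd-decompressed; dropping the sender ends the stream.
fn zstd_reader() -> io::Result<(impl AsyncBufRead + Unpin, mpsc::UnboundedSender<Bytes>)> {
    let (tx, rx) = mpsc::unbounded_channel::<Bytes>();
    // Adapt the receiver into an AsyncBufRead over the compressed bytes...
    let compressed = StreamReader::new(UnboundedReceiverStream::new(rx).map(Ok::<_, io::Error>));
    // ...and decompress on the fly.
    let zstd = async_compression::tokio::bufread::ZstdDecoder::new(compressed);
    Ok((BufReader::new(zstd), tx))
}
```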
@@ -434,13 +452,22 @@ impl<P: BlobProvider> SnapshotFetcher<P> {
     }
 
     async fn fetch_page(&self, hash: blake3::Hash) -> Result<()> {
-        let Some(dst_path) = self.object_file_path(ObjectType::Page(hash)).await? else {
+        let ty = ObjectType::Page(hash);
+        let Some(dst_path) = self.object_file_path(ty).await? else {
             return Ok(());
         };
         atomically((!self.dry_run).then_some(dst_path), |out| async {
             let mut out = BufWriter::new(out);
-            let mut src = self.provider.blob_reader(hash).await?;
-            let compressed = src.fill_buf().await?.starts_with(&ZSTD_MAGIC_BYTES);
+            let mut src = self
+                .provider
+                .blob_reader(hash)
+                .await
+                .map_err(|e| self.read_object_error(ty, e))?;
+            let compressed = src
+                .fill_buf()
+                .await
+                .map_err(|e| self.read_object_error(ty, e))?
+                .starts_with(&ZSTD_MAGIC_BYTES);
 
             // To compute the page hash, we need to bsatn deserialize it.
             // As bsatn doesn't support streaming deserialization yet,
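
Note: both fetch paths run inside `atomically`, which is defined outside this diff as well. Its call shape (an `Option` destination that is `None` in dry-run mode, plus a closure taking a writer) suggests a write-to-temp-then-rename combinator along these lines. A hedged sketch only: the dry-run branch, which presumably hands the closure a discarding writer, and cleanup of the temp file on error are both elided:

```rust
use std::future::Future;
use std::path::PathBuf;

use tokio::fs::File;

/// Run `f` against a temp file and rename it into place only on success,
/// so a partially fetched object never appears under its final name.
async fn atomically<F, Fut, T>(dst: Option<PathBuf>, f: F) -> std::io::Result<T>
where
    F: FnOnce(File) -> Fut,
    Fut: Future<Output = std::io::Result<T>>,
{
    let dst = dst.expect("dry-run (None) branch elided from this sketch");
    let tmp = dst.with_extension("tmp");
    let out = File::create(&tmp).await?;
    let res = f(out).await?;
    // Only on success does the object become visible under its final name.
    tokio::fs::rename(&tmp, &dst).await?;
    Ok(res)
}
```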
@@ -452,15 +479,17 @@ impl<P: BlobProvider> SnapshotFetcher<P> {
             let mut writer = InspectWriter::new(&mut out, |chunk| {
                 page_buf.extend_from_slice(chunk);
             });
-            tokio::io::copy_buf(&mut src, &mut writer).await?;
-            writer.flush().await?;
+            tokio::io::copy_buf(&mut src, &mut writer)
+                .await
+                .map_err(|e| self.read_object_error(ty, e))?;
+            writer.flush().await.map_err(|e| self.read_object_error(ty, e))?;
 
             page_buf.split().freeze()
         } else {
             // If the input is compressed, send all received chunks to a
             // separate task that decompresses the stream and returns
             // the uncompressed bytes.
-            let (mut zstd, tx) = zstd_reader()?;
+            let (mut zstd, tx) = zstd_reader().map_err(|e| self.read_object_error(ty, e))?;
             let buf_pool = self.buf_pool.clone();
             let decompressor = tokio::spawn(async move {
                 let mut page_buf = buf_pool.get();
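
Note: `self.buf_pool` (used here to obtain `page_buf`) is defined outside this diff too. The `clone()`-then-`get()` usage is consistent with a cheaply clonable, shared pool of reusable buffers; `BufPool`, `get`, and `put` below are hypothetical stand-ins, not the crate's API:

```rust
use std::sync::{Arc, Mutex};

use bytes::BytesMut;

/// Hypothetical stand-in for the crate's buffer pool: `Clone` is cheap
/// (it clones the Arc), so the pool can be moved into spawned tasks.
#[derive(Clone, Default)]
struct BufPool(Arc<Mutex<Vec<BytesMut>>>);

impl BufPool {
    /// Pop a recycled buffer, or allocate a fresh one if the pool is empty.
    fn get(&self) -> BytesMut {
        self.0.lock().unwrap().pop().unwrap_or_default()
    }

    /// Return a buffer to the pool for reuse.
    fn put(&self, mut buf: BytesMut) {
        buf.clear();
        self.0.lock().unwrap().push(buf);
    }
}
```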
@@ -473,11 +502,13 @@ impl<P: BlobProvider> SnapshotFetcher<P> {
                 buf.extend_from_slice(chunk);
                 tx.send(buf.split().freeze()).ok();
             });
-            tokio::io::copy_buf(&mut src, &mut writer).await?;
-            writer.flush().await?;
+            tokio::io::copy_buf(&mut src, &mut writer)
+                .await
+                .map_err(|e| self.read_object_error(ty, e))?;
+            writer.flush().await.map_err(|e| self.read_object_error(ty, e))?;
 
             drop(tx);
-            decompressor.await.unwrap()?
+            decompressor.await.unwrap().map_err(|e| self.read_object_error(ty, e))?
         };
 
         self.verify_page(hash, &page_bytes)?;
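
Note: the compressed branches of both functions share one tee pattern: a single copy writes bytes to disk while the `InspectWriter` closure forwards a copy of each chunk over a channel to a spawned task, and dropping the sender is what lets that task finish. A runnable sketch of the tee, where a hashing task stands in for the diff's zstd decompressor:

```rust
use bytes::{Bytes, BytesMut};
use tokio::io::{self, AsyncBufRead, AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio_util::io::InspectWriter;

/// Copy `src` to `out` while forwarding a copy of every written chunk to a
/// spawned task, then collect that task's result.
async fn tee_to_task(
    src: &mut (impl AsyncBufRead + Unpin),
    out: &mut (impl AsyncWrite + Unpin),
) -> io::Result<blake3::Hash> {
    let (tx, mut rx) = mpsc::unbounded_channel::<Bytes>();
    let worker = tokio::spawn(async move {
        let mut hasher = blake3::Hasher::new();
        while let Some(chunk) = rx.recv().await {
            hasher.update(&chunk);
        }
        hasher.finalize()
    });

    let mut buf = BytesMut::new();
    let mut writer = InspectWriter::new(out, |chunk: &[u8]| {
        buf.extend_from_slice(chunk);
        // `.ok()`: if the worker went away, keep writing the file anyway.
        tx.send(buf.split().freeze()).ok();
    });
    io::copy_buf(src, &mut writer).await?;
    writer.flush().await?;
    drop(writer);

    // Dropping the sender closes the channel, letting the worker finish.
    drop(tx);
    Ok(worker.await.expect("worker task panicked"))
}
```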
