diff --git a/src/borg/archive.py b/src/borg/archive.py index 3f29cc2755..c2e84d2fca 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -29,7 +29,6 @@ from .crypto.low_level import IntegrityError as IntegrityErrorBase from .helpers import BackupError, BackupRaceConditionError, BackupItemExcluded from .helpers import BackupOSError, BackupPermissionError, BackupFileNotFoundError, BackupIOError -from .hashindex import ChunkIndex, ChunkIndexEntry from .helpers import HardLinkManager from .helpers import ChunkIteratorFileWrapper, open_item from .helpers import Error, IntegrityError, set_ec @@ -1968,13 +1967,7 @@ def add_reference(id_, size, cdata): # either we already have this chunk in repo and chunks index or we add it now if id_ not in self.chunks: assert cdata is not None - self.chunks[id_] = ChunkIndexEntry( - flags=ChunkIndex.F_USED, - size=size, - pack_id=UNKNOWN_BYTES32, - obj_offset=UNKNOWN_INT32, - obj_size=UNKNOWN_INT32, - ) + self.chunks.add(id_, size) if self.repair: pack_results = self.repository.put(id_, cdata) self.chunks.update_pack_info(pack_results) diff --git a/src/borg/constants.py b/src/borg/constants.py index 5c88b6b89e..24533af544 100644 --- a/src/borg/constants.py +++ b/src/borg/constants.py @@ -55,7 +55,8 @@ # Grep for UNKNOWN_INT32 to find every site that still needs updating. UNKNOWN_INT32 = 0xFFFFFFFF -# Placeholder for pack_id (32-byte field) when the value is not yet known. +# Filler for the pack_id (32-byte field) while the real value is unknown. Never interpreted; +# an unresolved pack location is tracked by the ChunkIndex.F_PENDING flag. UNKNOWN_BYTES32 = b"\xff" * 32 # MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header) diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index bfa4d164ad..2241b47f8b 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -52,6 +52,7 @@ class ChunkIndex(HTProxyMixin, MutableMapping): # user flags: F_USED = 2 ** 0 # chunk is used/referenced F_COMPRESS = 2 ** 1 # chunk shall get (re-)compressed + F_PENDING = 2 ** 2 # pack location (pack_id, obj_offset, obj_size) not resolved yet. # system flags (internal use, always 0 to user, not changeable by user): F_NEW = 2 ** 24 # a new chunk that is not present in repo index/* yet. @@ -81,8 +82,10 @@ class ChunkIndex(HTProxyMixin, MutableMapping): else: flags = v.flags | self.F_USED assert v.size == 0 or v.size == size + # F_PENDING marks the pack location (pack_id, obj_offset, obj_size) as not yet set. self[key] = ChunkIndexEntry( - flags=flags, size=size, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=flags | self.F_PENDING, size=size, + pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) def __getitem__(self, key): @@ -109,12 +112,19 @@ class ChunkIndex(HTProxyMixin, MutableMapping): self.ht[key] = value._replace(flags=system_flags | user_flags) def update_pack_info(self, pack_results): - """Update the on-disk location fields for a list of (chunk_id, pack_id, obj_offset, obj_size) tuples.""" + """Set pack_id, obj_offset and obj_size from a list of (chunk_id, pack_id, obj_offset, obj_size) + tuples and clear F_PENDING.""" if not pack_results: return for chunk_id, pack_id, obj_offset, obj_size in pack_results: existing = self[chunk_id] - self[chunk_id] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size) + self[chunk_id] = existing._replace( + flags=existing.flags & ~self.F_PENDING, pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size + ) + + def is_pending(self, key): + """Return whether the chunk's pack location (pack_id, obj_offset, obj_size) is unresolved.""" + return bool(self[key].flags & self.F_PENDING) def clear_new(self): """Clears the F_NEW flag of all items.""" diff --git a/src/borg/repository.py b/src/borg/repository.py index e19714c8c1..f46f240f67 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -106,9 +106,9 @@ def build_rest_backend(location): class PackWriter: """Buffers chunks into a pack file and writes it to the store when full. - add() buffers a (chunk_id, cdata) pair and marks the chunk pending in the - ChunkIndex (pack_id=UNKNOWN_BYTES32); flush() writes the pack and sets each - entry's real pack_id and obj_offset. + add() buffers a (chunk_id, cdata) pair and marks the chunk pending (F_PENDING); + flush() writes the pack and sets each entry's pack_id, obj_offset and obj_size, + clearing F_PENDING. The ChunkIndex comes from the repository, or from an explicit chunks index when there is no repository (see the chunks property). @@ -142,8 +142,6 @@ def chunks(self): def add(self, chunk_id, cdata): """Buffer a chunk. Returns flush results if the pack is now full, else None.""" self.chunks.add(chunk_id, 0) # size: plaintext chunk size, set by the cache layer - # obj_size is final; pack_id and obj_offset get their real values in flush(). - self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, 0, len(cdata))]) self._pieces.append((chunk_id, cdata)) self._size += len(cdata) if (self.max_count is not None and len(self._pieces) >= self.max_count) or ( @@ -179,22 +177,19 @@ def flush(self): offset += obj_size key = "packs/" + bin_to_hex(pack_id) - # ids this flush pre-marked in the index via add() (pack_id still UNKNOWN_BYTES32). pending_ids = [chunk_id for chunk_id, _ in self._pieces] try: self.store.store(key, pack_data) except Exception: - # The pack is not stored, so drop the pending entries: keeping them would let - # seen_chunk() dedup later chunks against bytes that were never written. + # the pack was not stored: drop the index entries for its chunks. for chunk_id in pending_ids: - entry = self.chunks.get(chunk_id) - if entry is not None and entry.pack_id == UNKNOWN_BYTES32: + if chunk_id in self.chunks: # a chunk_id may appear more than once in this pack del self.chunks[chunk_id] raise finally: self._pieces = [] # cleared on success and on failure self._size = 0 - self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id + self.chunks.update_pack_info(results) # set the real location and clear F_PENDING return results @@ -711,7 +706,7 @@ def list(self, limit=None, marker=None): collect = marker is None result = [] for chunk_id, entry in self.chunks.iteritems(): - if entry.pack_id == UNKNOWN_BYTES32: + if self.chunks.is_pending(chunk_id): continue # buffered in PackWriter, not flushed to a pack yet if collect: result.append((chunk_id, entry.obj_size)) @@ -728,11 +723,9 @@ def get(self, id, read_data=True, raise_missing=True): if raise_missing: raise self.ObjectNotFound(id, str(self._location)) return None - if entry.pack_id == UNKNOWN_BYTES32: - # chunk is buffered in PackWriter, not yet flushed to a pack. Everything must be flushed - # before it can be read back, so reaching here points at a flush / index-update ordering - # bug, not a genuinely missing object. this is a code bug, so we crash loudly regardless - # of raise_missing instead of pretending the object is absent. + if self.chunks.is_pending(id): + # buffered but not flushed; a chunk must be flushed before any read, so this is a code + # bug (wrong flush/index ordering), not a missing object: raise regardless of raise_missing. raise self.PackLocationUnknown(id, str(self._location)) pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) diff --git a/src/borg/testsuite/hashindex_test.py b/src/borg/testsuite/hashindex_test.py index af4ab64d61..67e3c29d87 100644 --- a/src/borg/testsuite/hashindex_test.py +++ b/src/borg/testsuite/hashindex_test.py @@ -21,16 +21,18 @@ def test_chunkindex_add(): chunks = ChunkIndex() x = H2(1) chunks.add(x, 0) + assert chunks.is_pending(x) + pending = ChunkIndex.F_USED | ChunkIndex.F_PENDING # add() sets F_PENDING alongside F_USED assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=pending, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) chunks.add(x, 2) # updating size (we do not have a size yet) assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=pending, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) chunks.add(x, 2) assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=pending, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) with pytest.raises(AssertionError): chunks.add(x, 3) # inconsistent size (we already have a different size) @@ -43,10 +45,15 @@ def test_chunkindex_update_pack_info(): chunks.add(x2, 20) assert chunks[x1].obj_offset == UNKNOWN_INT32 assert chunks[x2].obj_offset == UNKNOWN_INT32 + assert chunks.is_pending(x1) + assert chunks.is_pending(x2) pack_id = H2(3) # Both chunks land in the same pack: batch update in one call. chunks.update_pack_info([(x1, pack_id, 0, 50), (x2, pack_id, 50, 60)]) + # resolving the location clears the pending flag + assert not chunks.is_pending(x1) + assert not chunks.is_pending(x2) # Location fields updated; flags and size must be unchanged. assert chunks[x1] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=10, pack_id=pack_id, obj_offset=0, obj_size=50) assert chunks[x2] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=20, pack_id=pack_id, obj_offset=50, obj_size=60) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 01ee6f50e2..46bb438b76 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -5,7 +5,6 @@ import pytest from ..helpers import IntegrityError, Location, bin_to_hex from ..hashindex import ChunkIndex -from ..constants import UNKNOWN_BYTES32 from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter, PackReader from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -187,11 +186,11 @@ def test_multi_object_pack_roundtrip(repo_fixtures, request): with get_repository_from_fixture(repo_fixtures, request) as repository: repository._pack_writer.max_count = 2 # this test is written for exactly two objects per pack repository.put(H(0), chunk0) - assert repository.chunks[H(0)].pack_id == UNKNOWN_BYTES32 # buffered: the pack is not full yet + assert repository.chunks.is_pending(H(0)) # buffered: the pack is not full yet repository.put(H(1), chunk1) # fills the pack, flushing both objects at once # both objects share one pack, written exactly once, laid out in put() order pack_id = repository.chunks[H(0)].pack_id - assert pack_id != UNKNOWN_BYTES32 + assert not repository.chunks.is_pending(H(0)) assert repository.chunks[H(1)].pack_id == pack_id assert [info.name for info in repository.store_list("packs")] == [bin_to_hex(pack_id)] assert repository.chunks[H(0)].obj_offset == 0 @@ -509,20 +508,57 @@ def test_get_uses_chunk_index_location(tmp_path): def test_put_marks_id_in_chunk_index(tmp_path): - # put() marks the id pending (pack_id=UNKNOWN_BYTES32); flush() then fills in the - # real pack location for the current session. + # put() marks the id pending; flush() sets the real pack location and clears the pending flag. with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: id1 = H(1) repository.put(id1, fchunk(b"ZEROS")) entry = repository._chunks.get(id1) assert entry is not None - assert entry.pack_id == UNKNOWN_BYTES32 # buffered, not yet flushed + assert repository._chunks.is_pending(id1) # buffered, not yet flushed repository.flush() entry = repository._chunks.get(id1) + assert not repository._chunks.is_pending(id1) assert entry.pack_id == sha256(fchunk(b"ZEROS")).digest() assert entry.size == 0 # uncompressed size filled in by cache layer +def test_list_skips_pending_chunk(tmp_path): + # list() skips a pending chunk and yields it once flushed. + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.put(H(1), fchunk(b"BUFFERED")) # buffered: the pack is not full yet + assert repository._chunks.is_pending(H(1)) + assert repository.list() == [] + repository.flush() + assert [chunk_id for chunk_id, _ in repository.list()] == [H(1)] + + +def test_get_pending_chunk_raises(tmp_path): + # get() on a pending chunk raises PackLocationUnknown, also with raise_missing=False. + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.put(H(1), fchunk(b"BUFFERED")) # buffered: the pack is not full yet + assert repository._chunks.is_pending(H(1)) + with pytest.raises(Repository.PackLocationUnknown): + repository.get(H(1)) + with pytest.raises(Repository.PackLocationUnknown): + repository.get(H(1), raise_missing=False) + repository.flush() # close() requires the buffer to be empty + + +def test_flush_store_failure_drops_pending_entries(tmp_path): + # flush() removes the pending index entries when storing the pack fails. + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.put(H(1), fchunk(b"BUFFERED")) + assert repository._chunks.is_pending(H(1)) + + def boom(*args, **kwargs): + raise OSError("store failed") + + repository._pack_writer.store.store = boom + with pytest.raises(OSError): + repository.flush() + assert H(1) not in repository._chunks + + def test_check_detects_corruption_in_later_object(tmp_path): # Corruption anywhere in a multi-object pack must be caught, not just in the first object: the pack # is named by sha256(content), so flipping any byte makes its stored hash differ from its name.