From 061530d199379723cc1e68bd7152ef95463396ba Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Mon, 29 Jun 2026 06:57:30 +0530 Subject: [PATCH 1/3] hashindex: track unresolved pack location with F_PENDING flag Replace the UNKNOWN_BYTES32 pack_id sentinel (in-band signalling) with a ChunkIndex system flag, queried via is_pending(). Follow-up to #9821. --- src/borg/archive.py | 9 +-------- src/borg/constants.py | 3 ++- src/borg/hashindex.pyx | 14 +++++++++++++- src/borg/repository.py | 28 +++++++++++---------------- src/borg/testsuite/hashindex_test.py | 6 ++++++ src/borg/testsuite/repository_test.py | 11 +++++------ 6 files changed, 38 insertions(+), 33 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 3f29cc2755..c2e84d2fca 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -29,7 +29,6 @@ from .crypto.low_level import IntegrityError as IntegrityErrorBase from .helpers import BackupError, BackupRaceConditionError, BackupItemExcluded from .helpers import BackupOSError, BackupPermissionError, BackupFileNotFoundError, BackupIOError -from .hashindex import ChunkIndex, ChunkIndexEntry from .helpers import HardLinkManager from .helpers import ChunkIteratorFileWrapper, open_item from .helpers import Error, IntegrityError, set_ec @@ -1968,13 +1967,7 @@ def add_reference(id_, size, cdata): # either we already have this chunk in repo and chunks index or we add it now if id_ not in self.chunks: assert cdata is not None - self.chunks[id_] = ChunkIndexEntry( - flags=ChunkIndex.F_USED, - size=size, - pack_id=UNKNOWN_BYTES32, - obj_offset=UNKNOWN_INT32, - obj_size=UNKNOWN_INT32, - ) + self.chunks.add(id_, size) if self.repair: pack_results = self.repository.put(id_, cdata) self.chunks.update_pack_info(pack_results) diff --git a/src/borg/constants.py b/src/borg/constants.py index 5c88b6b89e..24533af544 100644 --- a/src/borg/constants.py +++ b/src/borg/constants.py @@ -55,7 +55,8 @@ # Grep for UNKNOWN_INT32 to find every site that still needs updating. UNKNOWN_INT32 = 0xFFFFFFFF -# Placeholder for pack_id (32-byte field) when the value is not yet known. +# Filler for the pack_id (32-byte field) while the real value is unknown. Never interpreted; +# an unresolved pack location is tracked by the ChunkIndex.F_PENDING flag. UNKNOWN_BYTES32 = b"\xff" * 32 # MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header) diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index bfa4d164ad..baccdd6ddf 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -54,6 +54,7 @@ class ChunkIndex(HTProxyMixin, MutableMapping): F_COMPRESS = 2 ** 1 # chunk shall get (re-)compressed # system flags (internal use, always 0 to user, not changeable by user): F_NEW = 2 ** 24 # a new chunk that is not present in repo index/* yet. + F_PENDING = 2 ** 25 # pack location (pack_id, obj_offset, obj_size) not resolved yet. def __init__(self, capacity=1000, path=None, usable=None): if path: @@ -84,6 +85,9 @@ class ChunkIndex(HTProxyMixin, MutableMapping): self[key] = ChunkIndexEntry( flags=flags, size=size, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) + # __setitem__ ignores system flags in the assigned value, so set F_PENDING on the raw entry. + raw = self.ht[key] + self.ht[key] = raw._replace(flags=raw.flags | self.F_PENDING) def __getitem__(self, key): """Specialized __getitem__ that hides system flags.""" @@ -109,12 +113,20 @@ class ChunkIndex(HTProxyMixin, MutableMapping): self.ht[key] = value._replace(flags=system_flags | user_flags) def update_pack_info(self, pack_results): - """Update the on-disk location fields for a list of (chunk_id, pack_id, obj_offset, obj_size) tuples.""" + """Set pack_id, obj_offset and obj_size from a list of (chunk_id, pack_id, obj_offset, obj_size) + tuples and clear F_PENDING.""" if not pack_results: return for chunk_id, pack_id, obj_offset, obj_size in pack_results: existing = self[chunk_id] self[chunk_id] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size) + # __setitem__ preserves existing system flags, so clear F_PENDING on the raw entry. + raw = self.ht[chunk_id] + self.ht[chunk_id] = raw._replace(flags=raw.flags & ~self.F_PENDING) + + def is_pending(self, key): + """Return whether the chunk's pack location (pack_id, obj_offset, obj_size) is unresolved.""" + return bool(self.ht[key].flags & self.F_PENDING) def clear_new(self): """Clears the F_NEW flag of all items.""" diff --git a/src/borg/repository.py b/src/borg/repository.py index e19714c8c1..5398a59d20 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -106,9 +106,9 @@ def build_rest_backend(location): class PackWriter: """Buffers chunks into a pack file and writes it to the store when full. - add() buffers a (chunk_id, cdata) pair and marks the chunk pending in the - ChunkIndex (pack_id=UNKNOWN_BYTES32); flush() writes the pack and sets each - entry's real pack_id and obj_offset. + add() buffers a (chunk_id, cdata) pair and marks the chunk pending (F_PENDING); + flush() writes the pack and sets each entry's pack_id, obj_offset and obj_size, + clearing F_PENDING. The ChunkIndex comes from the repository, or from an explicit chunks index when there is no repository (see the chunks property). @@ -142,8 +142,6 @@ def chunks(self): def add(self, chunk_id, cdata): """Buffer a chunk. Returns flush results if the pack is now full, else None.""" self.chunks.add(chunk_id, 0) # size: plaintext chunk size, set by the cache layer - # obj_size is final; pack_id and obj_offset get their real values in flush(). - self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, 0, len(cdata))]) self._pieces.append((chunk_id, cdata)) self._size += len(cdata) if (self.max_count is not None and len(self._pieces) >= self.max_count) or ( @@ -179,22 +177,20 @@ def flush(self): offset += obj_size key = "packs/" + bin_to_hex(pack_id) - # ids this flush pre-marked in the index via add() (pack_id still UNKNOWN_BYTES32). pending_ids = [chunk_id for chunk_id, _ in self._pieces] try: self.store.store(key, pack_data) except Exception: - # The pack is not stored, so drop the pending entries: keeping them would let - # seen_chunk() dedup later chunks against bytes that were never written. + # store failed: remove the pending entries so seen_chunk() does not dedup against + # chunks that were never written. for chunk_id in pending_ids: - entry = self.chunks.get(chunk_id) - if entry is not None and entry.pack_id == UNKNOWN_BYTES32: + if chunk_id in self.chunks and self.chunks.is_pending(chunk_id): del self.chunks[chunk_id] raise finally: self._pieces = [] # cleared on success and on failure self._size = 0 - self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id + self.chunks.update_pack_info(results) # set the real location and clear F_PENDING return results @@ -711,7 +707,7 @@ def list(self, limit=None, marker=None): collect = marker is None result = [] for chunk_id, entry in self.chunks.iteritems(): - if entry.pack_id == UNKNOWN_BYTES32: + if self.chunks.is_pending(chunk_id): continue # buffered in PackWriter, not flushed to a pack yet if collect: result.append((chunk_id, entry.obj_size)) @@ -728,11 +724,9 @@ def get(self, id, read_data=True, raise_missing=True): if raise_missing: raise self.ObjectNotFound(id, str(self._location)) return None - if entry.pack_id == UNKNOWN_BYTES32: - # chunk is buffered in PackWriter, not yet flushed to a pack. Everything must be flushed - # before it can be read back, so reaching here points at a flush / index-update ordering - # bug, not a genuinely missing object. this is a code bug, so we crash loudly regardless - # of raise_missing instead of pretending the object is absent. + if self.chunks.is_pending(id): + # buffered but not flushed; a chunk must be flushed before any read, so this is a code + # bug (wrong flush/index ordering), not a missing object: raise regardless of raise_missing. raise self.PackLocationUnknown(id, str(self._location)) pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size id_hex = bin_to_hex(id) diff --git a/src/borg/testsuite/hashindex_test.py b/src/borg/testsuite/hashindex_test.py index af4ab64d61..f4a5fc2559 100644 --- a/src/borg/testsuite/hashindex_test.py +++ b/src/borg/testsuite/hashindex_test.py @@ -21,6 +21,7 @@ def test_chunkindex_add(): chunks = ChunkIndex() x = H2(1) chunks.add(x, 0) + assert chunks.is_pending(x) # unresolved until update_pack_info() assert chunks[x] == ChunkIndexEntry( flags=ChunkIndex.F_USED, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) @@ -43,10 +44,15 @@ def test_chunkindex_update_pack_info(): chunks.add(x2, 20) assert chunks[x1].obj_offset == UNKNOWN_INT32 assert chunks[x2].obj_offset == UNKNOWN_INT32 + assert chunks.is_pending(x1) + assert chunks.is_pending(x2) pack_id = H2(3) # Both chunks land in the same pack: batch update in one call. chunks.update_pack_info([(x1, pack_id, 0, 50), (x2, pack_id, 50, 60)]) + # resolving the location clears the pending flag + assert not chunks.is_pending(x1) + assert not chunks.is_pending(x2) # Location fields updated; flags and size must be unchanged. assert chunks[x1] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=10, pack_id=pack_id, obj_offset=0, obj_size=50) assert chunks[x2] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=20, pack_id=pack_id, obj_offset=50, obj_size=60) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 01ee6f50e2..70cac40dc0 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -5,7 +5,6 @@ import pytest from ..helpers import IntegrityError, Location, bin_to_hex from ..hashindex import ChunkIndex -from ..constants import UNKNOWN_BYTES32 from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter, PackReader from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION from .hashindex_test import H @@ -187,11 +186,11 @@ def test_multi_object_pack_roundtrip(repo_fixtures, request): with get_repository_from_fixture(repo_fixtures, request) as repository: repository._pack_writer.max_count = 2 # this test is written for exactly two objects per pack repository.put(H(0), chunk0) - assert repository.chunks[H(0)].pack_id == UNKNOWN_BYTES32 # buffered: the pack is not full yet + assert repository.chunks.is_pending(H(0)) # buffered: the pack is not full yet repository.put(H(1), chunk1) # fills the pack, flushing both objects at once # both objects share one pack, written exactly once, laid out in put() order pack_id = repository.chunks[H(0)].pack_id - assert pack_id != UNKNOWN_BYTES32 + assert not repository.chunks.is_pending(H(0)) assert repository.chunks[H(1)].pack_id == pack_id assert [info.name for info in repository.store_list("packs")] == [bin_to_hex(pack_id)] assert repository.chunks[H(0)].obj_offset == 0 @@ -509,16 +508,16 @@ def test_get_uses_chunk_index_location(tmp_path): def test_put_marks_id_in_chunk_index(tmp_path): - # put() marks the id pending (pack_id=UNKNOWN_BYTES32); flush() then fills in the - # real pack location for the current session. + # put() marks the id pending; flush() sets the real pack location and clears the pending flag. with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: id1 = H(1) repository.put(id1, fchunk(b"ZEROS")) entry = repository._chunks.get(id1) assert entry is not None - assert entry.pack_id == UNKNOWN_BYTES32 # buffered, not yet flushed + assert repository._chunks.is_pending(id1) # buffered, not yet flushed repository.flush() entry = repository._chunks.get(id1) + assert not repository._chunks.is_pending(id1) assert entry.pack_id == sha256(fchunk(b"ZEROS")).digest() assert entry.size == 0 # uncompressed size filled in by cache layer From f50fd4fa5208639a31e1983e35b67f6632338d43 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Mon, 29 Jun 2026 22:45:55 +0530 Subject: [PATCH 2/3] hashindex: add tests for F_PENDING branches in repository list/get/flush --- src/borg/testsuite/repository_test.py | 39 +++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 70cac40dc0..2bd57a7089 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -522,6 +522,45 @@ def test_put_marks_id_in_chunk_index(tmp_path): assert entry.size == 0 # uncompressed size filled in by cache layer +def test_list_skips_pending_chunk(tmp_path): + # A buffered (not yet flushed) chunk is in the index but has no pack location, so list() must skip it. + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.put(H(1), fchunk(b"BUFFERED")) # buffered: the pack is not full yet + assert repository._chunks.is_pending(H(1)) + assert repository.list() == [] # pending chunk not listed + repository.flush() + assert [chunk_id for chunk_id, _ in repository.list()] == [H(1)] # listed once flushed + + +def test_get_pending_chunk_raises(tmp_path): + # get() on a buffered (not yet flushed) chunk is a flush/index ordering bug, not a miss: + # it must raise PackLocationUnknown regardless of raise_missing. + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.put(H(1), fchunk(b"BUFFERED")) # buffered: the pack is not full yet + assert repository._chunks.is_pending(H(1)) + with pytest.raises(Repository.PackLocationUnknown): + repository.get(H(1)) + with pytest.raises(Repository.PackLocationUnknown): + repository.get(H(1), raise_missing=False) # still raises: this is a code bug, not a miss + repository.flush() # flush the buffered chunk so close() does not assert on unflushed pieces + + +def test_flush_store_failure_drops_pending_entries(tmp_path): + # If storing the pack fails, flush() must remove the pending index entries so a later seen_chunk() + # does not dedup against chunks that were never written. + with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: + repository.put(H(1), fchunk(b"BUFFERED")) + assert repository._chunks.is_pending(H(1)) + + def boom(*args, **kwargs): + raise OSError("store failed") + + repository._pack_writer.store.store = boom + with pytest.raises(OSError): + repository.flush() + assert H(1) not in repository._chunks # pending entry removed after the failed store + + def test_check_detects_corruption_in_later_object(tmp_path): # Corruption anywhere in a multi-object pack must be caught, not just in the first object: the pack # is named by sha256(content), so flipping any byte makes its stored hash differ from its name. From c640a650014470cc6391d1d297ebcda2803b3349 Mon Sep 17 00:00:00 2001 From: Mrityunjay Raj Date: Mon, 29 Jun 2026 23:47:38 +0530 Subject: [PATCH 3/3] hashindex: make F_PENDING a user flag, drop redundant pending check F_PENDING moves to the user flag range so it is set/cleared via normal assignment; flush()'s store-failure path no longer re-checks is_pending. --- src/borg/hashindex.pyx | 18 ++++++++---------- src/borg/repository.py | 5 ++--- src/borg/testsuite/hashindex_test.py | 9 +++++---- src/borg/testsuite/repository_test.py | 18 ++++++++---------- 4 files changed, 23 insertions(+), 27 deletions(-) diff --git a/src/borg/hashindex.pyx b/src/borg/hashindex.pyx index baccdd6ddf..2241b47f8b 100644 --- a/src/borg/hashindex.pyx +++ b/src/borg/hashindex.pyx @@ -52,9 +52,9 @@ class ChunkIndex(HTProxyMixin, MutableMapping): # user flags: F_USED = 2 ** 0 # chunk is used/referenced F_COMPRESS = 2 ** 1 # chunk shall get (re-)compressed + F_PENDING = 2 ** 2 # pack location (pack_id, obj_offset, obj_size) not resolved yet. # system flags (internal use, always 0 to user, not changeable by user): F_NEW = 2 ** 24 # a new chunk that is not present in repo index/* yet. - F_PENDING = 2 ** 25 # pack location (pack_id, obj_offset, obj_size) not resolved yet. def __init__(self, capacity=1000, path=None, usable=None): if path: @@ -82,12 +82,11 @@ class ChunkIndex(HTProxyMixin, MutableMapping): else: flags = v.flags | self.F_USED assert v.size == 0 or v.size == size + # F_PENDING marks the pack location (pack_id, obj_offset, obj_size) as not yet set. self[key] = ChunkIndexEntry( - flags=flags, size=size, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=flags | self.F_PENDING, size=size, + pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) - # __setitem__ ignores system flags in the assigned value, so set F_PENDING on the raw entry. - raw = self.ht[key] - self.ht[key] = raw._replace(flags=raw.flags | self.F_PENDING) def __getitem__(self, key): """Specialized __getitem__ that hides system flags.""" @@ -119,14 +118,13 @@ class ChunkIndex(HTProxyMixin, MutableMapping): return for chunk_id, pack_id, obj_offset, obj_size in pack_results: existing = self[chunk_id] - self[chunk_id] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size) - # __setitem__ preserves existing system flags, so clear F_PENDING on the raw entry. - raw = self.ht[chunk_id] - self.ht[chunk_id] = raw._replace(flags=raw.flags & ~self.F_PENDING) + self[chunk_id] = existing._replace( + flags=existing.flags & ~self.F_PENDING, pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size + ) def is_pending(self, key): """Return whether the chunk's pack location (pack_id, obj_offset, obj_size) is unresolved.""" - return bool(self.ht[key].flags & self.F_PENDING) + return bool(self[key].flags & self.F_PENDING) def clear_new(self): """Clears the F_NEW flag of all items.""" diff --git a/src/borg/repository.py b/src/borg/repository.py index 5398a59d20..f46f240f67 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -181,10 +181,9 @@ def flush(self): try: self.store.store(key, pack_data) except Exception: - # store failed: remove the pending entries so seen_chunk() does not dedup against - # chunks that were never written. + # the pack was not stored: drop the index entries for its chunks. for chunk_id in pending_ids: - if chunk_id in self.chunks and self.chunks.is_pending(chunk_id): + if chunk_id in self.chunks: # a chunk_id may appear more than once in this pack del self.chunks[chunk_id] raise finally: diff --git a/src/borg/testsuite/hashindex_test.py b/src/borg/testsuite/hashindex_test.py index f4a5fc2559..67e3c29d87 100644 --- a/src/borg/testsuite/hashindex_test.py +++ b/src/borg/testsuite/hashindex_test.py @@ -21,17 +21,18 @@ def test_chunkindex_add(): chunks = ChunkIndex() x = H2(1) chunks.add(x, 0) - assert chunks.is_pending(x) # unresolved until update_pack_info() + assert chunks.is_pending(x) + pending = ChunkIndex.F_USED | ChunkIndex.F_PENDING # add() sets F_PENDING alongside F_USED assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=pending, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) chunks.add(x, 2) # updating size (we do not have a size yet) assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=pending, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) chunks.add(x, 2) assert chunks[x] == ChunkIndexEntry( - flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 + flags=pending, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32 ) with pytest.raises(AssertionError): chunks.add(x, 3) # inconsistent size (we already have a different size) diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 2bd57a7089..46bb438b76 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -523,31 +523,29 @@ def test_put_marks_id_in_chunk_index(tmp_path): def test_list_skips_pending_chunk(tmp_path): - # A buffered (not yet flushed) chunk is in the index but has no pack location, so list() must skip it. + # list() skips a pending chunk and yields it once flushed. with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: repository.put(H(1), fchunk(b"BUFFERED")) # buffered: the pack is not full yet assert repository._chunks.is_pending(H(1)) - assert repository.list() == [] # pending chunk not listed + assert repository.list() == [] repository.flush() - assert [chunk_id for chunk_id, _ in repository.list()] == [H(1)] # listed once flushed + assert [chunk_id for chunk_id, _ in repository.list()] == [H(1)] def test_get_pending_chunk_raises(tmp_path): - # get() on a buffered (not yet flushed) chunk is a flush/index ordering bug, not a miss: - # it must raise PackLocationUnknown regardless of raise_missing. + # get() on a pending chunk raises PackLocationUnknown, also with raise_missing=False. with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: repository.put(H(1), fchunk(b"BUFFERED")) # buffered: the pack is not full yet assert repository._chunks.is_pending(H(1)) with pytest.raises(Repository.PackLocationUnknown): repository.get(H(1)) with pytest.raises(Repository.PackLocationUnknown): - repository.get(H(1), raise_missing=False) # still raises: this is a code bug, not a miss - repository.flush() # flush the buffered chunk so close() does not assert on unflushed pieces + repository.get(H(1), raise_missing=False) + repository.flush() # close() requires the buffer to be empty def test_flush_store_failure_drops_pending_entries(tmp_path): - # If storing the pack fails, flush() must remove the pending index entries so a later seen_chunk() - # does not dedup against chunks that were never written. + # flush() removes the pending index entries when storing the pack fails. with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository: repository.put(H(1), fchunk(b"BUFFERED")) assert repository._chunks.is_pending(H(1)) @@ -558,7 +556,7 @@ def boom(*args, **kwargs): repository._pack_writer.store.store = boom with pytest.raises(OSError): repository.flush() - assert H(1) not in repository._chunks # pending entry removed after the failed store + assert H(1) not in repository._chunks def test_check_detects_corruption_in_later_object(tmp_path):