Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions src/borg/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
from .crypto.low_level import IntegrityError as IntegrityErrorBase
from .helpers import BackupError, BackupRaceConditionError, BackupItemExcluded
from .helpers import BackupOSError, BackupPermissionError, BackupFileNotFoundError, BackupIOError
from .hashindex import ChunkIndex, ChunkIndexEntry
from .helpers import HardLinkManager
from .helpers import ChunkIteratorFileWrapper, open_item
from .helpers import Error, IntegrityError, set_ec
Expand Down Expand Up @@ -1968,13 +1967,7 @@ def add_reference(id_, size, cdata):
# either we already have this chunk in repo and chunks index or we add it now
if id_ not in self.chunks:
assert cdata is not None
self.chunks[id_] = ChunkIndexEntry(
flags=ChunkIndex.F_USED,
size=size,
pack_id=UNKNOWN_BYTES32,
obj_offset=UNKNOWN_INT32,
obj_size=UNKNOWN_INT32,
)
self.chunks.add(id_, size)
if self.repair:
pack_results = self.repository.put(id_, cdata)
self.chunks.update_pack_info(pack_results)
Expand Down
3 changes: 2 additions & 1 deletion src/borg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
# Grep for UNKNOWN_INT32 to find every site that still needs updating.
UNKNOWN_INT32 = 0xFFFFFFFF

# Placeholder for pack_id (32-byte field) when the value is not yet known.
# Filler for the pack_id (32-byte field) while the real value is unknown. Never interpreted;
# an unresolved pack location is tracked by the ChunkIndex.F_PENDING flag.
UNKNOWN_BYTES32 = b"\xff" * 32

# MAX_OBJECT_SIZE = MAX_DATA_SIZE + len(PUT header)
Expand Down
16 changes: 13 additions & 3 deletions src/borg/hashindex.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
# user flags:
F_USED = 2 ** 0 # chunk is used/referenced
F_COMPRESS = 2 ** 1 # chunk shall get (re-)compressed
F_PENDING = 2 ** 2 # pack location (pack_id, obj_offset, obj_size) not resolved yet.
# system flags (internal use, always 0 to user, not changeable by user):
F_NEW = 2 ** 24 # a new chunk that is not present in repo index/* yet.

Expand Down Expand Up @@ -81,8 +82,10 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
else:
flags = v.flags | self.F_USED
assert v.size == 0 or v.size == size
# F_PENDING marks the pack location (pack_id, obj_offset, obj_size) as not yet set.
self[key] = ChunkIndexEntry(
flags=flags, size=size, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
flags=flags | self.F_PENDING, size=size,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also add this to lines 81 and 83.

By the way, check in which case the code in lines 83/84 gets used. It seems to deal with an already existing chunks index entry there - would that entry also be "pending"?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: this seems to be correct; in that case the entry will be updated again soon afterwards with the real pack_id.

Add a one-line code comment: in add(), note that the pack fields are always (re)set to UNKNOWN/pending on purpose, so a re-add invalidates any prior location until the next flush().

pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
)

def __getitem__(self, key):
Expand All @@ -109,12 +112,19 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
self.ht[key] = value._replace(flags=system_flags | user_flags)

def update_pack_info(self, pack_results):
"""Update the on-disk location fields for a list of (chunk_id, pack_id, obj_offset, obj_size) tuples."""
"""Set pack_id, obj_offset and obj_size from a list of (chunk_id, pack_id, obj_offset, obj_size)
tuples and clear F_PENDING."""
if not pack_results:
return
for chunk_id, pack_id, obj_offset, obj_size in pack_results:
existing = self[chunk_id]
self[chunk_id] = existing._replace(pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size)
self[chunk_id] = existing._replace(
flags=existing.flags & ~self.F_PENDING, pack_id=pack_id, obj_offset=obj_offset, obj_size=obj_size
)

def is_pending(self, key):
    """Tell whether the entry stored under *key* still carries the F_PENDING flag.

    F_PENDING marks an entry whose pack location (pack_id, obj_offset, obj_size)
    has not been resolved yet. The entry is fetched with ``self[key]``, so a
    lookup failure for *key* propagates to the caller unchanged.
    """
    entry = self[key]
    # Mask out every other flag bit; any remaining bit means "still pending".
    return (entry.flags & self.F_PENDING) != 0

def clear_new(self):
"""Clears the F_NEW flag of all items."""
Expand Down
27 changes: 10 additions & 17 deletions src/borg/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ def build_rest_backend(location):
class PackWriter:
"""Buffers chunks into a pack file and writes it to the store when full.

add() buffers a (chunk_id, cdata) pair and marks the chunk pending in the
ChunkIndex (pack_id=UNKNOWN_BYTES32); flush() writes the pack and sets each
entry's real pack_id and obj_offset.
add() buffers a (chunk_id, cdata) pair and marks the chunk pending (F_PENDING);
flush() writes the pack and sets each entry's pack_id, obj_offset and obj_size,
clearing F_PENDING.

The ChunkIndex comes from the repository, or from an explicit chunks index when
there is no repository (see the chunks property).
Expand Down Expand Up @@ -142,8 +142,6 @@ def chunks(self):
def add(self, chunk_id, cdata):
"""Buffer a chunk. Returns flush results if the pack is now full, else None."""
self.chunks.add(chunk_id, 0) # size: plaintext chunk size, set by the cache layer
# obj_size is final; pack_id and obj_offset get their real values in flush().
self.chunks.update_pack_info([(chunk_id, UNKNOWN_BYTES32, 0, len(cdata))])
self._pieces.append((chunk_id, cdata))
self._size += len(cdata)
if (self.max_count is not None and len(self._pieces) >= self.max_count) or (
Expand Down Expand Up @@ -179,22 +177,19 @@ def flush(self):
offset += obj_size

key = "packs/" + bin_to_hex(pack_id)
# ids this flush pre-marked in the index via add() (pack_id still UNKNOWN_BYTES32).
pending_ids = [chunk_id for chunk_id, _ in self._pieces]
try:
self.store.store(key, pack_data)
except Exception:
# The pack is not stored, so drop the pending entries: keeping them would let
# seen_chunk() dedup later chunks against bytes that were never written.
# the pack was not stored: drop the index entries for its chunks.
for chunk_id in pending_ids:
entry = self.chunks.get(chunk_id)
if entry is not None and entry.pack_id == UNKNOWN_BYTES32:
if chunk_id in self.chunks: # a chunk_id may appear more than once in this pack
del self.chunks[chunk_id]
raise
finally:
self._pieces = [] # cleared on success and on failure
self._size = 0
self.chunks.update_pack_info(results) # replace UNKNOWN_BYTES32 with real pack_id
self.chunks.update_pack_info(results) # set the real location and clear F_PENDING
return results


Expand Down Expand Up @@ -711,7 +706,7 @@ def list(self, limit=None, marker=None):
collect = marker is None
result = []
for chunk_id, entry in self.chunks.iteritems():
if entry.pack_id == UNKNOWN_BYTES32:
if self.chunks.is_pending(chunk_id):
continue # buffered in PackWriter, not flushed to a pack yet
if collect:
result.append((chunk_id, entry.obj_size))
Expand All @@ -728,11 +723,9 @@ def get(self, id, read_data=True, raise_missing=True):
if raise_missing:
raise self.ObjectNotFound(id, str(self._location))
return None
if entry.pack_id == UNKNOWN_BYTES32:
# chunk is buffered in PackWriter, not yet flushed to a pack. Everything must be flushed
# before it can be read back, so reaching here points at a flush / index-update ordering
# bug, not a genuinely missing object. this is a code bug, so we crash loudly regardless
# of raise_missing instead of pretending the object is absent.
if self.chunks.is_pending(id):
# buffered but not flushed; a chunk must be flushed before any read, so this is a code
# bug (wrong flush/index ordering), not a missing object: raise regardless of raise_missing.
raise self.PackLocationUnknown(id, str(self._location))
pack_id, obj_offset, obj_size = entry.pack_id, entry.obj_offset, entry.obj_size
id_hex = bin_to_hex(id)
Expand Down
13 changes: 10 additions & 3 deletions src/borg/testsuite/hashindex_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ def test_chunkindex_add():
chunks = ChunkIndex()
x = H2(1)
chunks.add(x, 0)
assert chunks.is_pending(x)
pending = ChunkIndex.F_USED | ChunkIndex.F_PENDING # add() sets F_PENDING alongside F_USED
assert chunks[x] == ChunkIndexEntry(
flags=ChunkIndex.F_USED, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
flags=pending, size=0, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
)
chunks.add(x, 2) # updating size (we do not have a size yet)
assert chunks[x] == ChunkIndexEntry(
flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
flags=pending, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
)
chunks.add(x, 2)
assert chunks[x] == ChunkIndexEntry(
flags=ChunkIndex.F_USED, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
flags=pending, size=2, pack_id=UNKNOWN_BYTES32, obj_offset=UNKNOWN_INT32, obj_size=UNKNOWN_INT32
)
with pytest.raises(AssertionError):
chunks.add(x, 3) # inconsistent size (we already have a different size)
Expand All @@ -43,10 +45,15 @@ def test_chunkindex_update_pack_info():
chunks.add(x2, 20)
assert chunks[x1].obj_offset == UNKNOWN_INT32
assert chunks[x2].obj_offset == UNKNOWN_INT32
assert chunks.is_pending(x1)
assert chunks.is_pending(x2)

pack_id = H2(3)
# Both chunks land in the same pack: batch update in one call.
chunks.update_pack_info([(x1, pack_id, 0, 50), (x2, pack_id, 50, 60)])
# resolving the location clears the pending flag
assert not chunks.is_pending(x1)
assert not chunks.is_pending(x2)
# Location fields updated and F_PENDING cleared; F_USED and size must be unchanged.
assert chunks[x1] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=10, pack_id=pack_id, obj_offset=0, obj_size=50)
assert chunks[x2] == ChunkIndexEntry(flags=ChunkIndex.F_USED, size=20, pack_id=pack_id, obj_offset=50, obj_size=60)
Expand Down
48 changes: 42 additions & 6 deletions src/borg/testsuite/repository_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import pytest
from ..helpers import IntegrityError, Location, bin_to_hex
from ..hashindex import ChunkIndex
from ..constants import UNKNOWN_BYTES32
from ..repository import Repository, MAX_DATA_SIZE, rest_serve_command, PackWriter, PackReader
from ..repoobj import RepoObj, OBJ_MAGIC, OBJ_VERSION
from .hashindex_test import H
Expand Down Expand Up @@ -187,11 +186,11 @@ def test_multi_object_pack_roundtrip(repo_fixtures, request):
with get_repository_from_fixture(repo_fixtures, request) as repository:
repository._pack_writer.max_count = 2 # this test is written for exactly two objects per pack
repository.put(H(0), chunk0)
assert repository.chunks[H(0)].pack_id == UNKNOWN_BYTES32 # buffered: the pack is not full yet
assert repository.chunks.is_pending(H(0)) # buffered: the pack is not full yet
repository.put(H(1), chunk1) # fills the pack, flushing both objects at once
# both objects share one pack, written exactly once, laid out in put() order
pack_id = repository.chunks[H(0)].pack_id
assert pack_id != UNKNOWN_BYTES32
assert not repository.chunks.is_pending(H(0))
assert repository.chunks[H(1)].pack_id == pack_id
assert [info.name for info in repository.store_list("packs")] == [bin_to_hex(pack_id)]
assert repository.chunks[H(0)].obj_offset == 0
Expand Down Expand Up @@ -509,20 +508,57 @@ def test_get_uses_chunk_index_location(tmp_path):


def test_put_marks_id_in_chunk_index(tmp_path):
# put() marks the id pending (pack_id=UNKNOWN_BYTES32); flush() then fills in the
# real pack location for the current session.
# put() marks the id pending; flush() sets the real pack location and clears the pending flag.
with Repository(str(tmp_path / "repo"), exclusive=True, create=True) as repository:
id1 = H(1)
repository.put(id1, fchunk(b"ZEROS"))
entry = repository._chunks.get(id1)
assert entry is not None
assert entry.pack_id == UNKNOWN_BYTES32 # buffered, not yet flushed
assert repository._chunks.is_pending(id1) # buffered, not yet flushed
repository.flush()
entry = repository._chunks.get(id1)
assert not repository._chunks.is_pending(id1)
assert entry.pack_id == sha256(fchunk(b"ZEROS")).digest()
assert entry.size == 0 # uncompressed size filled in by cache layer


def test_list_skips_pending_chunk(tmp_path):
    """list() omits a chunk while it is pending and reports it once it has been flushed."""
    repo_path = str(tmp_path / "repo")
    with Repository(repo_path, exclusive=True, create=True) as repository:
        chunk_id = H(1)
        # A single put() only buffers the chunk: the pack is not full yet.
        repository.put(chunk_id, fchunk(b"BUFFERED"))
        assert repository._chunks.is_pending(chunk_id)
        assert repository.list() == []
        repository.flush()
        listed_ids = [listed_id for listed_id, _ in repository.list()]
        assert listed_ids == [chunk_id]


def test_get_pending_chunk_raises(tmp_path):
    """get() raises PackLocationUnknown for a pending chunk, with and without raise_missing=False."""
    repo_path = str(tmp_path / "repo")
    with Repository(repo_path, exclusive=True, create=True) as repository:
        chunk_id = H(1)
        # A single put() only buffers the chunk: the pack is not full yet.
        repository.put(chunk_id, fchunk(b"BUFFERED"))
        assert repository._chunks.is_pending(chunk_id)
        # First the default call, then the call that would normally suppress "missing" errors.
        for get_kwargs in ({}, {"raise_missing": False}):
            with pytest.raises(Repository.PackLocationUnknown):
                repository.get(chunk_id, **get_kwargs)
        repository.flush()  # close() requires the buffer to be empty


def test_flush_store_failure_drops_pending_entries(tmp_path):
    """flush() removes the pending index entries when storing the pack fails."""

    def failing_store(*args, **kwargs):
        # Stand-in for the store's store() method that always fails.
        raise OSError("store failed")

    repo_path = str(tmp_path / "repo")
    with Repository(repo_path, exclusive=True, create=True) as repository:
        chunk_id = H(1)
        repository.put(chunk_id, fchunk(b"BUFFERED"))
        assert repository._chunks.is_pending(chunk_id)

        # Make the pack write fail, then check that the error surfaces from flush().
        repository._pack_writer.store.store = failing_store
        with pytest.raises(OSError):
            repository.flush()
        assert chunk_id not in repository._chunks


def test_check_detects_corruption_in_later_object(tmp_path):
# Corruption anywhere in a multi-object pack must be caught, not just in the first object: the pack
# is named by sha256(content), so flipping any byte makes its stored hash differ from its name.
Expand Down
Loading