diff --git a/src/borg/archive.py b/src/borg/archive.py index 9c1f9015fc..00d5c9043f 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1861,9 +1861,14 @@ def verify_data(self): # we must decompress, so it'll call assert_id() in there: self.repo_objs.parse(defect_chunk, encrypted_data, decompress=True, ro_type=ROBJ_DONTCARE) except IntegrityErrorBase: - # failed twice -> get rid of this chunk + # failed twice -> get rid of this chunk by dropping the pack file it lives in. + # At N=1 a pack holds exactly this one object (pack_id == chunk_id), so this + # removes only the defect. N>1 TODO: a defect inside a shared pack needs + # copy-forward of the good neighbours (see + # ArchiveGarbageCollector._copy_forward_pack); until then we address by chunk id. + pack_id = self.chunks[defect_chunk].pack_id del self.chunks[defect_chunk] - self.repository.delete(defect_chunk) + self.repository.delete_pack(pack_id) logger.debug("chunk %s deleted.", bin_to_hex(defect_chunk)) else: logger.warning("chunk %s not deleted, did not consistently fail.", bin_to_hex(defect_chunk)) diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py index 335826d26b..69443ed9b7 100644 --- a/src/borg/archiver/compact_cmd.py +++ b/src/borg/archiver/compact_cmd.py @@ -11,7 +11,7 @@ from ..helpers import set_ec, EXIT_ERROR, format_file_size, bin_to_hex from ..helpers import ProgressIndicatorPercent from ..manifest import Manifest -from ..repository import Repository, repo_lister +from ..repository import Repository, PackWriter, repo_lister from ..logger import create_logger @@ -171,29 +171,7 @@ def report_and_delete(self): logger.warning(f"Soft-deleted archive {name} {hex_id} not found.") repo_size_before = self.repository_size - logger.info("Determining unused objects...") - unused = set() - for id, entry in self.chunks.iteritems(): - if not (entry.flags & ChunkIndex.F_USED): - unused.add(id) - logger.info(f"Deleting {len(unused)} unused objects...") - if unused: - # Before deleting any repository object, invalidate all centrally cached chunk indexes. - # Otherwise, if we get interrupted within the deletion loop, the still-existing cache/chunks.* - # would claim that already-deleted objects are still present. A later "borg create" would then - # trust that stale index, not re-upload the affected chunks and silently create an archive with - # dangling object references (see issue #9748). By removing the cached indexes first, an - # interruption is conservative: clients must rebuild the index from actual repository contents - # and will re-upload any deleted data. save_chunk_index() writes a fresh, valid index afterwards. - delete_chunkindex_cache(self.repository) - pi = ProgressIndicatorPercent( - total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete" - ) - for i, id in enumerate(unused): - pi.show(i) - self.repository.delete(id) - del self.chunks[id] - pi.finish() + self.compact_packs() repo_size_after = self.repository_size count = len(self.chunks) @@ -214,6 +192,94 @@ def report_and_delete(self): else: logger.info(f"Repository has data stored in {count} objects.") + def compact_packs(self): + """Reclaim space at the granularity the store supports: whole pack files. + + The store is append-only and content-addressed, so we never delete a single object in + place. Instead we look at each pack file as a unit, using the F_USED flags that + analyze_archives() set on the index: + + - all objects unused -> drop the whole pack file. + - all objects used -> leave it untouched. + - mixed (some used) -> copy the still-used objects forward into a fresh pack, repoint + their index entries, then drop the old pack whole. The unused + objects are simply never copied -- that is how their space goes. + + At N=1 a pack holds exactly one object, so a pack is always either all-used or all-unused + and the mixed branch is unreachable; the observable result is identical to deleting the + unused single-object packs. The mixed branch is the N>1 copy-forward path; it is proven + here by tests that force max_count > 1. + """ + logger.info("Determining unused objects...") + # Group every known object by the pack file it lives in. We collect into a plain dict + # first (fully draining the iterator) so we can mutate self.chunks afterwards. + packs: dict[bytes, dict[str, list[bytes]]] = {} + for id, entry in self.chunks.iteritems(): + bucket = packs.setdefault(entry.pack_id, {"used": [], "unused": []}) + if entry.flags & ChunkIndex.F_USED: + bucket["used"].append(id) + else: + bucket["unused"].append(id) + + unused = [id for bucket in packs.values() for id in bucket["unused"]] + packs_to_drop = [pid for pid, b in packs.items() if b["unused"] and not b["used"]] + packs_to_rewrite = [pid for pid, b in packs.items() if b["unused"] and b["used"]] + + logger.info(f"Deleting {len(unused)} unused objects...") + if unused: + # Before mutating the store, invalidate all centrally cached chunk indexes. + # Otherwise, if we get interrupted partway through, the still-existing cache/chunks.* + # would claim that already-removed objects are still present. A later "borg create" would + # then trust that stale index, not re-upload the affected chunks and silently create an + # archive with dangling object references (see issue #9748). By removing the cached indexes + # first, an interruption is conservative: clients must rebuild the index from actual + # repository contents and will re-upload any removed data. save_chunk_index() writes a + # fresh, valid index afterwards. + delete_chunkindex_cache(self.repository) + + pi = ProgressIndicatorPercent( + total=len(packs_to_drop) + len(packs_to_rewrite), + msg="Compacting packs %3.1f%%", + step=0.1, + msgid="compact.report_and_delete", + ) + progress = 0 + for pack_id in packs_to_drop: + pi.show(progress) + progress += 1 + # every object in this pack is unused -> the file goes as a whole. + self.repository.delete_pack(pack_id) + for id in packs[pack_id]["unused"]: + del self.chunks[id] + for pack_id in packs_to_rewrite: + pi.show(progress) + progress += 1 + # N>1 only: keep the referenced objects by rewriting them into a fresh pack. + self._copy_forward_pack(pack_id, packs[pack_id]["used"]) + for id in packs[pack_id]["unused"]: + del self.chunks[id] + pi.finish() + + def _copy_forward_pack(self, pack_id, used_ids): + """Rewrite a mixed pack's still-used objects into a fresh pack, then drop the old pack. + + Only reached at N>1 (a pack holding more than one object). Reads each survivor's stored + bytes via the current index location, writes them through a PackWriter bound to this + collector's index (so the new pack location lands in the index we persist), then drops + the old pack whole. Reading/writing goes through the local store, which is why this path + is exercised by local tests; routing it through repository RPC for remote N>1 is left to + the N>1 PR (it never runs at N=1, the only mode that exists today). + """ + store = self.repository.store + writer = PackWriter(store, max_count=self.repository.pack_max_count, chunks=self.chunks) + for id in used_ids: + entry = self.chunks[id] + key = "packs/" + bin_to_hex(entry.pack_id) + cdata = store.load(key, offset=entry.obj_offset, size=entry.obj_size) + writer.add(id, cdata) + writer.flush() + self.repository.delete_pack(pack_id) + class CompactMixIn: @with_repository(exclusive=True, compatibility=(Manifest.Operation.DELETE,)) diff --git a/src/borg/archiver/debug_cmd.py b/src/borg/archiver/debug_cmd.py index 723d413af5..592022bc3f 100644 --- a/src/borg/archiver/debug_cmd.py +++ b/src/borg/archiver/debug_cmd.py @@ -293,7 +293,10 @@ def do_debug_delete_obj(self, args, repository): print("object id %s is invalid." % hex_id) else: try: - repository.delete(id) + # N=1: pack_id == chunk_id, so the given object id names its pack file and we + # drop that pack whole. N>1 TODO: this should mark the object deleted in the + # index instead, since a pack then holds more than one object. + repository.delete_pack(id) print("object %s deleted." % hex_id) except Repository.ObjectNotFound: print("object %s not found." % hex_id) diff --git a/src/borg/repository.py b/src/borg/repository.py index e30421436a..7307c125ea 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -530,6 +530,15 @@ def is_chunk_index_loaded(self): """ return self._chunks is not None + @property + def pack_max_count(self): + """How many objects a freshly written pack may bundle (the N in "N objects per pack"). + + N=1 today, so every pack holds one object; raising this later enables real packing. + compact reads this to size the pack writer it uses for copy-forward. + """ + return self._pack_writer.max_count + def flush(self): """Flush any buffered pack writer chunks.""" if self._pack_writer is not None: @@ -799,24 +808,30 @@ def put(self, id, data, wait=True): # PackWriter shares this repository's index, so add() triggers the lazy build itself. return self._pack_writer.add(id, data) - def delete(self, id, wait=True): - """delete a repo object + def delete_pack(self, pack_id, wait=True): + """delete a whole pack file from the store, addressed by its pack_id. - Note: when doing calls with wait=False this gets async and caller must - deal with async results / exceptions later. + This is the only space-reclaiming primitive: the store is append-only and + content-addressed, so the smallest unit it can remove is a whole pack file. + There is deliberately no per-object delete -- removing a single object out of a + pack that still holds referenced neighbours is done by copy-forward (compact + rewrites the survivors into a fresh pack, then drops the old pack as a whole). + + At N=1 a pack holds exactly one object and pack_id == chunk_id, so dropping the + pack is the same as dropping that one object; callers that still think in object + ids pass the id as the pack_id and that stays correct until N>1 lands. """ self._lock_refresh() - pack_id = id # N=1: pack_id == chunk_id key = "packs/" + bin_to_hex(pack_id) try: self.store.delete(key) except StoreObjectNotFound: - raise self.ObjectNotFound(id, str(self._location)) from None + raise self.ObjectNotFound(pack_id, str(self._location)) from None def async_response(self, wait=True): """Get one async result (only applies to remote repositories). - async commands (== calls with wait=False, e.g. delete and put) have no results, + async commands (== calls with wait=False, e.g. put) have no results, but may raise exceptions. These async exceptions must get collected later via async_response() calls. Repeat the call until it returns None. The previous calls might either return one (non-None) result or raise an exception. diff --git a/src/borg/testsuite/archiver/check_cmd_test.py b/src/borg/testsuite/archiver/check_cmd_test.py index 17804d7990..313e71148e 100644 --- a/src/borg/testsuite/archiver/check_cmd_test.py +++ b/src/borg/testsuite/archiver/check_cmd_test.py @@ -162,7 +162,7 @@ def test_missing_file_chunk(archivers, request): if item.path.endswith(src_file): valid_chunks = item.chunks killed_chunk = valid_chunks[-1] - repository.delete(killed_chunk.id) + repository.delete_pack(killed_chunk.id) # N=1: pack_id == chunk_id break else: pytest.fail("should not happen") # convert 'fail' @@ -198,7 +198,7 @@ def test_missing_archive_item_chunk(archivers, request): check_cmd_setup(archiver) archive, repository = open_archive(archiver.repository_path, "archive1") with repository: - repository.delete(archive.metadata.items[0]) + repository.delete_pack(archive.metadata.items[0]) # N=1: pack_id == chunk_id cmd(archiver, "check", exit_code=1) cmd(archiver, "check", "--repair", exit_code=0) cmd(archiver, "check", exit_code=0) @@ -209,7 +209,7 @@ def test_missing_archive_metadata(archivers, request): check_cmd_setup(archiver) archive, repository = open_archive(archiver.repository_path, "archive1") with repository: - repository.delete(archive.id) + repository.delete_pack(archive.id) # N=1: pack_id == chunk_id cmd(archiver, "check", exit_code=1) cmd(archiver, "check", "--repair", exit_code=0) cmd(archiver, "check", exit_code=0) @@ -446,5 +446,5 @@ def test_empty_repository(archivers, request): check_cmd_setup(archiver) with Repository(archiver.repository_location, exclusive=True) as repository: for id, _ in repository.list(): - repository.delete(id) + repository.delete_pack(id) # N=1: pack_id == chunk_id cmd(archiver, "check", exit_code=1) diff --git a/src/borg/testsuite/archiver/compact_cmd_test.py b/src/borg/testsuite/archiver/compact_cmd_test.py index ef87785a6d..481bcd8a5a 100644 --- a/src/borg/testsuite/archiver/compact_cmd_test.py +++ b/src/borg/testsuite/archiver/compact_cmd_test.py @@ -4,11 +4,16 @@ import pytest from ...constants import * # NOQA -from ...helpers import get_cache_dir +from ...archiver.compact_cmd import ArchiveGarbageCollector +from ...hashindex import ChunkIndex +from ...helpers import get_cache_dir, bin_to_hex from ...cache import files_cache_name, discover_files_cache_names, list_chunkindex_hashes from ...manifest import Manifest +from ...repository import Repository from . import cmd, create_regular_file, create_src_archive, generate_archiver_tests, open_repository, RK_ENCRYPTION from . import changedir +from ..hashindex_test import H +from ..repository_test import fchunk, pdchunk pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary") # NOQA @@ -193,3 +198,87 @@ def test_compact_files_cache_cleanup(archivers, request): # Get expected cache files for remaining archives expected_cache_files = {files_cache_name(name) for name in ["archive1", "archive3"]} assert expected_cache_files == remaining_cache_files, "Unexpected cache files found" + + +def _pack_names(repo): + # set of pack file names (hex pack_ids) currently present in the store. + return {info.name for info in repo.store.list("packs")} + + +def _load_obj(repo, index, chunk_id): + # read an object's stored bytes via its current index location (same range-load get() does). + entry = index[chunk_id] + return repo.store.load("packs/" + bin_to_hex(entry.pack_id), offset=entry.obj_offset, size=entry.obj_size) + + +def test_compact_drops_whole_unused_packs(tmp_path): + """N=1: each pack holds exactly one object, so compact reclaims space by dropping whole + unused pack files -- there is no per-object delete. The used object's pack stays; the + unused ones are removed as a whole.""" + repo_location = os.fspath(tmp_path / "repo") + with Repository(repo_location, exclusive=True, create=True) as repo: + # default max_count == 1: one object per pack. + for i in range(3): + repo.put(H(i), fchunk(b"data%d" % i)) + repo.flush() + index = repo.chunks + assert len(_pack_names(repo)) == 3 + + # simulate analyze_archives(): start from "unused" (put() marks new chunks F_USED), then + # mark only H(0) as referenced; H(1) and H(2) stay unused. + gc = ArchiveGarbageCollector(repo, manifest=None, stats=False, iec=False) + gc.chunks = index + for i in range(3): + entry = index[H(i)] + flags = ChunkIndex.F_USED if i == 0 else ChunkIndex.F_NONE + index[H(i)] = entry._replace(flags=flags) + + gc.compact_packs() + + # only the used object's pack survives; the two unused single-object packs are gone. + assert _pack_names(repo) == {bin_to_hex(index[H(0)].pack_id)} + assert H(0) in index + assert H(1) not in index and H(2) not in index + assert pdchunk(_load_obj(repo, index, H(0))) == b"data0" + + +def test_compact_copy_forward_mixed_pack(tmp_path): + """N>1: a single pack can hold both used and unused objects. compact must keep the used + ones by copying them into a fresh pack and drop the old pack as a whole -- never delete a + single object in place. We force max_count > 1 to build such a mixed pack, since the N=1 + production path never produces one. This proves the copy-forward design ahead of N>1.""" + repo_location = os.fspath(tmp_path / "repo") + with Repository(repo_location, exclusive=True, create=True) as repo: + # build ONE pack holding three objects by bundling (max_count = 3). + repo._pack_writer.max_count = 3 + repo.put(H(0), fchunk(b"data0")) + repo.put(H(1), fchunk(b"data1")) + repo.put(H(2), fchunk(b"data2")) # third add fills the pack and flushes it + repo.flush() + + index = repo.chunks + old_pack_id = index[H(0)].pack_id + assert {index[H(i)].pack_id for i in range(3)} == {old_pack_id} # all three share one pack + assert _pack_names(repo) == {bin_to_hex(old_pack_id)} + + # simulate analyze_archives(): start from "unused" (put() marks new chunks F_USED), then + # mark H(0) and H(2) as referenced; H(1) stays unused. + gc = ArchiveGarbageCollector(repo, manifest=None, stats=False, iec=False) + gc.chunks = index + for i in range(3): + entry = index[H(i)] + flags = ChunkIndex.F_USED if i in (0, 2) else ChunkIndex.F_NONE + index[H(i)] = entry._replace(flags=flags) + + gc.compact_packs() + + # the unused object is gone from the index; the survivors moved into a new pack. + assert H(1) not in index + new_pack_id = index[H(0)].pack_id + assert new_pack_id != old_pack_id + assert index[H(2)].pack_id == new_pack_id + + # old pack dropped whole, new pack present, survivors still hold their original bytes. + assert _pack_names(repo) == {bin_to_hex(new_pack_id)} + assert pdchunk(_load_obj(repo, index, H(0))) == b"data0" + assert pdchunk(_load_obj(repo, index, H(2))) == b"data2" diff --git a/src/borg/testsuite/archiver/extract_cmd_test.py b/src/borg/testsuite/archiver/extract_cmd_test.py index 7a19d46b5a..d8a30b3bc3 100644 --- a/src/borg/testsuite/archiver/extract_cmd_test.py +++ b/src/borg/testsuite/archiver/extract_cmd_test.py @@ -800,7 +800,7 @@ def test_extract_file_with_missing_chunk(archivers, request): for item in archive.iter_items(): if item.path.endswith(src_file): chunk = item.chunks[-1] - repository.delete(chunk.id) + repository.delete_pack(chunk.id) # N=1: pack_id == chunk_id break else: assert False # missed the file diff --git a/src/borg/testsuite/archiver/mount_cmds_test.py b/src/borg/testsuite/archiver/mount_cmds_test.py index c979ba4e3d..08453fc5a4 100644 --- a/src/borg/testsuite/archiver/mount_cmds_test.py +++ b/src/borg/testsuite/archiver/mount_cmds_test.py @@ -234,7 +234,7 @@ def test_fuse_allow_damaged_files(archivers, request): with repository: for item in archive.iter_items(): if item.path.endswith(src_file): - repository.delete(item.chunks[-1].id) + repository.delete_pack(item.chunks[-1].id) # N=1: pack_id == chunk_id path = item.path # store full path for later break else: diff --git a/src/borg/testsuite/repository_test.py b/src/borg/testsuite/repository_test.py index 1192e9f12a..abd4b63d00 100644 --- a/src/borg/testsuite/repository_test.py +++ b/src/borg/testsuite/repository_test.py @@ -82,7 +82,7 @@ def test_basic_operations(repo_fixtures, request): repository.put(H(x), fchunk(b"SOMEDATA")) # put() updates _chunks via PackWriter key50 = H(50) assert pdchunk(repository.get(key50)) == b"SOMEDATA" - repository.delete(key50) + repository.delete_pack(key50) # N=1: pack_id == chunk_id with pytest.raises(Repository.ObjectNotFound): repository.get(key50) # no manual hand-off of the index across reopen: close() persisted it to the repo cache, @@ -142,7 +142,7 @@ def test_consistency(repo_fixtures, request): assert pdchunk(repository.get(H(0))) == b"foo2" repository.put(H(0), fchunk(b"bar")) assert pdchunk(repository.get(H(0))) == b"bar" - repository.delete(H(0)) + repository.delete_pack(H(0)) # N=1: pack_id == chunk_id with pytest.raises(Repository.ObjectNotFound): repository.get(H(0)) @@ -169,7 +169,7 @@ def test_max_data_size(repo_fixtures, request): assert pdchunk(repository.get(H(0))) == max_data with pytest.raises(IntegrityError): repository.put(H(1), fchunk(max_data + b"x")) - repository.delete(H(0)) + repository.delete_pack(H(0)) # N=1: pack_id == chunk_id def check(repository, repo_path, repair=False, status=True):