-
-
Notifications
You must be signed in to change notification settings - Fork 858
compact: copy-forward packs, retire Repository.delete #9777
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,7 +11,7 @@ | |
| from ..helpers import set_ec, EXIT_ERROR, format_file_size, bin_to_hex | ||
| from ..helpers import ProgressIndicatorPercent | ||
| from ..manifest import Manifest | ||
| from ..repository import Repository, repo_lister | ||
| from ..repository import Repository, PackWriter, repo_lister | ||
|
|
||
| from ..logger import create_logger | ||
|
|
||
|
|
@@ -171,29 +171,7 @@ def report_and_delete(self): | |
| logger.warning(f"Soft-deleted archive {name} {hex_id} not found.") | ||
|
|
||
| repo_size_before = self.repository_size | ||
| logger.info("Determining unused objects...") | ||
| unused = set() | ||
| for id, entry in self.chunks.iteritems(): | ||
| if not (entry.flags & ChunkIndex.F_USED): | ||
| unused.add(id) | ||
| logger.info(f"Deleting {len(unused)} unused objects...") | ||
| if unused: | ||
| # Before deleting any repository object, invalidate all centrally cached chunk indexes. | ||
| # Otherwise, if we get interrupted within the deletion loop, the still-existing cache/chunks.* | ||
| # would claim that already-deleted objects are still present. A later "borg create" would then | ||
| # trust that stale index, not re-upload the affected chunks and silently create an archive with | ||
| # dangling object references (see issue #9748). By removing the cached indexes first, an | ||
| # interruption is conservative: clients must rebuild the index from actual repository contents | ||
| # and will re-upload any deleted data. save_chunk_index() writes a fresh, valid index afterwards. | ||
| delete_chunkindex_cache(self.repository) | ||
| pi = ProgressIndicatorPercent( | ||
| total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete" | ||
| ) | ||
| for i, id in enumerate(unused): | ||
| pi.show(i) | ||
| self.repository.delete(id) | ||
| del self.chunks[id] | ||
| pi.finish() | ||
| self.compact_packs() | ||
| repo_size_after = self.repository_size | ||
|
|
||
| count = len(self.chunks) | ||
|
|
@@ -214,6 +192,94 @@ def report_and_delete(self): | |
| else: | ||
| logger.info(f"Repository has data stored in {count} objects.") | ||
|
|
||
| def compact_packs(self): | ||
| """Reclaim space at the granularity the store supports: whole pack files. | ||
|
|
||
| The store is append-only and content-addressed, so we never delete a single object in | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. append-only is a term from the borg 1.x past, don't use it if it makes no sense anymore, that just adds confusion. at the store level, full packs are just added usually. removed only by special borg operations like compact or check. there is also no ordering anymore as in 1.x segment files 0..N, so when looking at the whole store, one maybe can say add-only, but append-only implies an "end" of the store that does not exist anymore. |
||
| place. Instead we look at each pack file as a unit, using the F_USED flags that | ||
| analyze_archives() set on the index: | ||
|
|
||
| - all objects unused -> drop the whole pack file. | ||
| - all objects used -> leave it untouched. | ||
| - mixed (some used) -> copy the still-used objects forward into a fresh pack, repoint | ||
| their index entries, then drop the old pack whole. The unused | ||
| objects are simply never copied -- that is how their space goes. | ||
|
|
||
| At N=1 a pack holds exactly one object, so a pack is always either all-used or all-unused | ||
| and the mixed branch is unreachable; the observable result is identical to deleting the | ||
| unused single-object packs. The mixed branch is the N>1 copy-forward path; it is proven | ||
| here by tests that force max_count > 1. | ||
| """ | ||
| logger.info("Determining unused objects...") | ||
| # Group every known object by the pack file it lives in. We collect into a plain dict | ||
| # first (fully draining the iterator) so we can mutate self.chunks afterwards. | ||
| packs: dict[bytes, dict[str, list[bytes]]] = {} | ||
| for id, entry in self.chunks.iteritems(): | ||
| bucket = packs.setdefault(entry.pack_id, {"used": [], "unused": []}) | ||
|
Comment on lines
+217
to
+218
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Never do whole-chunkindex transformations, building new datastructures that contain most of the information from the chunks index. The chunks index is a highly optimized datastructure and even then, it can be many gigabytes in size. If you are transforming that into another arrangement using unoptimized python datastructures, you will likely use too much RAM. |
||
| if entry.flags & ChunkIndex.F_USED: | ||
| bucket["used"].append(id) | ||
| else: | ||
| bucket["unused"].append(id) | ||
|
|
||
| unused = [id for bucket in packs.values() for id in bucket["unused"]] | ||
| packs_to_drop = [pid for pid, b in packs.items() if b["unused"] and not b["used"]] | ||
| packs_to_rewrite = [pid for pid, b in packs.items() if b["unused"] and b["used"]] | ||
|
Comment on lines
+225
to
+226
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Basically the right idea, just needs a more efficient implementation. |
||
|
|
||
| logger.info(f"Deleting {len(unused)} unused objects...") | ||
| if unused: | ||
| # Before mutating the store, invalidate all centrally cached chunk indexes. | ||
| # Otherwise, if we get interrupted partway through, the still-existing cache/chunks.* | ||
| # would claim that already-removed objects are still present. A later "borg create" would | ||
| # then trust that stale index, not re-upload the affected chunks and silently create an | ||
| # archive with dangling object references (see issue #9748). By removing the cached indexes | ||
| # first, an interruption is conservative: clients must rebuild the index from actual | ||
| # repository contents and will re-upload any removed data. save_chunk_index() writes a | ||
| # fresh, valid index afterwards. | ||
| delete_chunkindex_cache(self.repository) | ||
|
|
||
| pi = ProgressIndicatorPercent( | ||
| total=len(packs_to_drop) + len(packs_to_rewrite), | ||
| msg="Compacting packs %3.1f%%", | ||
| step=0.1, | ||
| msgid="compact.report_and_delete", | ||
| ) | ||
| progress = 0 | ||
| for pack_id in packs_to_drop: | ||
| pi.show(progress) | ||
| progress += 1 | ||
| # every object in this pack is unused -> the file goes as a whole. | ||
| self.repository.delete_pack(pack_id) | ||
| for id in packs[pack_id]["unused"]: | ||
| del self.chunks[id] | ||
| for pack_id in packs_to_rewrite: | ||
| pi.show(progress) | ||
| progress += 1 | ||
| # N>1 only: keep the referenced objects by rewriting them into a fresh pack. | ||
| self._copy_forward_pack(pack_id, packs[pack_id]["used"]) | ||
| for id in packs[pack_id]["unused"]: | ||
| del self.chunks[id] | ||
| pi.finish() | ||
|
|
||
| def _copy_forward_pack(self, pack_id, used_ids): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. usually this is named "compaction". |
||
| """Rewrite a mixed pack's still-used objects into a fresh pack, then drop the old pack. | ||
|
|
||
| Only reached at N>1 (a pack holding more than one object). Reads each survivor's stored | ||
| bytes via the current index location, writes them through a PackWriter bound to this | ||
| collector's index (so the new pack location lands in the index we persist), then drops | ||
| the old pack whole. Reading/writing goes through the local store, which is why this path | ||
| is exercised by local tests; routing it through repository RPC for remote N>1 is left to | ||
| the N>1 PR (it never runs at N=1, the only mode that exists today). | ||
| """ | ||
| store = self.repository.store | ||
| writer = PackWriter(store, max_count=self.repository.pack_max_count, chunks=self.chunks) | ||
| for id in used_ids: | ||
| entry = self.chunks[id] | ||
| key = "packs/" + bin_to_hex(entry.pack_id) | ||
| cdata = store.load(key, offset=entry.obj_offset, size=entry.obj_size) | ||
| writer.add(id, cdata) | ||
| writer.flush() | ||
|
Comment on lines
+274
to
+280
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use borgstore.Store.defrag method, that is way more efficient. |
||
| self.repository.delete_pack(pack_id) | ||
|
|
||
|
|
||
| class CompactMixIn: | ||
| @with_repository(exclusive=True, compatibility=(Manifest.Operation.DELETE,)) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -530,6 +530,15 @@ def is_chunk_index_loaded(self): | |
| """ | ||
| return self._chunks is not None | ||
|
|
||
| @property | ||
| def pack_max_count(self): | ||
| """How many objects a freshly written pack may bundle (the N in "N objects per pack"). | ||
|
|
||
| N=1 today, so every pack holds one object; raising this later enables real packing. | ||
| compact reads this to size the pack writer it uses for copy-forward. | ||
| """ | ||
| return self._pack_writer.max_count | ||
|
|
||
| def flush(self): | ||
| """Flush any buffered pack writer chunks.""" | ||
| if self._pack_writer is not None: | ||
|
|
@@ -799,24 +808,30 @@ def put(self, id, data, wait=True): | |
| # PackWriter shares this repository's index, so add() triggers the lazy build itself. | ||
| return self._pack_writer.add(id, data) | ||
|
|
||
| def delete(self, id, wait=True): | ||
| """delete a repo object | ||
| def delete_pack(self, pack_id, wait=True): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. guess you could just use .store_delete at the few places that need this. |
||
| """delete a whole pack file from the store, addressed by its pack_id. | ||
|
|
||
| Note: when doing calls with wait=False this gets async and caller must | ||
| deal with async results / exceptions later. | ||
| This is the only space-reclaiming primitive: the store is append-only and | ||
| content-addressed, so the smallest unit it can remove is a whole pack file. | ||
| There is deliberately no per-object delete -- removing a single object out of a | ||
| pack that still holds referenced neighbours is done by copy-forward (compact | ||
| rewrites the survivors into a fresh pack, then drops the old pack as a whole). | ||
|
|
||
| At N=1 a pack holds exactly one object and pack_id == chunk_id, so dropping the | ||
| pack is the same as dropping that one object; callers that still think in object | ||
| ids pass the id as the pack_id and that stays correct until N>1 lands. | ||
| """ | ||
| self._lock_refresh() | ||
| pack_id = id # N=1: pack_id == chunk_id | ||
| key = "packs/" + bin_to_hex(pack_id) | ||
| try: | ||
| self.store.delete(key) | ||
| except StoreObjectNotFound: | ||
| raise self.ObjectNotFound(id, str(self._location)) from None | ||
| raise self.ObjectNotFound(pack_id, str(self._location)) from None | ||
|
|
||
| def async_response(self, wait=True): | ||
| """Get one async result (only applies to remote repositories). | ||
|
|
||
| async commands (== calls with wait=False, e.g. delete and put) have no results, | ||
| async commands (== calls with wait=False, e.g. put) have no results, | ||
| but may raise exceptions. These async exceptions must get collected later via | ||
| async_response() calls. Repeat the call until it returns None. | ||
| The previous calls might either return one (non-None) result or raise an exception. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goal is to get rid of the N=1 limitation, so I suggest you don't add code that makes no sense and is even very harmful when N!=1.