|
| 1 | +import hashlib |
| 2 | +import json |
| 3 | +import pathlib |
| 4 | +import shutil |
| 5 | +import threading |
| 6 | +from typing import Callable |
| 7 | + |
| 8 | +from ..api import CKANAPI |
| 9 | +from ..dbmodel import APIInterrogator |
| 10 | +from ..download import DownloadJob |
| 11 | +from . import info, manifest |
| 12 | + |
| 13 | + |
def bag_circle(api: CKANAPI,
               circle_name: str,
               target_path: pathlib.Path,
               abort_event: threading.Event = None,
               callback: Callable = None):
    """Download an entire circle to a target directory in BagIt format

    The format follows RFC 8493 "The BagIt File Packaging Format (V1.0)".

    To validate the BagIt bags:

        pip install bagit
        bagit.py --validate --quiet target_path/*

    Parameters
    ----------
    api
        CKANAPI for connecting to the DCOR instance
    circle_name
        Name of the circle to archive
    target_path
        Download location
    abort_event
        Specify a `threading.Event` to be able to abort archiving;
        when you wish to abort `.set()` the event.
    callback
        Method for progress tracking (called with a float between 0 and 1)
    """
    # fetch total list of active datasets
    ai = APIInterrogator(api)
    dataset_dicts = ai.search_dataset_via_api(circles=[circle_name],
                                              limit=0,
                                              ret_db_extract=False)

    num_datasets = len(dataset_dicts)

    # sort datasets according to creation date
    dataset_dicts = sorted(dataset_dicts, key=lambda x: x["metadata_created"])

    # compute SHA256 hash of all dataset IDs
    hasher = hashlib.sha256()
    for ds_dict in dataset_dicts:
        hasher.update(ds_dict["id"].encode(encoding="utf-8"))
    sha256_hash = hasher.hexdigest()

    # Check whether there is already a list of dataset IDs in the directory,
    # and if yes, compute the SHA256 hash and compare it. If the comparison
    # fails, then the user has to choose a different `target_path`, because
    # we cannot guarantee data integrity.
    circle_jsonlines_path = target_path / "circle.jsonlines"
    if circle_jsonlines_path.exists():
        # The file is written with one trailing "\n" per record, so a plain
        # split("\n") would yield a final empty string that json.loads
        # cannot parse; use splitlines() and skip blank lines instead.
        lines = circle_jsonlines_path.read_text().splitlines()
        hasher2 = hashlib.sha256()
        for line in lines:
            if line.strip():
                hasher2.update(
                    json.loads(line)["id"].encode(encoding="utf-8"))
        sha256_hash2 = hasher2.hexdigest()
        if sha256_hash != sha256_hash2:
            raise ValueError(
                f"A previous attempt to archive circle {circle_name} was made "
                f"in directory {target_path}. However, the number of datasets "
                f"changed since then. Therefore, it is not possible to "
                f"archive this circle to that directory.")
    else:
        # save list of datasets as jsonlines
        # (make sure the target directory exists before opening the file)
        target_path.mkdir(parents=True, exist_ok=True)
        with circle_jsonlines_path.open("w", encoding="utf-8") as f:
            for ds_dict in dataset_dicts:
                f.write(json.dumps(ds_dict) + "\n")

    # number of digits for enumeration
    max_digits = len(str(num_datasets))

    # bag all datasets
    for ii, ds_dict in enumerate(dataset_dicts):
        if callback:
            callback(ii / num_datasets)

        dataset_index = ii + 1
        prefix = str(dataset_index).zfill(max_digits)
        bag_path = target_path / f"{prefix}_{ds_dict['name']}"

        # skip datasets that were already fully bagged in a previous run
        if not manifest.is_bagged(bag_path):
            bag_dataset(api=api,
                        ds_dict=ds_dict,
                        dataset_index=dataset_index,
                        num_datasets=num_datasets,
                        bag_path=bag_path,
                        abort_event=abort_event)
        if abort_event is not None and abort_event.is_set():
            return
    if callback:
        callback(1)
| 105 | + |
| 106 | + |
def bag_dataset(api: CKANAPI,
                ds_dict: dict,
                bag_path: pathlib.Path,
                abort_event: threading.Event = None,
                dataset_index: int = 1,
                num_datasets: int = 1,
                ):
    """Download a single dataset into `bag_path` in BagIt format

    Parameters
    ----------
    api
        CKANAPI for connecting to the DCOR instance
    ds_dict
        CKAN dataset dictionary
    bag_path
        Path of the bag
    abort_event
        Event for aborting (see :func:`bag_circle`)
    dataset_index
        Index of this dataset in the circle
    num_datasets
        Total number of datasets in the circle
    """
    # Start from a clean slate: remove any partial bag from a previous run.
    if bag_path.exists():
        shutil.rmtree(bag_path)
    data_path = bag_path / "data"
    # `parents=True` creates `bag_path` along the way
    data_path.mkdir(parents=True, exist_ok=True)

    # store the CKAN dataset dictionary as part of the payload
    (data_path / "dataset.json").write_text(
        json.dumps(ds_dict, indent=2, sort_keys=True))

    # fetch every resource; .rtdc resources additionally get their
    # condensed counterpart
    for res_dict in ds_dict["resources"]:
        if abort_event is not None and abort_event.is_set():
            return

        download_resource(api=api,
                          bag_path=bag_path,
                          res_dict=res_dict,
                          condensed=False)

        if res_dict["name"].endswith(".rtdc"):
            download_resource(api=api,
                              bag_path=bag_path,
                              res_dict=res_dict,
                              condensed=True)

    # write the BagIt metadata files (bag-info.txt etc.)
    info.write_bag_info(bag_path=bag_path,
                        bag_index=dataset_index,
                        num_bags=num_datasets,
                        ds_dict=ds_dict)

    # write the BagIt manifest files
    manifest.write_manifest(bag_path=bag_path,
                            ds_dict=ds_dict)
| 169 | + |
| 170 | + |
def download_resource(api: CKANAPI,
                      bag_path: pathlib.Path,
                      res_dict: dict,
                      condensed: bool):
    """Download and verify a resource from DCOR

    Parameters
    ----------
    api
        CKANAPI for connecting to the DCOR instance
    bag_path
        Path of the bag
    res_dict
        CKAN resource dictionary
    condensed
        Whether to download the condensed resource (or the original resource)
    """
    data_path = bag_path / "data"
    data_path.mkdir(parents=True, exist_ok=True)
    # condensed resources get a "_condensed.rtdc" suffix instead of the
    # original file extension
    target = data_path / res_dict["name"]
    if condensed:
        target = target.with_name(target.stem + "_condensed.rtdc")
    job = DownloadJob(api=api,
                      resource_id=res_dict["id"],
                      download_path=target,
                      condensed=condensed)
    job.task_download_resource()
    job.task_verify_resource()
0 commit comments