diff --git a/site/cds_rdm/inspire_harvester/jobs.py b/site/cds_rdm/inspire_harvester/jobs.py
index 1a11eccf..a55fcf3e 100644
--- a/site/cds_rdm/inspire_harvester/jobs.py
+++ b/site/cds_rdm/inspire_harvester/jobs.py
@@ -133,7 +133,7 @@ def build_task_arguments(
                 },
             }
         ],
-        "batch_size": 10,
+        "batch_size": 100,
         "write_many": False,
         "transformers": [{"type": "inspire-json-transformer"}],
     }
diff --git a/site/cds_rdm/inspire_harvester/reader.py b/site/cds_rdm/inspire_harvester/reader.py
index 417c7441..144be2d6 100644
--- a/site/cds_rdm/inspire_harvester/reader.py
+++ b/site/cds_rdm/inspire_harvester/reader.py
@@ -40,6 +40,7 @@ def _iter(self, url, *args, **kwargs):
         """Yields HTTP response."""
         # header set to include additional data (external file URLs and more detailed metadata
         headers = {"Accept": "application/vnd+inspire.record.expanded+json"}
+        initial_url = url
         while url:
             # Continue until there is no "next" link
             current_app.logger.info(f"Querying URL: {url}.")
@@ -47,12 +48,19 @@ def _iter(self, url, *args, **kwargs):
         data = response.json()
         if response.status_code == 200:
             current_app.logger.debug("Request response is successful (200).")
-            if data["hits"]["total"] == 0:
+            total = data["hits"]["total"]
+            hits = data["hits"]["hits"]
+
+            if total == 0:
                 current_app.logger.warning(
                     f"No results found when querying INSPIRE. See URL: {url}."
                 )
+            elif url == initial_url:
+                current_app.logger.info(
+                    f"Records found: {total}."
+                )

-            for inspire_record in data["hits"]["hits"]:
+            for inspire_record in hits:
                 current_app.logger.debug(
                     f"Sending INSPIRE record #{inspire_record['id']} to transformer."
                 )
@@ -71,33 +79,39 @@ def read(self, item=None, *args, **kwargs):

         # Fetch all document types marked for CDS via the OAI set
         oai_set = "ForCDS"
+        document_type = "thesis"
+
+        q = f"_oai.sets:{oai_set}"
+        if document_type:
+            q += f" AND document_type:{document_type}"
+
         if self._inspire_id:
             # get by INSPIRE id
             current_app.logger.info(
                 f"Fetching records by ID {self._inspire_id} from INSPIRE."
             )
-            query_params = {"q": f"_oai.sets:{oai_set} AND id:{self._inspire_id}"}
+            query_params = {"q": f"{q} AND id:{self._inspire_id}"}
         elif self._on_date:
             # get by the exact date
             current_app.logger.info(
                 f"Fetching records by exact date {self._on_date} from INSPIRE."
             )
-            query_params = {"q": f"_oai.sets:{oai_set} AND du:{self._on_date}"}
+            query_params = {"q": f"{q} AND du:{self._on_date}"}
         elif self._until:
             # get by the date range
             current_app.logger.info(
                 f"Fetching records by the date range {self._since} - {self._until} from INSPIRE."
             )
             query_params = {
-                "q": f"_oai.sets:{oai_set} AND du >= {self._since} AND du <= {self._until}"
+                "q": f"{q} AND du >= {self._since} AND du <= {self._until}"
             }
         else:
             # get since specified date until now
             current_app.logger.info(
                 f"Fetching records since {self._since} from INSPIRE."
             )
-            query_params = {"q": f"_oai.sets:{oai_set} AND du >= {self._since}"}
+            query_params = {"q": f"{q} AND du >= {self._since}"}

         base_url = "https://inspirehep.net/api/literature"
         encoded_query = urlencode(query_params)
diff --git a/site/cds_rdm/inspire_harvester/transform/mappers/basic_metadata.py b/site/cds_rdm/inspire_harvester/transform/mappers/basic_metadata.py
index a95a3ae3..b8d3ff2f 100644
--- a/site/cds_rdm/inspire_harvester/transform/mappers/basic_metadata.py
+++ b/site/cds_rdm/inspire_harvester/transform/mappers/basic_metadata.py
@@ -23,7 +23,7 @@ class ResourceTypeMapper(MapperBase):

     id = "metadata.resource_type.id"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Map resource type value."""
         return ctx.resource_type.value

@@ -34,8 +34,9 @@ class TitleMapper(MapperBase):

     id = "metadata.title"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Map title value."""
+        src_metadata = src_record.get("metadata", {})
         inspire_titles = src_metadata.get("titles", [])
         return inspire_titles[0].get("title")

@@ -46,28 +47,41 @@ class AdditionalTitlesMapper(MapperBase):

     id = "metadata.additional_titles"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Map additional titles."""
+        src_metadata = src_record.get("metadata", {})
         inspire_titles = src_metadata.get("titles", [])
         rdm_additional_titles = []
+        seen_titles = []
+        seen_subtitles = []
+
         if len(inspire_titles) > 1:
+            seen_titles.append(inspire_titles[0].get("title"))
             for i, inspire_title in enumerate(inspire_titles[1:]):
                 try:
-                    alt_title = {
-                        "title": inspire_title.get("title"),
-                        "type": {
-                            "id": "alternative-title",
-                        },
-                    }
-                    rdm_additional_titles.append(alt_title)
-                    if inspire_title.get("subtitle"):
+                    _title = inspire_title.get("title")
+                    if _title and _title not in seen_titles:
+                        seen_titles.append(_title)
+                        alt_title = {
+                            "title": _title,
+                            "type": {
+                                "id": "alternative-title",
+                            },
+                        }
+
+                        rdm_additional_titles.append(alt_title)
+
+                    _subtitle = inspire_title.get("subtitle")
+                    if _subtitle and _subtitle not in seen_subtitles:
+                        seen_subtitles.append(_subtitle)
                         subtitle = {
-                            "title": inspire_title.get("subtitle"),
+                            "title": _subtitle,
                             "type": {
                                 "id": "subtitle",
                             },
                         }
                         rdm_additional_titles.append(subtitle)
+
                 except Exception as e:
                     ctx.errors.append(
                         f"Title {inspire_title} transform failed. INSPIRE#{ctx.inspire_id}. Error: {e}."
@@ -88,8 +102,9 @@ def validate(self, src, ctx):
         if len(imprints) > 1:
             ctx.errors.append(f"More than 1 imprint found. INSPIRE#{ctx.inspire_id}.")

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Map publisher value."""
+        src_metadata = src_record.get("metadata", {})
         imprints = src_metadata.get("imprints", [])
         imprint = None
         publisher = None
@@ -115,15 +130,16 @@ class PublicationDateMapper(MapperBase):

     id = "metadata.publication_date"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Transform publication date."""
+        src_metadata = src_record.get("metadata", {})
         imprints = src_metadata.get("imprints", [])
         imprint_date = imprints[0].get("date") if imprints else None

         publication_info = src_metadata.get("publication_info", [])
         publication_date = publication_info[0].get("year") if publication_info else None

-        creation_date = src_metadata.get("created")
+        creation_date = src_record.get("created")

         date = publication_date or imprint_date or creation_date
         if date and isinstance(date, int):
@@ -145,8 +161,9 @@ class CopyrightMapper(MapperBase):

     id = "metadata.copyright"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Transform copyrights."""
+        src_metadata = src_record.get("metadata", {})
         # format: "© {holder} {year}, {statement} {url}"
         copyrights = src_metadata.get("copyright", [])
         result_list = []
@@ -179,8 +196,9 @@ class DescriptionMapper(MapperBase):

     id = "metadata.description"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Mapping of abstracts."""
+        src_metadata = src_record.get("metadata", {})
         abstracts = src_metadata.get("abstracts", [])
         if abstracts:
             return abstracts[0]["value"]
@@ -192,16 +210,21 @@ class AdditionalDescriptionsMapper(MapperBase):

     id = "metadata.additional_descriptions"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Mapping of additional descriptions."""
+        src_metadata = src_record.get("metadata", {})
         abstracts = src_metadata.get("abstracts", [])
         additional_descriptions = []
         if len(abstracts) > 1:
+            seen_abstracts = [abstracts[0]["value"]]
             for x in abstracts[1:]:
-                additional_descriptions.append(
-                    {"description": x["value"], "type": {"id": "abstract"}}
-                )
+                new_abstract = x["value"]
+                if new_abstract not in seen_abstracts:
+                    seen_abstracts.append(new_abstract)
+                    additional_descriptions.append(
+                        {"description": new_abstract, "type": {"id": "abstract"}}
+                    )

         # TODO move it to book resource?
         book_series = src_metadata.get("book_series", [])
@@ -226,8 +249,9 @@ class SubjectsMapper(MapperBase):

     id = "metadata.subjects"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Mapping of keywords to subjects."""
+        src_metadata = src_record.get("metadata", {})
         keywords = src_metadata.get("keywords", [])
         mapped_subjects = []
         for keyword in keywords:
@@ -248,8 +272,9 @@ class LanguagesMapper(MapperBase):

     id = "metadata.languages"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Mapping and converting of languages."""
+        src_metadata = src_record.get("metadata", {})
         languages = src_metadata.get("languages", [])
         mapped_langs = []
         for lang in languages:
diff --git a/site/cds_rdm/inspire_harvester/transform/mappers/contributors.py b/site/cds_rdm/inspire_harvester/transform/mappers/contributors.py
index 487d0219..bb8e1f75 100644
--- a/site/cds_rdm/inspire_harvester/transform/mappers/contributors.py
+++ b/site/cds_rdm/inspire_harvester/transform/mappers/contributors.py
@@ -52,6 +52,8 @@ def _transform_author_affiliations(self, author):
     def _transform_creatibutors(self, authors, ctx):
         """Transform creatibutors."""
         creatibutors = []
+        if not authors:
+            return creatibutors
         try:
             for author in authors:
                 first_name = author.get("first_name")
@@ -98,7 +100,7 @@ def _transform_creatibutors(self, authors, ctx):
             )
             return None

-    def map_value(self, src, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Map creatibutors value (to be implemented by subclasses)."""
         pass

@@ -112,8 +114,9 @@ class AuthorsMapper(CreatibutorsMapper):

     id = "metadata.creators"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Map authors to RDM creators."""
+        src_metadata = src_record.get("metadata", {})
         authors = src_metadata.get("authors", [])
         creators = []
         for author in authors:
@@ -136,7 +139,11 @@ def map_value(self, src_metadata, ctx, logger):
             }
             mapped_corporate_authors.append(contributor)

-        return self._transform_creatibutors(creators, ctx) + mapped_corporate_authors
+        contributors = self._transform_creatibutors(creators, ctx)
+        if not contributors:
+            contributors = []
+
+        return contributors + mapped_corporate_authors

@@ -145,8 +152,9 @@ class ContributorsMapper(CreatibutorsMapper):

     id = "metadata.contributors"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Map authors to RDM contributors."""
+        src_metadata = src_record.get("metadata", {})
         authors = src_metadata.get("authors", [])

         contributors = []
diff --git a/site/cds_rdm/inspire_harvester/transform/mappers/custom_fields.py b/site/cds_rdm/inspire_harvester/transform/mappers/custom_fields.py
index 0fa0b5b1..6f962ed3 100644
--- a/site/cds_rdm/inspire_harvester/transform/mappers/custom_fields.py
+++ b/site/cds_rdm/inspire_harvester/transform/mappers/custom_fields.py
@@ -12,7 +12,7 @@
 from idutils.normalizers import normalize_isbn

 from cds_rdm.inspire_harvester.transform.mappers.mapper import MapperBase
-from cds_rdm.inspire_harvester.transform.utils import search_vocabulary
+from cds_rdm.inspire_harvester.utils import search_vocabulary


 @dataclass(frozen=True)
@@ -21,8 +21,9 @@ class ImprintMapper(MapperBase):

     id = "custom_fields.imprint:imprint"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Apply thesis field mapping."""
+        src_metadata = src_record.get("metadata", {})
src_metadata.get("imprints", []) imprint = imprints[0] if imprints else None isbns = src_metadata.get("isbns", []) @@ -58,8 +59,9 @@ class CERNFieldsMapper(MapperBase): id = "custom_fields" - def map_value(self, src_metadata, ctx, logger): + def map_value(self, src_record, ctx, logger): """Apply mapping.""" + src_metadata = src_record.get("metadata", {}) acc_exp_list = src_metadata.get("accelerator_experiments", []) _accelerators = [] _experiments = [] @@ -72,7 +74,10 @@ def map_value(self, src_metadata, ctx, logger): logger.debug( f"Searching vocabulary 'accelerator' for term: '{accelerator}'" ) - accelerator = f"{institution} {accelerator}" + if institution: + accelerator = f"{institution} {accelerator}" + else: + accelerator = f"{accelerator}" result = search_vocabulary(accelerator, "accelerators", ctx, logger) if result.total == 1: logger.info(f"Found accelerator '{accelerator}'") diff --git a/site/cds_rdm/inspire_harvester/transform/mappers/files.py b/site/cds_rdm/inspire_harvester/transform/mappers/files.py index e8ee7b4c..1bd14c4a 100644 --- a/site/cds_rdm/inspire_harvester/transform/mappers/files.py +++ b/site/cds_rdm/inspire_harvester/transform/mappers/files.py @@ -18,8 +18,9 @@ class FilesMapper(MapperBase): id = "files" - def map_value(self, src_metadata, ctx, logger): + def map_value(self, src_record, ctx, logger): """Map files from INSPIRE documents to RDM files.""" + src_metadata = src_record.get("metadata", {}) logger.debug(f"Starting _transform_files") rdm_files_entries = {} @@ -29,21 +30,33 @@ def map_value(self, src_metadata, ctx, logger): for file in inspire_files: logger.debug(f"Processing file: {file.get('filename', 'unknown')}") filename = file["filename"] + source = file.get("source") + url = file["url"] if "pdf" not in filename: # INSPIRE only exposes pdfs for us filename = f"{filename}.pdf" + + # link to files directly from arxiv + if "key" not in file and source != "arxiv": + ctx.errors.append( + f"File: {filename}. Does not have a checksum. INSPIRE record id: {ctx.inspire_id}" + ) + return {} + + src_checksum = f"md5:{file['key']}" if "key" in file else None try: file_details = { - "checksum": f"md5:{file['key']}", + "checksum": src_checksum, "key": filename, "access": {"hidden": False}, - "inspire_url": file["url"], # put this somewhere else + "source_url": file["url"], + } rdm_files_entries[filename] = file_details logger.info(f"File mapped: {file_details}. File name: {filename}.") - file_metadata = {} + file_metadata = {"source": source, "source_url": file["url"]} file_description = file.get("description") file_original_url = file.get("original_url") if file_description: @@ -56,7 +69,10 @@ def map_value(self, src_metadata, ctx, logger): except Exception as e: ctx.errors.append( - f"Error occurred while mapping files. File key: {file['key']}. INSPIRE record id: {ctx.inspire_id}. Error: {e}." + f"Error occurred while mapping files. " + f"File key: {src_checksum}. " + f"File_url: {url}. " + f"INSPIRE record id: {ctx.inspire_id}. Error: {e}." 
                 )

         logger.debug(f"Files transformation completed with {len(ctx.errors)} errors")
diff --git a/site/cds_rdm/inspire_harvester/transform/mappers/identifiers.py b/site/cds_rdm/inspire_harvester/transform/mappers/identifiers.py
index b16a66cf..3177c2da 100644
--- a/site/cds_rdm/inspire_harvester/transform/mappers/identifiers.py
+++ b/site/cds_rdm/inspire_harvester/transform/mappers/identifiers.py
@@ -23,8 +23,9 @@ class DOIMapper(MapperBase):

     id = "pids"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Mapping of record dois."""
+        src_metadata = src_record.get("metadata", {})
         DATACITE_PREFIX = current_app.config["DATACITE_PREFIX"]

         dois = src_metadata.get("dois", [])
@@ -67,8 +68,9 @@ class IdentifiersMapper(MapperBase):

     id = "metadata.identifiers"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Map identifiers from external system identifiers."""
+        src_metadata = src_record.get("metadata", {})
         identifiers = []
         RDM_RECORDS_IDENTIFIERS_SCHEMES = current_app.config[
             "RDM_RECORDS_IDENTIFIERS_SCHEMES"
@@ -83,7 +85,7 @@ def map_value(self, src_metadata, ctx, logger):
             schema = external_sys_id.get("schema").lower()
             value = external_sys_id.get("value")
             if schema == "cdsrdm":
-                schema = "cds"
+                continue
             if schema in RDM_RECORDS_IDENTIFIERS_SCHEMES.keys():
                 identifiers.append({"identifier": value, "scheme": schema})
             elif schema in RDM_RECORDS_RELATED_IDENTIFIERS_SCHEMES.keys():
@@ -102,8 +104,9 @@ class RelatedIdentifiersMapper(MapperBase):

     id = "metadata.related_identifiers"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Mapping of alternate identifiers."""
+        src_metadata = src_record.get("metadata", {})
         identifiers = []
         RDM_RECORDS_IDENTIFIERS_SCHEMES = current_app.config[
             "RDM_RECORDS_IDENTIFIERS_SCHEMES"
@@ -122,7 +125,9 @@ def map_value(self, src_metadata, ctx, logger):
             schema = persistent_id.get("schema").lower()
             schema = CDS_INSPIRE_IDS_SCHEMES_MAPPING.get(schema, schema)
             value = persistent_id.get("value")
-            if schema in RDM_RECORDS_RELATED_IDENTIFIERS_SCHEMES.keys():
+            if schema in RDM_RECORDS_IDENTIFIERS_SCHEMES.keys():
+                continue
+            elif schema in RDM_RECORDS_RELATED_IDENTIFIERS_SCHEMES.keys():
                 new_id = {
                     "identifier": value,
                     "scheme": schema,
@@ -132,8 +137,6 @@ def map_value(self, src_metadata, ctx, logger):
                 if schema == "doi":
                     new_id["relation_type"] = {"id": "isversionof"}
                 identifiers.append(new_id)
-            elif schema in RDM_RECORDS_IDENTIFIERS_SCHEMES.keys():
-                continue
             else:
                 ctx.errors.append(
                     f"Unexpected schema found in persistent_identifiers. Schema: {schema}, value: {value}. INSPIRE#: {ctx.inspire_id}."
@@ -144,9 +147,13 @@ def map_value(self, src_metadata, ctx, logger):
         for external_sys_id in external_sys_ids:
             schema = external_sys_id.get("schema").lower()
             value = external_sys_id.get("value")
-            if schema == "cdsrdm":
+
+            # these schemes are already in identifiers
+            if schema in ["cdsrdm", "cds"]:
                 continue
-            if schema in RDM_RECORDS_RELATED_IDENTIFIERS_SCHEMES.keys():
+            if schema in RDM_RECORDS_IDENTIFIERS_SCHEMES.keys():
+                continue
+            elif schema in RDM_RECORDS_RELATED_IDENTIFIERS_SCHEMES.keys():
                 new_id = {
                     "identifier": value,
                     "scheme": schema,
@@ -156,8 +163,7 @@ def map_value(self, src_metadata, ctx, logger):
                 if schema == "doi":
                     new_id["relation_type"] = {"id": "isversionof"}
                 identifiers.append(new_id)
-            elif schema in RDM_RECORDS_IDENTIFIERS_SCHEMES.keys():
-                continue
+
             else:
                 ctx.errors.append(
                     f"Unexpected schema found in external_system_identifiers. Schema: {schema}, value: {value}. INSPIRE record id: {ctx.inspire_id}."
diff --git a/site/cds_rdm/inspire_harvester/transform/mappers/mapper.py b/site/cds_rdm/inspire_harvester/transform/mappers/mapper.py
index f79d44e9..18a90be2 100644
--- a/site/cds_rdm/inspire_harvester/transform/mappers/mapper.py
+++ b/site/cds_rdm/inspire_harvester/transform/mappers/mapper.py
@@ -9,7 +9,7 @@

 from abc import ABC, abstractmethod

-from cds_rdm.inspire_harvester.transform.utils import set_path
+from cds_rdm.inspire_harvester.utils import build_path


 class MapperBase(ABC):
@@ -18,9 +18,9 @@ class MapperBase(ABC):
     id: str
     returns_patch: bool = False

-    def apply(self, src_metadata, ctx, logger):
+    def apply(self, src_record, ctx, logger):
         """Apply the mapper to source metadata and return the result."""
-        result = self.map_value(src_metadata, ctx, logger)
+        result = self.map_value(src_record, ctx, logger)
         if not result:
             return
         if self.returns_patch:
@@ -32,9 +32,9 @@ def apply(self, src_metadata, ctx, logger):
             return result

         # Normal mode: wrap result under self.id
-        return set_path(self.id, result)
+        return build_path(self.id, result)

     @abstractmethod
-    def map_value(self, src, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Return a value (not a patch). Return None for no-op."""
         raise NotImplementedError
diff --git a/site/cds_rdm/inspire_harvester/transform/mappers/thesis.py b/site/cds_rdm/inspire_harvester/transform/mappers/thesis.py
index 8149683d..7fb560cc 100644
--- a/site/cds_rdm/inspire_harvester/transform/mappers/thesis.py
+++ b/site/cds_rdm/inspire_harvester/transform/mappers/thesis.py
@@ -22,8 +22,9 @@ class ThesisPublicationDateMapper(MapperBase):

     id = "metadata.publication_date"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Mapping of INSPIRE thesis_info.date to metadata.publication_date."""
+        src_metadata = src_record.get("metadata", {})
         imprints = src_metadata.get("imprints", [])
         imprint_date = imprints[0].get("date") if imprints else None
         thesis_info = src_metadata.get("thesis_info", {})
@@ -54,8 +55,9 @@ class ThesisDefenceDateMapper(MapperBase):

     id = "custom_fields.thesis:thesis.defense_date"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Apply thesis field mapping."""
+        src_metadata = src_record.get("metadata", {})
         thesis_info = src_metadata.get("thesis_info", {})
         defense_date = thesis_info.get("defense_date")
         return defense_date
@@ -67,8 +69,9 @@ class ThesisUniversityMappers(MapperBase):

     id = "custom_fields.thesis:thesis.university"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Apply thesis field mapping."""
+        src_metadata = src_record.get("metadata", {})
         thesis_info = src_metadata.get("thesis_info", {})
         institutions = thesis_info.get("institutions")
         if institutions:
@@ -83,8 +86,9 @@ class ThesisTypeMappers(MapperBase):

     id = "custom_fields.thesis:thesis.type"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Apply thesis field mapping."""
+        src_metadata = src_record.get("metadata", {})
         thesis_info = src_metadata.get("thesis_info", {})
         type = thesis_info.get("degree_type")
         return type
@@ -97,12 +101,18 @@ class ThesisContributorsMapper(ContributorsMapper):

     id = "metadata.contributors"

-    def map_value(self, src_metadata, ctx, logger):
+    def map_value(self, src_record, ctx, logger):
         """Map thesis contributors and supervisors."""
-        contributors = super().map_value(src_metadata, ctx, logger)
+        src_metadata = src_record.get("metadata", {})
+        contributors = super().map_value(src_record, ctx, logger)
         _supervisors = src_metadata.get("supervisors")
         supervisors = self._transform_creatibutors(_supervisors, ctx)

+        if not contributors:
+            contributors = []
+        if not supervisors:
+            supervisors = []
+
         return contributors + supervisors
diff --git a/site/cds_rdm/inspire_harvester/transform/resource_types.py b/site/cds_rdm/inspire_harvester/transform/resource_types.py
index d798bdef..e1f7b764 100644
--- a/site/cds_rdm/inspire_harvester/transform/resource_types.py
+++ b/site/cds_rdm/inspire_harvester/transform/resource_types.py
@@ -15,7 +15,7 @@ class ResourceType(str, Enum):

     ARTICLE = "publication-article"
     BOOK = "publication-book"
-    BOOK_CHAPTER = "publication-bookchapter"
+    BOOK_CHAPTER = "publication-section"
     CONFERENCE_PAPER = "publication-conferencepaper"
     NOTE = "publication-technicalnote"
     OTHER = "other"
diff --git a/site/cds_rdm/inspire_harvester/transform/transform_entry.py b/site/cds_rdm/inspire_harvester/transform/transform_entry.py
index 02eb51ca..e9a55297 100644
--- a/site/cds_rdm/inspire_harvester/transform/transform_entry.py
+++ b/site/cds_rdm/inspire_harvester/transform/transform_entry.py
@@ -17,7 +17,7 @@
 from cds_rdm.inspire_harvester.transform.resource_types import (
     ResourceTypeDetector,
 )
-from cds_rdm.inspire_harvester.transform.utils import assert_unique_ids, deep_merge_all
+from cds_rdm.inspire_harvester.utils import assert_unique_ids, deep_merge_all


 class RDMEntry:
@@ -39,9 +39,21 @@ def _record(self):
         self.errors.extend(self.transformer.ctx.errors)
         return record

-    def _files(self):
+    def _files(self, record):
         """Transformation of files."""
-        pass
+        inspire_id = self.inspire_record.get("id")
+        files = record.get("files")
+
+        if not files:
+            current_app.logger.error(
+                f"[inspire_id={inspire_id}] No files found in INSPIRE record - aborting transformation"
+            )
+            self.errors.append(
+                f"INSPIRE record #{self.inspire_metadata['control_number']} has no files. Metadata-only records are not supported. Aborting record transformation."
+            )
+            return None, self.errors
+
+        return files

     def _parent(self):
         """Record parent minimal values."""
@@ -75,7 +87,7 @@ def build(self):
         )

         if not inspire_files:
-            current_app.logger.warning(
+            current_app.logger.error(
                 f"[inspire_id={inspire_id}] No files found in INSPIRE record - aborting transformation"
             )
             self.errors.append(
@@ -95,7 +107,7 @@ def build(self):
             "id": self._id(),
             "metadata": record["metadata"],
             "custom_fields": record["custom_fields"],
-            "files": record["files"],
+            "files": self._files(record),
             "parent": self._parent(),
             "access": self._access(),
         }
@@ -113,7 +125,8 @@ class Inspire2RDM:
     """INSPIRE to CDS-RDM record mapping."""

     def __init__(
-        self, inspire_record, detector_cls=ResourceTypeDetector, policy=mapper_policy
+        self, inspire_record, detector_cls=ResourceTypeDetector,
+        policy=mapper_policy
     ):
         """Initializes the Inspire2RDM class."""
         self.policy = policy
@@ -136,8 +149,9 @@ def __init__(

         self.resource_type = rt

-        # pre-clean data
+        # pre-clean data and update the record with cleaned metadata
         self.inspire_metadata = self._clean_data(self.inspire_original_metadata)
+        self.inspire_record["metadata"] = self.inspire_metadata

     def _clean_data(self, src_metadata):
         """Cleans the input data."""
@@ -189,7 +203,8 @@ def transform_record(self):
         mappers = self.policy.build_for(self.resource_type)
         assert_unique_ids(mappers)
         patches = [
-            m.apply(self.inspire_metadata, self.ctx, self.logger) for m in mappers
+            m.apply(self.inspire_record, self.ctx, self.logger)
+            for m in mappers
         ]

         out_record = deep_merge_all(patches)
diff --git a/site/cds_rdm/inspire_harvester/update/__init__.py b/site/cds_rdm/inspire_harvester/update/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/site/cds_rdm/inspire_harvester/update/config.py b/site/cds_rdm/inspire_harvester/update/config.py
new file mode 100644
index 00000000..dfca187d
--- /dev/null
+++ b/site/cds_rdm/inspire_harvester/update/config.py
@@ -0,0 +1,26 @@
+from cds_rdm.inspire_harvester.update.fields.base import PreferCurrentMergeDictUpdate, \
+    OverwriteFieldUpdate, ListOfDictAppendUniqueUpdate
+from cds_rdm.inspire_harvester.update.fields.creatibutors import CreatibutorsFieldUpdate
+from cds_rdm.inspire_harvester.update.fields.custom_fields import ThesisFieldUpdate
+from cds_rdm.inspire_harvester.update.fields.identifiers import \
+    RelatedIdentifiersUpdate, IdentifiersFieldUpdate
+from cds_rdm.inspire_harvester.update.fields.metadata import PublicationDateUpdate
+
+UPDATE_STRATEGY_CONFIG = {
+    # fields not included in the strategy raise error on update attempt
+    "pids": PreferCurrentMergeDictUpdate(keep_incoming_keys=[]),
+    # "files": FilesUpdate(),
+    "metadata.creators": CreatibutorsFieldUpdate(strict=True),
+    "metadata.contributors": CreatibutorsFieldUpdate(strict=False),
+    "metadata.identifiers": IdentifiersFieldUpdate(),
+    "metadata.related_identifiers": RelatedIdentifiersUpdate(),
+    "metadata.publication_date": PublicationDateUpdate(),
+    "metadata.subjects": ListOfDictAppendUniqueUpdate(key_field="subject"),
+    "metadata.languages": ListOfDictAppendUniqueUpdate(key_field="id"),
+    "metadata.description": OverwriteFieldUpdate(),
+    "metadata.title": OverwriteFieldUpdate(),
+    "custom_fields.thesis:thesis": ThesisFieldUpdate(),
+    "custom_fields.cern:accelerators": ListOfDictAppendUniqueUpdate(key_field="id"),
+    "custom_fields.cern:experiments": ListOfDictAppendUniqueUpdate(key_field="id"),
+    # "custom_fields.cern:beams": IgnoreFieldUpdate(),
+}
\ No newline at end of file
diff --git a/site/cds_rdm/inspire_harvester/update/engine.py b/site/cds_rdm/inspire_harvester/update/engine.py
new file mode 100644
index 00000000..1779ce41
--- /dev/null
+++ b/site/cds_rdm/inspire_harvester/update/engine.py
@@ -0,0 +1,55 @@
+from cds_rdm.inspire_harvester.update.field import FieldUpdateBase
+from dataclasses import dataclass, field
+from typing import Any, Dict, List
+import copy
+
+Json = Dict[str, Any]
+
+@dataclass
+class UpdateConflict:
+    path: str
+    kind: str
+    message: str
+    current: Any = None
+    incoming: Any = None
+    details: Dict[str, Any] = field(default_factory=dict)
+
+
+@dataclass
+class UpdateResult:
+    updated: Json
+    conflicts: List[UpdateConflict] = field(default_factory=list)
+    audit: List[str] = field(default_factory=list)
+
+
+@dataclass
+class UpdateContext:
+    source: str | None = None
+
+@dataclass
+class UpdateEngine:
+    strategies: Dict[str, FieldUpdateBase]
+    fail_on_conflict: bool = False
+
+    def update(self, current, incoming, ctx):
+        updated = copy.deepcopy(current)
+        conflicts = []
+        audit = []
+
+        for path, strategy in self.strategies.items():
+            res = strategy.apply(updated, incoming, path, ctx)
+            updated = res.updated
+            conflicts.extend(res.conflicts)
+            audit.extend(res.audit)
+
+        if self.fail_on_conflict and conflicts:
+            raise RuntimeError(conflicts)
+
+        return UpdateResult(updated=updated, conflicts=conflicts, audit=audit)
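+
+# Example usage (a sketch; assumes the UPDATE_STRATEGY_CONFIG mapping of
+# field paths to FieldUpdateBase strategies defined in update/config.py):
+#
+#   engine = UpdateEngine(strategies=UPDATE_STRATEGY_CONFIG, fail_on_conflict=False)
+#   result = engine.update(current_record, incoming_record, UpdateContext(source="inspire"))
+#   print(result.audit, result.conflicts)
\ No newline at end of file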
diff --git a/site/cds_rdm/inspire_harvester/update/field.py b/site/cds_rdm/inspire_harvester/update/field.py
new file mode 100644
index 00000000..9daa437b
--- /dev/null
+++ b/site/cds_rdm/inspire_harvester/update/field.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2026 CERN.
+#
+# CDS-RDM is free software; you can redistribute it and/or modify it under
+# the terms of the MIT License; see LICENSE file for more details.
+
+"""INSPIRE to CDS harvester field update base module."""
+
+from abc import ABC, abstractmethod
+
+
+class FieldUpdateBase(ABC):
+    """Base class for field update strategies."""
+
+
+    def apply(self, current_record, incoming_update, path, ctx):
+        """Apply the update strategy and return the result."""
+        return self.update(current_record, incoming_update, path, ctx)
+
+    @abstractmethod
+    def update(self, current_record, incoming_update, path, ctx):
+        """Apply the field update at `path` and return an UpdateResult."""
+        raise NotImplementedError
diff --git a/site/cds_rdm/inspire_harvester/update/fields/__init__.py b/site/cds_rdm/inspire_harvester/update/fields/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/site/cds_rdm/inspire_harvester/update/fields/base.py b/site/cds_rdm/inspire_harvester/update/fields/base.py
new file mode 100644
index 00000000..106b37a8
--- /dev/null
+++ b/site/cds_rdm/inspire_harvester/update/fields/base.py
@@ -0,0 +1,142 @@
+import copy
+
+from cds_rdm.inspire_harvester.update.engine import UpdateResult, UpdateConflict
+from cds_rdm.inspire_harvester.update.field import FieldUpdateBase
+from cds_rdm.inspire_harvester.utils import get_path, set_path
+
+
+class OverwriteFieldUpdate(FieldUpdateBase):
+
+    def update(self, current, incoming, path, ctx):
+        inc_v = get_path(incoming, path)
+        if inc_v is None:
+            return UpdateResult(updated=current)
+
+        updated = copy.deepcopy(current)
+        set_path(updated, path, copy.deepcopy(inc_v))
+        return UpdateResult(updated=updated, audit=[f"{path}: overwritten"])
+
+
+class PreferCurrentMergeDictUpdate(FieldUpdateBase):
+
+    def __init__(self, keep_incoming_keys):
+        self.keep_incoming_keys = keep_incoming_keys
+
+    def update(self, current, incoming, path, ctx):
+        cur_v = get_path(current, path)
+        inc_v = get_path(incoming, path)
+
+        if inc_v is None:
+            return UpdateResult(updated=current)
+
+        if cur_v is None:
+            updated = copy.deepcopy(current)
+            set_path(updated, path, copy.deepcopy(inc_v))
+            return UpdateResult(updated=updated)
+
+        if not isinstance(cur_v, dict) or not isinstance(inc_v, dict):
+            return UpdateResult(
+                updated=current,
+                conflicts=[UpdateConflict(
+                    path=path,
+                    kind="type_mismatch",
+                    message="Expected dicts to merge",
+                    current=cur_v,
+                    incoming=inc_v,
+                )],
+            )
+
+        merged = copy.deepcopy(inc_v)
+        for k, v in cur_v.items():
+            if k in self.keep_incoming_keys:
+                continue
+            if k not in merged or merged[k] in (None, "", [], {}):
+                merged[k] = copy.deepcopy(v)
+
+        updated = copy.deepcopy(current)
+        set_path(updated, path, merged)
+        return UpdateResult(updated=updated, audit=[f"{path}: merged dict"])
+
+
+class ListOfDictAppendUniqueUpdate(FieldUpdateBase):
+    """
+    Append-only merge for a list-of-dicts field.
+
+    - Identifies items by a key extracted from each dict (e.g. item["subject"] or item["id"]).
+    - Appends incoming items whose key is not already present in current.
+    - Never removes anything.
+    - Optionally "enrich" an existing item (same key) by filling missing fields.
+
+    Example:
+        current = [{"subject":"A"}, {"subject":"B"}]
+        incoming = [{"subject":"C"}]
+        -> [{"subject":"A"}, {"subject":"B"}, {"subject":"C"}]
+    """
+
+    def __init__(
+        self,
+        key_field: str,
+        *,
+        enrich_existing: bool = False,
+    ):
+        self.key_field = key_field
+        self.enrich_existing = enrich_existing
+
+
+    def _deep_fill_missing(self, base, inc):
+        out = copy.deepcopy(base)
+        for k, v in (inc or {}).items():
+            if k not in out or out[k] in (None, "", [], {}):
+                out[k] = copy.deepcopy(v)
+            elif isinstance(out[k], dict) and isinstance(v, dict):
+                out[k] = self._deep_fill_missing(out[k], v)
+        return out
+
+    def update(self, current, incoming, path, ctx):
+        cur_list = get_path(current, path) or []
+        inc_list = get_path(incoming, path)
+
+        if inc_list is None:
+            return UpdateResult(updated=current)
+
+        if not isinstance(cur_list, list) or not isinstance(inc_list, list):
+            return UpdateResult(
+                updated=current,
+                conflicts=[UpdateConflict(
+                    path=path,
+                    kind="type_mismatch",
+                    message="Expected lists at path",
+                    current=cur_list,
+                    incoming=inc_list,
+                )],
+            )
+
+        updated_list = copy.deepcopy(cur_list)
+        audit = []
+        conflicts = []
+
+        # index current by key
+        idx_by_key = {}
+        for idx, item in enumerate(cur_list):
+            k = item.get(self.key_field)
+            # keep first occurrence
+            idx_by_key.setdefault(k, idx)
+
+        # process incoming
+        for inc_item in inc_list:
+
+            k = inc_item.get(self.key_field)
+            if k in idx_by_key:
+                if self.enrich_existing:
+                    idx = idx_by_key[k]
+                    updated_list[idx] = self._deep_fill_missing(updated_list[idx], inc_item)
+                    audit.append(f"{path}: enriched item {self.key_field}={k!r}")
+                continue
+
+            updated_list.append(copy.deepcopy(inc_item))
+            idx_by_key[k] = len(updated_list) - 1
+            audit.append(f"{path}: appended item {self.key_field}={k!r}")
+
+        updated = copy.deepcopy(current)
+        set_path(updated, path, updated_list)
+        return UpdateResult(updated=updated, conflicts=conflicts, audit=audit)
\ No newline at end of file
diff --git a/site/cds_rdm/inspire_harvester/update/fields/creatibutors.py b/site/cds_rdm/inspire_harvester/update/fields/creatibutors.py
new file mode 100644
index 00000000..8cec80e9
--- /dev/null
+++ b/site/cds_rdm/inspire_harvester/update/fields/creatibutors.py
@@ -0,0 +1,140 @@
+import copy
+
+from cds_rdm.inspire_harvester.update.engine import UpdateResult, UpdateConflict
+from cds_rdm.inspire_harvester.update.field import FieldUpdateBase
+from cds_rdm.inspire_harvester.utils import get_path, set_path
+
+
+class CreatibutorsFieldUpdate(FieldUpdateBase):
+    """
+    Merge list-of-dicts of creators/contributors.
+    Either merges or emits conflicts for human review.
+    """
+
+    def __init__(self, strict=True):
+        self.strict = strict
+
+
+    def _union_affiliations(self, cur_list, inc_list):
+        """
+        Union by affiliation['name'] (normalized). Keeps full dict objects as present.
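+
+        Example (sketch):
+            current = [{"name": "CERN"}], incoming = [{"name": "CERN"}, {"name": "MIT"}]
+            -> [{"name": "CERN"}, {"name": "MIT"}]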
+ """ + cur_list = cur_list or [] + inc_list = inc_list or [] + + out = [] + seen = set() + + def add(item): + if not isinstance(item, dict): + return + nm = item.get("name") + if nm and nm in seen: + return + out.append(copy.deepcopy(item)) + if nm: + seen.add(nm) + + for a in cur_list: + add(a) + for a in inc_list: + add(a) + + return out + + + def _key(self, creator: dict): + p = creator.get("person_or_org") or {} + ids = p.get("identifiers") or [] + for i in ids: + if i.get("scheme") and i.get("identifier"): + return ("id", i["scheme"], i["identifier"]) + return ( + "name", + (p.get("family_name") or "").lower(), + (p.get("given_name") or "").lower(), + (p.get("name") or "").lower(), + ) + + def _merge_creator(self, cur, inc): + merged = copy.deepcopy(inc) + + # Affiliations: union, never remove + if "affiliations" in cur or "affiliations" in inc: + merged["affiliations"] = self._union_affiliations( + cur.get("affiliations"), inc.get("affiliations") + ) + # merge person_or_org + cur_p = cur.get("person_or_org") or {} + inc_p = inc.get("person_or_org") or {} + mp = copy.deepcopy(inc_p) + + for k, v in cur_p.items(): + if k == "identifiers": + continue + if k not in mp or mp[k] in (None, "", [], {}): + mp[k] = copy.deepcopy(v) + + # union identifiers + seen = {(i.get("scheme"), i.get("identifier")) for i in mp.get("identifiers", [])} + for i in cur_p.get("identifiers", []) or []: + key = (i.get("scheme"), i.get("identifier")) + if key not in seen: + mp.setdefault("identifiers", []).append(copy.deepcopy(i)) + + merged["person_or_org"] = mp + return merged + + def update(self, current, incoming, path, ctx): + cur_list = get_path(current, path) or [] + inc_list = get_path(incoming, path) + + if inc_list is None: + return UpdateResult(updated=current) + + updated_list = copy.deepcopy(cur_list) + conflicts = [] + audit = [] + + # index current + index = {} + for i, c in enumerate(cur_list): + index.setdefault(self._key(c), []).append(i) + + for inc in inc_list: + k = self._key(inc) + matches = index.get(k, []) + + if not matches: + if self.strict: + conflicts.append(UpdateConflict( + path=path, + kind="unknown_creator", + message="Incoming creator not found", + incoming=inc, + )) + else: + updated_list.append(copy.deepcopy(inc)) + audit.append(f"{path}: appended creator {k}") + continue + + if len(matches) > 1: + conflicts.append(UpdateConflict( + path=path, + kind="ambiguous_match", + message="Multiple creators match incoming", + incoming=inc, + )) + continue + + idx = matches[0] + updated_list[idx] = self._merge_creator(cur_list[idx], inc) + audit.append(f"{path}: merged creator {k}") + + updated = copy.deepcopy(current) + set_path(updated, path, updated_list) + return UpdateResult(updated=updated, conflicts=conflicts, audit=audit) \ No newline at end of file diff --git a/site/cds_rdm/inspire_harvester/update/fields/custom_fields.py b/site/cds_rdm/inspire_harvester/update/fields/custom_fields.py new file mode 100644 index 00000000..f1e335e8 --- /dev/null +++ b/site/cds_rdm/inspire_harvester/update/fields/custom_fields.py @@ -0,0 +1,78 @@ +import copy + +from cds_rdm.inspire_harvester.update.engine import UpdateResult, UpdateConflict +from cds_rdm.inspire_harvester.update.field import FieldUpdateBase +from cds_rdm.inspire_harvester.utils import get_path, set_path + + +class ThesisFieldUpdate(FieldUpdateBase): + """ + Strategy for `metadata.thesis:thesis` (InvenioRDM custom field style). + + Requirements: + - Allow updating `university` and `type` from incoming. 
+    - Leave any keys that are missing in incoming intact on the current record
+      (e.g. keep current `date_defended`, `date_submitted` if incoming doesn't provide them).
+    - Do not delete keys.
+    - If incoming thesis object is missing entirely -> no-op.
+
+    Notes:
+    - If current thesis object is missing and incoming exists -> set it (full copy).
+    - If either side isn't a dict -> conflict.
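+
+    Example (a sketch of the intended merge):
+        current  = {"university": "Old U", "date_defended": "2020-06-01"}
+        incoming = {"university": "New U", "type": "phd"}
+        -> {"university": "New U", "type": "phd", "date_defended": "2020-06-01"}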
+    """
+
+    def __init__(self, updatable_keys = ("university", "type")):
+        self.updatable_keys = updatable_keys
+
+    def update(self, current, incoming, path, ctx):
+        cur_obj = get_path(current, path)
+        inc_obj = get_path(incoming, path)
+
+        # No incoming thesis data -> nothing to do
+        if inc_obj is None:
+            return UpdateResult(updated=current)
+
+        # If no current thesis -> set from incoming (safe default)
+        if cur_obj is None:
+            if not isinstance(inc_obj, dict):
+                return UpdateResult(
+                    updated=current,
+                    conflicts=[UpdateConflict(
+                        path=path,
+                        kind="type_mismatch",
+                        message="Incoming thesis field is not an object",
+                        incoming=inc_obj,
+                    )],
+                )
+            updated = copy.deepcopy(current)
+            set_path(updated, path, copy.deepcopy(inc_obj))
+            return UpdateResult(updated=updated, audit=[f"{path}: set (was missing)"])
+
+        # Both must be dicts to merge
+        if not isinstance(cur_obj, dict) or not isinstance(inc_obj, dict):
+            return UpdateResult(
+                updated=current,
+                conflicts=[UpdateConflict(
+                    path=path,
+                    kind="type_mismatch",
+                    message="Expected thesis field to be an object in both current and incoming",
+                    current=cur_obj,
+                    incoming=inc_obj,
+                )],
+            )
+
+        merged = copy.deepcopy(cur_obj)
+
+        # Only overwrite explicitly allowed keys IF they exist in incoming.
+        for k in self.updatable_keys:
+            if k in inc_obj and inc_obj[k] not in (None, "", [], {}):
+                merged[k] = copy.deepcopy(inc_obj[k])
+
+        # Keep all other current keys as-is (including date_defended/date_submitted)
+        updated = copy.deepcopy(current)
+        set_path(updated, path, merged)
+
+        changed_keys = [k for k in self.updatable_keys if k in inc_obj]
+        audit = [f"{path}: updated keys {changed_keys} (missing keys preserved)"] if changed_keys else []
+
+        return UpdateResult(updated=updated, audit=audit)
\ No newline at end of file
diff --git a/site/cds_rdm/inspire_harvester/update/fields/identifiers.py b/site/cds_rdm/inspire_harvester/update/fields/identifiers.py
new file mode 100644
index 00000000..f9840bbf
--- /dev/null
+++ b/site/cds_rdm/inspire_harvester/update/fields/identifiers.py
@@ -0,0 +1,312 @@
+from typing import List
+
+import copy
+
+from cds_rdm.inspire_harvester.update.engine import Json, UpdateContext, UpdateResult, \
+    UpdateConflict
+from cds_rdm.inspire_harvester.update.field import FieldUpdateBase
+from cds_rdm.inspire_harvester.utils import get_path, set_path
+
+
+class IdentifiersFieldUpdate(FieldUpdateBase):
+    """
+    Strategy for list-of-dicts identifiers fields (e.g. metadata.identifiers).
+
+    Behaviour:
+    1) Recognise existing identifiers by (scheme, identifier) pair.
+    2) Append identifiers that are present in incoming but not in current (by pair).
+    3) If the *same scheme* exists in both current and incoming but the identifier value
+       differs -> CONFLICT (human review).
+    4) If current contains identifier schemes that incoming does not contain -> WARNING.
+
+    Notes:
+    - This assumes "scheme" is intended to be unique (one identifier per scheme). If you
+      legitimately allow multiple identifiers per scheme, you should change the conflict
+      rule (e.g. compare by pair only, not by scheme).
+    - For an existing (scheme, identifier) pair, we "enrich" the current item with any
+      missing keys from incoming (so incoming can add resource_type, etc.) without deleting
+      current keys.
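+
+    Example (sketch):
+        current  = [{"scheme": "arxiv", "identifier": "2401.00001"}]
+        incoming = [{"scheme": "arxiv", "identifier": "2401.99999"}]
+        -> "scheme_identifier_mismatch" conflict; the current list is left untouched.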
+    """
+
+    def __init__(self, warn_on_extra_current_schemes: bool = True):
+        self.warn_on_extra_current_schemes = warn_on_extra_current_schemes
+
+    def _deep_fill_missing(self, current: dict, incoming: dict) -> dict:
+        """Fill missing/empty values in base from incoming, recursively for dicts."""
+        out = copy.deepcopy(current)
+        for k, v in (incoming or {}).items():
+            if k not in out or out[k] in (None, "", [], {}):
+                out[k] = copy.deepcopy(v)
+            elif isinstance(out[k], dict) and isinstance(v, dict):
+                out[k] = self._deep_fill_missing(out[k], v)
+        return out
+
+    def update(self, current: Json, incoming: Json, path: str, ctx: UpdateContext) -> UpdateResult:
+        cur_list = get_path(current, path) or []
+        inc_list = get_path(incoming, path)
+
+        if inc_list is None:
+            return UpdateResult(updated=current)
+
+        if not isinstance(cur_list, list) or not isinstance(inc_list, list):
+            return UpdateResult(
+                updated=current,
+                conflicts=[UpdateConflict(
+                    path=path,
+                    kind="type_mismatch",
+                    message="Expected lists at path",
+                    current=cur_list,
+                    incoming=inc_list,
+                )],
+            )
+
+        updated_list = copy.deepcopy(cur_list)
+        conflicts = []
+        audit = []
+
+        # Index current by scheme -> (identifier_value, list_index, item)
+        # If current already has multiple different identifiers for the same scheme, that's suspicious.
+        cur_by_scheme = {}
+        cur_pairs = set()
+
+        for idx, item in enumerate(cur_list):
+            if not isinstance(item, dict):
+                conflicts.append(UpdateConflict(
+                    path=path,
+                    kind="type_mismatch",
+                    message="Identifier entry is not an object",
+                    current=item,
+                ))
+                continue
+
+            scheme = item.get("scheme")
+            ident = item.get("identifier")
+            if not scheme or not ident:
+                conflicts.append(UpdateConflict(
+                    path=path,
+                    kind="invalid_identifier",
+                    message="Identifier entry missing 'scheme' or 'identifier'",
+                    current=item,
+                ))
+                continue
+
+            cur_pairs.add((scheme, ident))
+            if scheme in cur_by_scheme and cur_by_scheme[scheme]["identifier"] != ident:
+                conflicts.append(UpdateConflict(
+                    path=path,
+                    kind="duplicate_scheme_in_current",
+                    message="Current record has multiple different identifiers for the same scheme",
+                    current={"existing": cur_by_scheme[scheme]["item"], "another": item},
+                    details={"scheme": scheme},
+                ))
+            else:
+                cur_by_scheme[scheme] = {"identifier": ident, "idx": idx, "item": item}
+
+        inc_schemes = set()
+        cur_schemes = set(cur_by_scheme.keys())
+
+        # Process incoming
+        for inc_item in inc_list:
+            if not isinstance(inc_item, dict):
+                conflicts.append(UpdateConflict(
+                    path=path,
+                    kind="type_mismatch",
+                    message="Incoming identifier entry is not an object",
+                    incoming=inc_item,
+                ))
+                continue
+
+            scheme = inc_item.get("scheme")
+            ident = inc_item.get("identifier")
+            if not scheme or not ident:
+                conflicts.append(UpdateConflict(
+                    path=path,
+                    kind="invalid_identifier",
+                    message="Incoming identifier missing 'scheme' or 'identifier'",
+                    incoming=inc_item,
+                ))
+                continue
+
+            inc_schemes.add(scheme)
+
+            # Conflict rule: same scheme but different identifier value
+            if scheme in cur_by_scheme and cur_by_scheme[scheme]["identifier"] != ident:
+                conflicts.append(UpdateConflict(
+                    path=path,
+                    kind="scheme_identifier_mismatch",
+                    message="Incoming identifier differs for the same scheme",
+                    current=cur_by_scheme[scheme]["item"],
+                    incoming=inc_item,
+                    details={"scheme": scheme,
+                             "current_identifier": cur_by_scheme[scheme]["identifier"],
+                             "incoming_identifier": ident},
+                ))
+                continue
+
+            # Pair exists -> enrich existing entry with missing fields from incoming
+            if (scheme, ident) in cur_pairs:
+                idx = cur_by_scheme[scheme]["idx"]
+                updated_list[idx] = self._deep_fill_missing(updated_list[idx], inc_item)
+                audit.append(f"{path}: enriched existing identifier ({scheme}, {ident})")
+                continue
+
+            # New pair -> append
+            updated_list.append(copy.deepcopy(inc_item))
+            cur_pairs.add((scheme, ident))
+            audit.append(f"{path}: appended identifier ({scheme}, {ident})")
+
+        # Warnings: current has more schemes than incoming
+        if self.warn_on_extra_current_schemes:
+            extra = sorted(cur_schemes - inc_schemes)
+            if extra:
+                # If you have a dedicated warnings channel in your system,
+                # replace this with `warnings=[...]`. Here we emit audit warnings.
+                audit.append(
+                    f"WARNING {path}: current has schemes not present in incoming: {extra}"
+                )
+
+        updated = copy.deepcopy(current)
+        set_path(updated, path, updated_list)
+        return UpdateResult(updated=updated, conflicts=conflicts, audit=audit)
+
+
+class RelatedIdentifiersUpdate(FieldUpdateBase):
+    """
+    Strategy for InvenioRDM-style `metadata.related_identifiers` (list of dicts).
+
+    Based on the diff screenshot:
+    - Incoming may add new related identifiers (green blocks).
+    - Current may have extra identifiers that incoming removed (red block).
+
+    Behaviour:
+    1) Identify existing entries by (scheme, identifier) pair.
+    2) Append any incoming entries whose (scheme, identifier) pair is not present in current.
+    3) "Enrich" existing entries with missing fields from incoming (e.g. resource_type),
+       without deleting current fields.
+    4) WARNING if current has more related identifiers entries than incoming
+       (i.e., incoming appears to have removed some).
+
+    Notes:
+    - This does *not* delete anything automatically.
+    - It does *not* treat "same scheme but different identifier" as conflict, because
+      related_identifiers commonly have multiple entries with the same scheme (e.g. many URLs).
+      If you want that stricter behaviour, you can add it.
+    """
+
+    def __init__(self, warn_if_current_has_more: bool = True):
+        self.warn_if_current_has_more = warn_if_current_has_more
+
+    def _pair(self, item: dict) -> tuple:
+        return (item.get("scheme"), item.get("identifier"))
+
+    def _deep_fill_missing(self, base: dict, inc: dict) -> dict:
+        """Fill missing/empty values in base from inc, recursively for dicts."""
+        out = copy.deepcopy(base)
+        for k, v in (inc or {}).items():
+            if k not in out or out[k] in (None, "", [], {}):
+                out[k] = copy.deepcopy(v)
+            elif isinstance(out[k], dict) and isinstance(v, dict):
+                out[k] = self._deep_fill_missing(out[k], v)
+        return out
+
+    def update(self, current: Json, incoming: Json, path: str, ctx: UpdateContext) -> UpdateResult:
+        cur_list = get_path(current, path) or []
+        inc_list = get_path(incoming, path)
+
+        if inc_list is None:
+            return UpdateResult(updated=current)
+
+        if not isinstance(cur_list, list) or not isinstance(inc_list, list):
+            return UpdateResult(
+                updated=current,
+                conflicts=[UpdateConflict(
+                    path=path,
+                    kind="type_mismatch",
+                    message="Expected lists at path",
+                    current=cur_list,
+                    incoming=inc_list,
+                )],
+            )
+
+        updated_list = copy.deepcopy(cur_list)
+        conflicts: List[UpdateConflict] = []
+        audit: List[str] = []
+
+        # Index current by (scheme, identifier) -> index in list
+        # If duplicates exist, we keep the first index and still handle pairs by membership.
+        cur_index = {}
+        cur_pairs = set()
+
+        for idx, item in enumerate(cur_list):
+            if not isinstance(item, dict):
+                conflicts.append(UpdateConflict(
+                    path=path,
+                    kind="type_mismatch",
+                    message="Current related_identifier entry is not an object",
+                    current=item,
+                ))
+                continue
+
+            scheme, ident = self._pair(item)
+            if not scheme or not ident:
+                conflicts.append(UpdateConflict(
+                    path=path,
+                    kind="invalid_related_identifier",
+                    message="Current related_identifier missing 'scheme' or 'identifier'",
+                    current=item,
+                ))
+                continue
+
+            pair = (scheme, ident)
+            cur_pairs.add(pair)
+            cur_index.setdefault(pair, idx)
+
+        # Apply incoming changes
+        for inc_item in inc_list:
+            if not isinstance(inc_item, dict):
+                conflicts.append(UpdateConflict(
+                    path=path,
+                    kind="type_mismatch",
+                    message="Incoming related_identifier entry is not an object",
+                    incoming=inc_item,
+                ))
+                continue
+
+            scheme, ident = self._pair(inc_item)
+            if not scheme or not ident:
+                conflicts.append(UpdateConflict(
+                    path=path,
+                    kind="invalid_related_identifier",
+                    message="Incoming related_identifier missing 'scheme' or 'identifier'",
+                    incoming=inc_item,
+                ))
+                continue
+
+            pair = (scheme, ident)
+
+            if pair in cur_pairs:
+                # enrich existing entry (e.g. add resource_type)
+                idx = cur_index[pair]
+                updated_list[idx] = self._deep_fill_missing(updated_list[idx], inc_item)
+                audit.append(f"{path}: enriched existing related_identifier {pair}")
+            else:
+                updated_list.append(copy.deepcopy(inc_item))
+                cur_pairs.add(pair)
+                cur_index[pair] = len(updated_list) - 1
+                audit.append(f"{path}: appended related_identifier {pair}")
+
+        # Warning: current had more entries than incoming (possible removals)
+        if self.warn_if_current_has_more and len(cur_list) > len(inc_list):
+            audit.append(
+                f"WARNING {path}: current has {len(cur_list)} entries, incoming has {len(inc_list)} "
+                f"(incoming may have removed {len(cur_list) - len(inc_list)} related_identifiers)"
+            )
+
+        updated = copy.deepcopy(current)
+        set_path(updated, path, updated_list)
+        return UpdateResult(updated=updated, conflicts=conflicts, audit=audit)
\ No newline at end of file
diff --git a/site/cds_rdm/inspire_harvester/update/fields/metadata.py b/site/cds_rdm/inspire_harvester/update/fields/metadata.py
new file mode 100644
index 00000000..a7985a31
--- /dev/null
+++ b/site/cds_rdm/inspire_harvester/update/fields/metadata.py
@@ -0,0 +1,133 @@
+import copy
+
+import dateparser
+
+from cds_rdm.inspire_harvester.update.engine import UpdateResult, UpdateConflict
+from cds_rdm.inspire_harvester.update.field import FieldUpdateBase
+from cds_rdm.inspire_harvester.utils import get_path, set_path
+
+
+class PublicationDateUpdate(FieldUpdateBase):
+    """
+    Update publication_date only if incoming value is more accurate.
+
+    Accuracy (granularity):
+        YYYY < YYYY-MM < YYYY-MM-DD
+
+    Uses dateparser for parsing and validation.
+    """
+
+    def __init__(self,
+                 conflict_on_year_mismatch: bool = True,
+                 conflict_on_same_year_mismatch: bool = True):
+        self.conflict_on_year_mismatch = conflict_on_year_mismatch
+        self.conflict_on_same_year_mismatch = conflict_on_same_year_mismatch
+
+    def _granularity(self, s: str) -> int:
+        """Return 1,2,3 for year, month, day granularity."""
+        s = s.strip()
+        if len(s) == 4:
+            return 1
+        if len(s) == 7:
+            return 2
+        if len(s) == 10:
+            return 3
+        raise ValueError("Invalid publication_date format")
+
+    def _parse(self, s: str):
+        """Parse and validate date string."""
+        # dateparser parses partial dates but fills missing parts with defaults.
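+        # e.g. "2023" parses to a full datetime with default month/day, so the
+        # granularity must be derived from the raw string (see _granularity above).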
+        dt = dateparser.parse(s)
+        return dt, self._granularity(s)
+
+    def update(self, current, incoming, path, ctx):
+        cur_v = get_path(current, path)
+        inc_v = get_path(incoming, path)
+
+        if inc_v is None:
+            return UpdateResult(updated=current)
+
+        # Parse both
+        try:
+            cur_dt, cur_g = self._parse(str(cur_v))
+        except ValueError:
+            return UpdateResult(
+                updated=current,
+                conflicts=[UpdateConflict(
+                    path=path,
+                    kind="invalid_date",
+                    message="Current publication_date invalid",
+                    current=cur_v,
+                )],
+            )
+
+        try:
+            inc_dt, inc_g = self._parse(str(inc_v))
+        except ValueError:
+            return UpdateResult(
+                updated=current,
+                conflicts=[UpdateConflict(
+                    path=path,
+                    kind="invalid_date",
+                    message="Incoming publication_date invalid",
+                    incoming=inc_v,
+                )],
+            )
+
+        # Year mismatch
+        if cur_dt.year != inc_dt.year:
+            if self.conflict_on_year_mismatch:
+                return UpdateResult(
+                    updated=current,
+                    conflicts=[UpdateConflict(
+                        path=path,
+                        kind="year_mismatch",
+                        message="Incoming publication_date year differs",
+                        current=cur_v,
+                        incoming=inc_v,
+                    )],
+                )
+            return UpdateResult(updated=current)
+
+        # Same year but contradicts known month/day
+        if inc_g >= 2 and cur_g >= 2 and inc_dt.month != cur_dt.month:
+            if self.conflict_on_same_year_mismatch:
+                return UpdateResult(
+                    updated=current,
+                    conflicts=[UpdateConflict(
+                        path=path,
+                        kind="month_mismatch",
+                        message="Incoming publication_date contradicts current month",
+                        current=cur_v,
+                        incoming=inc_v,
+                    )],
+                )
+            return UpdateResult(updated=current)
+
+        if inc_g == 3 and cur_g == 3 and inc_dt.day != cur_dt.day:
+            if self.conflict_on_same_year_mismatch:
+                return UpdateResult(
+                    updated=current,
+                    conflicts=[UpdateConflict(
+                        path=path,
+                        kind="day_mismatch",
+                        message="Incoming publication_date contradicts current day",
+                        current=cur_v,
+                        incoming=inc_v,
+                    )],
+                )
+            return UpdateResult(updated=current)
+
+        # Incoming more accurate → update
+        if inc_g > cur_g:
+            updated = copy.deepcopy(current)
+            set_path(updated, path, inc_v.strip())
+            return UpdateResult(
+                updated=updated,
+                audit=[f"{path}: updated to more accurate value ({cur_v} → {inc_v})"],
+            )
+
+        # Otherwise keep current
+        return UpdateResult(updated=current)
\ No newline at end of file
diff --git a/site/cds_rdm/inspire_harvester/transform/utils.py b/site/cds_rdm/inspire_harvester/utils.py
similarity index 83%
rename from site/cds_rdm/inspire_harvester/transform/utils.py
rename to site/cds_rdm/inspire_harvester/utils.py
index 2cd5508d..190aad9b 100644
--- a/site/cds_rdm/inspire_harvester/transform/utils.py
+++ b/site/cds_rdm/inspire_harvester/utils.py
@@ -24,7 +24,25 @@ def assert_unique_ids(mappers):
         raise ValueError(f"Duplicate mapper ids in pipeline: {dupes}")


-def set_path(path, value):
+def get_path(record, path):
+    """Get value of dict from dotted path."""
+    cur = record
+    for part in path.split("."):
+        if not isinstance(cur, dict) or part not in cur:
+            return None
+        cur = cur[part]
+    return cur
+
+def set_path(doc, path, value) -> None:
+    """Set a value in a nested dict at a dotted path, creating missing levels."""
+    parts = path.split(".")
+    cur = doc
+    for p in parts[:-1]:
+        cur = cur.setdefault(p, {})
+    cur[parts[-1]] = value
+
+
+def build_path(path, value):
     """Build nested dict from dotted path."""
     keys = path.split(".")
     d = {}
diff --git a/site/cds_rdm/inspire_harvester/writer.py b/site/cds_rdm/inspire_harvester/writer.py
index 235ce77d..65c3b199 100644
--- a/site/cds_rdm/inspire_harvester/writer.py
+++ b/site/cds_rdm/inspire_harvester/writer.py
@@ -16,6 +16,9 @@
 from flask import current_app
 from invenio_access.permissions import system_identity
 from invenio_db import db
+
+from cds_rdm.inspire_harvester.update.config import UPDATE_STRATEGY_CONFIG
+from cds_rdm.inspire_harvester.update.engine import UpdateContext, UpdateEngine
 from invenio_rdm_records.proxies import current_rdm_records_service
 from invenio_rdm_records.services.errors import ValidationErrorWithMessageAsList
 from invenio_search.engine import dsl
@@ -81,9 +84,9 @@ def _process_entry(
             error_message = f"Error while processing entry : {str(e)}."
         except ValidationError as e:
             error_message = f"Validation error while processing entry: {str(e)}."
-        # except Exception as e:
-        #
-        #     error_message = f"Unexpected error while processing entry: {str(e)}."
+        except Exception as e:
+            raise e
+            # error_message = f"Unexpected error while processing entry: {str(e)}."
         if error_message:
             logger.error(error_message)
             stream_entry.errors.append(f"[inspire_id={inspire_id}] {error_message}")
@@ -122,6 +125,7 @@ def _get_existing_records(
         related_identifiers = entry["metadata"].get("related_identifiers", [])

         cds_id = self._retrieve_identifier(related_identifiers, "cds")
+        cdsrdm_id = self._retrieve_identifier(related_identifiers, "cdsrdm")
         arxiv_id = self._retrieve_identifier(related_identifiers, "arxiv")
         report_number = self._retrieve_identifier(related_identifiers, "cdsrn")
@@ -130,7 +134,7 @@ def _get_existing_records(
         ]

         cds_filters = [
-            dsl.Q("term", **{"id": cds_id}),
+            dsl.Q("term", **{"id": cdsrdm_id}),
         ]

         inspire_filters = [
@@ -207,7 +211,7 @@ def update_record(
         logger.debug(f"Existing files' checksums: {existing_checksums}.")
         logger.debug(f"New files' checksums: {new_checksums}.")

-        existing_record_has_doi = record.data["pids"].get("doi", {})
+        existing_record_has_doi = bool(record.data["pids"].get("doi", {}))
         existing_record_has_cds_doi = (
             record.data["pids"].get("doi", {}).get("provider") == "datacite"
         )
@@ -224,21 +228,31 @@ def update_record(
         if should_update_files and not files_enabled:
             stream_entry.entry["files"]["enabled"] = True
+            entry["files"]["enabled"] = True

-        if should_create_new_version:
+        engine = UpdateEngine(
+            strategies=UPDATE_STRATEGY_CONFIG,
+            fail_on_conflict=False
+        )
+        ctx = UpdateContext(source="inspire_import")

-            self._create_new_version(stream_entry, record)
+        result = engine.update(record_dict, entry, ctx)
+        update_metadata = result.updated
+        logger.warning(str(result.conflicts))
+        logger.info(str(result.audit))
+
+        if should_create_new_version:
+            self._create_new_version(stream_entry, update_metadata, record)
         else:
             logger.debug("Create draft for metadata update")
-            # TODO make this indempotent (check if metadata + files differs, if not, don't create)
             draft = current_rdm_records_service.edit(system_identity, record_pid)
             logger.debug(f"Draft created with ID: {draft.id}")

             draft = current_rdm_records_service.update_draft(
-                system_identity, draft.id, data=entry
+                system_identity, draft.id, data=update_metadata
             )

             if should_update_files:
@@ -279,7 +293,6 @@ def _update_files(
         inspire_id=None,
         logger=None,
     ):
-
         entry = stream_entry.entry
         logger.info("Updating files for record {}".format(record.id))
@@ -316,7 +329,7 @@ def _update_files(
         for key, file in new_files.items():
             if file["checksum"] in files_to_create:
                 logger.debug(f"Processing new file: {key}")
-                inspire_url = file.pop("inspire_url")
+                inspire_url = file.pop("source_url")
                 file_content = self._fetch_file(stream_entry, inspire_url)

                 if not file_content:
@@ -328,7 +341,7 @@ def _update_files(

     @hlog
     def _create_new_version(
-        self, stream_entry, record, inspire_id=None, record_pid=None, logger=None
@@ -328,7 +341,7 @@
 
     @hlog
     def _create_new_version(
-        self, stream_entry, record, inspire_id=None, record_pid=None, logger=None
+        self, stream_entry, update_metadata, record, inspire_id=None, record_pid=None, logger=None
     ):
         """For records with updated files coming from INSPIRE, create and publish a new version."""
         entry = stream_entry.entry
@@ -336,7 +349,7 @@
             system_identity, record["id"]
         )
 
-        new_version_entry = deepcopy(entry)
+        new_version_entry = deepcopy(update_metadata)
         # delete the previous DOI for new version
-        if "pids" in entry:
+        if "pids" in new_version_entry:
             del new_version_entry["pids"]
@@ -379,13 +392,6 @@
             raise WriterError(
                 f"Failure: draft {new_version_draft.id} not published, validation errors: {e.messages}."
             )
-        # except Exception as e:
-        #     current_rdm_records_service.delete_draft(
-        #         system_identity, new_version_draft.id
-        #     )
-        #     raise WriterError(
-        #         f"Draft {new_version_draft.id} failed publishing because of an unexpected error: {str(e)}."
-        #     )
 
     @hlog
     def _add_community(
@@ -434,7 +440,7 @@ def _create_new_record(
         for key, file_data in file_entries.items():
             logger.debug(f"Processing file: {key}")
-            inspire_url = file_data.pop("inspire_url")
+            inspire_url = file_data.pop("source_url", None)
             file_content = self._fetch_file(stream_entry, inspire_url)
             if not file_content:
                 logger.error(f"Failed to fetch file content for: {key}")
@@ -535,6 +541,9 @@ def _create_file(
         """Create a new file."""
         logger.debug(f"Filename: '{file_data['key']}'.")
         service = current_rdm_records_service
+        inspire_checksum = file_data["checksum"]
+        file_source = file_data.get("source")
+        new_checksum = None
 
         try:
             service.draft_files.init_files(
@@ -557,13 +566,17 @@
             result = service.draft_files.commit_file(
                 system_identity, draft.id, file_data["key"]
             )
-            inspire_checksum = file_data["checksum"]
-            new_checksum = result.to_dict()["checksum"]
+
+            new_checksum = result.data["checksum"]
             logger.debug(
-                f"Filename: '{file_data['key']}' committed. File checksum: {result.to_dict()['checksum']}."
+                f"Filename: '{file_data['key']}' committed. File checksum: {result.data['checksum']}."
             )
-            assert inspire_checksum == new_checksum
+            if file_source != "arxiv" or inspire_checksum:
+                # arXiv files may not expose a checksum via their API;
+                # verify only when a source checksum is available
+                assert inspire_checksum == new_checksum
         except AssertionError as e:
             ## TODO draft? delete record completely?
             logger.error(
@@ -574,15 +587,4 @@
             raise WriterError(
                 f"File {file_data['key']} checksum mismatch. Expected: {inspire_checksum}, got: {new_checksum}."
-            )
-        except Exception as e:
-            logger.error(
-                f"An error occurred while creating a file. Delete draft file: '{file_data['key']}'. Error: {e}."
-            )
-
-            service.draft_files.delete_file(system_identity, draft.id, file_data["key"])
-
-            raise e
-            # raise WriterError(
-            #     f"File {file_data['key']} creation failed because of an unexpected error: {str(e)}."
-            # )
+            )
\ No newline at end of file
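Note: the checksum guard added to _create_file reduces to one rule: verify the committed file against the harvested checksum unless the file comes from arXiv and no source checksum is available. A standalone restatement of that predicate (hypothetical helper, not part of the patch):

    def should_verify_checksum(file_source, source_checksum):
        """arXiv downloads may not publish a checksum; everything else must match."""
        if file_source == "arxiv":
            return bool(source_checksum)
        return True

    assert should_verify_checksum("arxiv", None) is False
    assert should_verify_checksum("arxiv", "md5:abc") is True
    assert should_verify_checksum(None, "md5:abc") is True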
diff --git a/site/tests/inspire_harvester/test_harvester_job.py b/site/tests/inspire_harvester/test_harvester_job.py
index ac182041..62e5652d 100644
--- a/site/tests/inspire_harvester/test_harvester_job.py
+++ b/site/tests/inspire_harvester/test_harvester_job.py
@@ -8,6 +8,7 @@
-"""ISNPIRE harvester job tests."""
+"""INSPIRE harvester job tests."""
 import json
 from pathlib import Path
+from urllib.parse import parse_qs, urlparse
 
 import pytest
 from invenio_access.permissions import system_identity
@@ -319,13 +320,21 @@ def mock_requests_get_pagination(
     ):
         page_1_file = DATA_DIR / "inspire_response_15_records_page_1.json"
         page_2_file = DATA_DIR / "inspire_response_15_records_page_2.json"
+
         url_page_1 = "https://inspirehep.net/api/literature?q=_oai.sets%3AForCDS+AND+du+%3E%3D+2024-11-15+AND+du+%3C%3D+2025-01-09"
         url_page_2 = "https://inspirehep.net/api/literature/?q=_oai.sets%3AForCDS+AND+du+%3E%3D+2024-11-15+AND+du+%3C%3D+2025-01-09&size=10&page=2"
         filepath = ""
 
-        if url == url_page_1:
+        # match on the "page" query parameter instead of the exact URL,
+        # which now varies with the reader's query filters
+        parsed = urlparse(url)
+        params = parse_qs(parsed.query)
+        page = params.get("page", [None])[0]
+
+        if not page or page == "1":
             filepath = page_1_file
-        elif url == url_page_2:
+        elif page == "2":
             filepath = page_2_file
 
         content = ""
@@ -335,7 +344,6 @@
             "r",
         ) as f:
             content = json.load(f)
-
         return mock_requests_get(url, mock_content=content)
 
     with pytest.raises(TaskExecutionPartialError) as e:
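Note: the mock above stops comparing full URLs because the reader's query string now varies (it gained a document_type filter), which made exact equality brittle; only the page parameter matters for choosing a fixture. A minimal self-contained illustration of the matching:

    from urllib.parse import parse_qs, urlparse

    def page_of(url):
        # returns the "page" query parameter, or None on the first request
        return parse_qs(urlparse(url).query).get("page", [None])[0]

    assert page_of("https://inspirehep.net/api/literature?q=x") is None
    assert page_of("https://inspirehep.net/api/literature/?q=x&size=10&page=2") == "2"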
Error message: " + ) def test_reader_empty_results(running_app, caplog): diff --git a/site/tests/inspire_harvester/test_transformer.py b/site/tests/inspire_harvester/test_transformer.py index d6428c02..c5980c49 100644 --- a/site/tests/inspire_harvester/test_transformer.py +++ b/site/tests/inspire_harvester/test_transformer.py @@ -60,8 +60,9 @@ def test_transform_related_identifiers(mock_normalize_isbn, running_app): ) logger = Logger(inspire_id="12345") mapper = RelatedIdentifiersMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) # Should include arXiv, INSPIRE ID, and ISBN (CDS should be in identifiers) assert len(result) == 6 @@ -98,8 +99,9 @@ def test_transform_identifiers(running_app): ) logger = Logger(inspire_id="12345") mapper = IdentifiersMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert len(result) == 1 assert {"identifier": "2633876", "scheme": "cds"} in result @@ -116,8 +118,9 @@ def test_transform_dois_valid_external(mock_is_doi, running_app): ) logger = Logger(inspire_id="12345") mapper = DOIMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result["doi"]["identifier"] == "10.5281/zenodo.12345" assert result["doi"]["provider"] == "external" @@ -133,8 +136,9 @@ def test_transform_dois_valid_datacite(mock_is_doi, running_app): ) logger = Logger(inspire_id="12345") mapper = DOIMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result["doi"]["identifier"] == "10.17181/405kf-bmq61" assert result["doi"]["provider"] == "datacite" @@ -151,8 +155,9 @@ def test_transform_dois_valid_external_second(mock_is_doi, running_app): ) logger = Logger(inspire_id="12345") mapper = DOIMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result["doi"]["identifier"] == "10.1000/test" assert result["doi"]["provider"] == "external" @@ -169,8 +174,9 @@ def test_transform_dois_invalid(mock_is_doi, running_app): ) logger = Logger(inspire_id="12345") mapper = DOIMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result is None assert len(ctx.errors) == 1 @@ -190,8 +196,9 @@ def test_transform_dois_multiple(running_app): ) logger = Logger(inspire_id="12345") mapper = DOIMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result is None assert len(ctx.errors) == 1 @@ -265,12 +272,13 @@ def test_transform_titles_single_title(running_app): resource_type=ResourceType.OTHER, inspire_id="12345" ) logger = Logger(inspire_id="12345") + src_record = {"metadata": src_metadata, "created": "2023-01-01"} title_mapper = TitleMapper() - title = title_mapper.map_value(src_metadata, ctx, logger) + title = title_mapper.map_value(src_record, ctx, logger) 
diff --git a/site/tests/inspire_harvester/test_transformer.py b/site/tests/inspire_harvester/test_transformer.py
index d6428c02..c5980c49 100644
--- a/site/tests/inspire_harvester/test_transformer.py
+++ b/site/tests/inspire_harvester/test_transformer.py
@@ -60,8 +60,9 @@ def test_transform_related_identifiers(mock_normalize_isbn, running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = RelatedIdentifiersMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     # Should include arXiv, INSPIRE ID, and ISBN (CDS should be in identifiers)
     assert len(result) == 6
@@ -98,8 +99,9 @@ def test_transform_identifiers(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = IdentifiersMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert len(result) == 1
     assert {"identifier": "2633876", "scheme": "cds"} in result
@@ -116,8 +118,9 @@ def test_transform_dois_valid_external(mock_is_doi, running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = DOIMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert result["doi"]["identifier"] == "10.5281/zenodo.12345"
     assert result["doi"]["provider"] == "external"
@@ -133,8 +136,9 @@ def test_transform_dois_valid_datacite(mock_is_doi, running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = DOIMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert result["doi"]["identifier"] == "10.17181/405kf-bmq61"
     assert result["doi"]["provider"] == "datacite"
@@ -151,8 +155,9 @@ def test_transform_dois_valid_external_second(mock_is_doi, running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = DOIMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert result["doi"]["identifier"] == "10.1000/test"
     assert result["doi"]["provider"] == "external"
@@ -169,8 +174,9 @@ def test_transform_dois_invalid(mock_is_doi, running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = DOIMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert result is None
     assert len(ctx.errors) == 1
@@ -190,8 +196,9 @@ def test_transform_dois_multiple(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = DOIMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert result is None
     assert len(ctx.errors) == 1
@@ -265,12 +272,13 @@ def test_transform_titles_single_title(running_app):
         resource_type=ResourceType.OTHER, inspire_id="12345"
     )
     logger = Logger(inspire_id="12345")
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
     title_mapper = TitleMapper()
-    title = title_mapper.map_value(src_metadata, ctx, logger)
+    title = title_mapper.map_value(src_record, ctx, logger)
 
     additional_titles_mapper = AdditionalTitlesMapper()
-    additional_titles = additional_titles_mapper.map_value(src_metadata, ctx, logger)
+    additional_titles = additional_titles_mapper.map_value(src_record, ctx, logger)
 
     assert title == "Main Title"
     assert additional_titles == []
@@ -289,12 +297,13 @@ def test_transform_titles_multiple_titles_with_subtitle(running_app):
         resource_type=ResourceType.OTHER, inspire_id="12345"
     )
     logger = Logger(inspire_id="12345")
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
     title_mapper = TitleMapper()
-    title = title_mapper.map_value(src_metadata, ctx, logger)
+    title = title_mapper.map_value(src_record, ctx, logger)
 
     additional_titles_mapper = AdditionalTitlesMapper()
-    additional_titles = additional_titles_mapper.map_value(src_metadata, ctx, logger)
+    additional_titles = additional_titles_mapper.map_value(src_record, ctx, logger)
 
     assert title == "Main Title"
     assert len(additional_titles) == 3
@@ -334,8 +343,9 @@ def test_transform_creators(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = AuthorsMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     # Check corporate authors
     corporate_authors = [
@@ -367,8 +377,9 @@ def test_transform_contributors(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = ContributorsMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     # Should include supervisors (author went to creators)
     assert len(result) == 1  # 1 supervisor
@@ -458,8 +469,9 @@ def test_transform_copyrights_complete(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = CopyrightMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert result == "© CERN 2023, All rights reserved https://cern.ch"
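Note: every fixture change in this file follows one convention: mappers now receive the full INSPIRE record (metadata plus record-level fields such as created) instead of bare metadata, mirroring the mapper signature changes earlier in this patch. A sketch of the unwrap pattern, with a defensive empty-titles guard added purely for illustration:

    def map_value(src_record, ctx, logger):
        src_metadata = src_record.get("metadata", {})
        titles = src_metadata.get("titles", [])
        return titles[0].get("title") if titles else None

    src_record = {"metadata": {"titles": [{"title": "Main Title"}]},
                  "created": "2023-01-01"}
    assert map_value(src_record, None, None) == "Main Title"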
© CC BY 4.0" @@ -491,8 +504,9 @@ def test_transform_copyrights_empty(running_app): ) logger = Logger(inspire_id="12345") mapper = CopyrightMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result is None @@ -510,8 +524,9 @@ def test_transform_abstracts(running_app): ) logger = Logger(inspire_id="12345") mapper = DescriptionMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result == "This is the main abstract" @@ -524,8 +539,9 @@ def test_transform_abstracts_empty(running_app): ) logger = Logger(inspire_id="12345") mapper = DescriptionMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result is None @@ -544,8 +560,9 @@ def test_transform_subjects(running_app): ) logger = Logger(inspire_id="12345") mapper = SubjectsMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert len(result) == 2 assert {"subject": "quantum mechanics"} in result @@ -565,8 +582,9 @@ def test_transform_languages(mock_pycountry, running_app): ) logger = Logger(inspire_id="12345") mapper = LanguagesMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result == [{"id": "eng"}] @@ -582,8 +600,9 @@ def test_transform_languages_invalid(mock_pycountry, running_app): ) logger = Logger(inspire_id="12345") mapper = LanguagesMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result == [] assert len(ctx.errors) == 1 @@ -603,8 +622,9 @@ def test_transform_additional_descriptions(running_app): ) logger = Logger(inspire_id="12345") mapper = AdditionalDescriptionsMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert len(result) == 3 assert { @@ -636,8 +656,9 @@ def test_transform_files(running_app): ) logger = Logger(inspire_id="12345") mapper = FilesMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, ctx, logger) + result = mapper.map_value(src_record, ctx, logger) assert result["enabled"] is True assert "test.pdf" in result["entries"] @@ -645,7 +666,7 @@ def test_transform_files(running_app): file_entry = result["entries"]["test.pdf"] assert file_entry["checksum"] == "md5:abc123" assert file_entry["key"] == "test.pdf" - assert file_entry["inspire_url"] == "https://example.com/file" + assert file_entry["source_url"] == "https://example.com/file" assert file_entry["metadata"]["description"] == "Test file" assert file_entry["metadata"]["original_url"] == "https://original.com/file" @@ -663,8 +684,9 @@ def test_transform_no_files_error(running_app): ) logger = Logger(inspire_id="12345") mapper = FilesMapper() + src_record = {"metadata": src_metadata, "created": "2023-01-01"} - result = mapper.map_value(src_metadata, 
@@ -603,8 +622,9 @@ def test_transform_additional_descriptions(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = AdditionalDescriptionsMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert len(result) == 3
     assert {
@@ -636,8 +656,9 @@ def test_transform_files(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = FilesMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert result["enabled"] is True
     assert "test.pdf" in result["entries"]
@@ -645,7 +666,7 @@
     file_entry = result["entries"]["test.pdf"]
     assert file_entry["checksum"] == "md5:abc123"
     assert file_entry["key"] == "test.pdf"
-    assert file_entry["inspire_url"] == "https://example.com/file"
+    assert file_entry["source_url"] == "https://example.com/file"
     assert file_entry["metadata"]["description"] == "Test file"
     assert file_entry["metadata"]["original_url"] == "https://original.com/file"
@@ -663,8 +684,9 @@ def test_transform_no_files_error(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = FilesMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert result == {"enabled": True, "entries": {}}
@@ -680,8 +702,9 @@ def test_transform_imprint_place(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = ImprintMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert "place" in result
     assert result["place"] == "Geneva"
@@ -699,8 +722,9 @@ def test_transform_imprint_place_with_isbn(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = ImprintMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert "place" in result
     assert result["place"] == "New York"
@@ -718,8 +742,9 @@ def test_transform_imprint_place_no_imprints(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = ImprintMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     assert result == {}
@@ -753,8 +778,9 @@ def test_transform_files_figures_omitted(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = FilesMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     # Documents should be included
     assert "thesis.pdf" in result["entries"]
@@ -783,8 +809,9 @@ def test_transform_files_pdf_extension(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = FilesMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    result = mapper.map_value(src_metadata, ctx, logger)
+    result = mapper.map_value(src_record, ctx, logger)
 
     # Should not add .pdf extension if already present
     assert "document.pdf" in result["entries"]
@@ -798,8 +825,9 @@ def test_transform_publisher(running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = PublisherMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    publisher = mapper.map_value(src_metadata, ctx, logger)
+    publisher = mapper.map_value(src_record, ctx, logger)
 
     assert publisher == "Test Publisher"
@@ -815,8 +843,9 @@ def test_transform_publication_date_from_imprint(mock_parse_edtf, running_app):
     )
     logger = Logger(inspire_id="12345")
     mapper = PublicationDateMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    date = mapper.map_value(src_metadata, ctx, logger)
+    date = mapper.map_value(src_record, ctx, logger)
 
     assert date == "2023"
@@ -835,8 +864,9 @@ def test_transform_publication_date_parse_exception(mock_parse_edtf, running_app
     )
     logger = Logger(inspire_id="12345")
     mapper = PublicationDateMapper()
+    src_record = {"metadata": src_metadata, "created": "2023-01-01"}
 
-    date = mapper.map_value(src_metadata, ctx, logger)
+    date = mapper.map_value(src_record, ctx, logger)
 
     assert date is None
     assert len(ctx.errors) == 1
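Note: the two publication-date tests mock parse_edtf, so the mapper presumably validates the imprint date as an EDTF expression and records an error on unparseable input. A sketch under that assumption (using the edtf package's parse_edtf; the function name and error text are illustrative):

    from edtf import parse_edtf

    def map_publication_date(imprint_date, errors):
        try:
            parse_edtf(imprint_date)  # raises on unparseable input
            return imprint_date
        except Exception as e:
            errors.append(f"Could not parse publication date {imprint_date!r}: {e}.")
            return None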
"https://inspirehep.net/files/4c993d7ec1c1faf3c8e3a290219de361", + "source_url": "https://inspirehep.net/files/4c993d7ec1c1faf3c8e3a290219de361", } } }, @@ -95,13 +95,13 @@ def transformed_record_2_files(): "checksum": "md5:4c993d7ec1c1faf3c8e3a290219de361", "key": "fulltext.pdf", "access": {"hidden": False}, - "inspire_url": "https://inspirehep.net/files/4c993d7ec1c1faf3c8e3a290219de361", + "source_url": "https://inspirehep.net/files/4c993d7ec1c1faf3c8e3a290219de361", }, "Afiq_Anuar_PhD_v3_DESY-THESIS.pdf": { "checksum": "md5:f45abb6d082da30cb6ee7e828454c680", "key": "Afiq_Anuar_PhD_v3_DESY-THESIS.pdf", "access": {"hidden": False}, - "inspire_url": "https://inspirehep.net/files/f45abb6d082da30cb6ee7e828454c680", + "source_url": "https://inspirehep.net/files/f45abb6d082da30cb6ee7e828454c680", }, } }, @@ -148,8 +148,8 @@ def test_writer_1_rec_1_file( == transformed_record_1_file["files"]["entries"]["fulltext.pdf"]["key"] ) - # check that we removed inspire_url - assert "inspire_url" not in files["entries"]["fulltext.pdf"] + # check that we removed source_url + assert "source_url" not in files["entries"]["fulltext.pdf"] _cleanup_record(record["id"]) @@ -163,7 +163,7 @@ def test_writer_1_rec_1_file_failed( # make url invalid transformed_record["files"]["entries"]["fulltext.pdf"]["checksum"] = "fake" transformed_record["files"]["entries"]["fulltext.pdf"][ - "inspire_url" + "source_url" ] = "https://inspirehep.net/files/fake" # call writer @@ -218,7 +218,7 @@ def test_writer_2_records( "checksum": "md5:0b0532554c3864fa80e73f54df9b77c6", "key": "fulltext.pdf", "access": {"hidden": False}, - "inspire_url": "https://inspirehep.net/files/0b0532554c3864fa80e73f54df9b77c6", + "source_url": "https://inspirehep.net/files/0b0532554c3864fa80e73f54df9b77c6", } } }, @@ -375,7 +375,7 @@ def test_writer_1_existing_found_file_changed_new_version_created( "checksum": "md5:f45abb6d082da30cb6ee7e828454c680", "key": "Afiq_Anuar_PhD_v3_DESY-THESIS.pdf", "access": {"hidden": False}, - "inspire_url": "https://inspirehep.net/files/f45abb6d082da30cb6ee7e828454c680", + "source_url": "https://inspirehep.net/files/f45abb6d082da30cb6ee7e828454c680", } # call writer @@ -423,7 +423,7 @@ def test_writer_1_existing_found_file_and_metadata_changed( "checksum": "md5:f45abb6d082da30cb6ee7e828454c680", "key": "Afiq_Anuar_PhD_v3_DESY-THESIS.pdf", "access": {"hidden": False}, - "inspire_url": "https://inspirehep.net/files/f45abb6d082da30cb6ee7e828454c680", + "source_url": "https://inspirehep.net/files/f45abb6d082da30cb6ee7e828454c680", } # make changes to metadata @@ -480,7 +480,7 @@ def test_writer_1_existing_found_1_more_file_added( "checksum": "md5:f45abb6d082da30cb6ee7e828454c680", "key": "Afiq_Anuar_PhD_v3_DESY-THESIS.pdf", "access": {"hidden": False}, - "inspire_url": "https://inspirehep.net/files/f45abb6d082da30cb6ee7e828454c680", + "source_url": "https://inspirehep.net/files/f45abb6d082da30cb6ee7e828454c680", } # call writer @@ -579,7 +579,7 @@ def test_writer_1_existing_found_with_2_files_1_deleted_1_added( "checksum": "md5:0f9dd913d49cf6bf2413b2310088bed6", "key": "Maier.pdf", "access": {"hidden": False}, - "inspire_url": "https://inspirehep.net/files/0f9dd913d49cf6bf2413b2310088bed6", + "source_url": "https://inspirehep.net/files/0f9dd913d49cf6bf2413b2310088bed6", } # call writer @@ -631,7 +631,7 @@ def test_writer_1_existing_found_new_version_creation_failed( # make url invalid transformed_record["files"]["entries"]["fulltext.pdf"]["checksum"] = "fake" transformed_record["files"]["entries"]["fulltext.pdf"][ - 
"inspire_url" + "source_url" ] = "https://inspirehep.net/files/fake" # call writer