31 changes: 30 additions & 1 deletion cds_migrator_kit/rdm/migration_config.py
@@ -48,6 +48,10 @@
)

from .permissions import CDSRDMMigrationRecordPermissionPolicy
from invenio_app_rdm.config import (
STATS_EVENTS as _APP_RDM_STATS_EVENTS,
STATS_AGGREGATIONS as _APP_RDM_STATS_AGGREGATIONS,
)


def _(x): # needed to avoid start time failure with lazy strings
@@ -482,5 +486,30 @@ def resolve_record_pid(pid):
*DefaultRecordsComponents,
CDSResourcePublication,
ClcSyncComponent,
MintAlternateIdentifierComponent,
# component disabled, this part is handled separately in migration code
# due to two conflicting DB updates causing StaleDataError
# MintAlternateIdentifierComponent,
]


# Invenio Stats
# =============

# We override the templates to add new fields needed for the migrated statistic events
_APP_RDM_STATS_EVENTS["file-download"][
"templates"
] = "cds_rdm.stats.templates.events.file_download"
_APP_RDM_STATS_EVENTS["record-view"][
"templates"
] = "cds_rdm.stats.templates.events.record_view"

# Add the yearly suffix
_APP_RDM_STATS_EVENTS["file-download"]["params"]["suffix"] = "%Y"
_APP_RDM_STATS_EVENTS["record-view"]["params"]["suffix"] = "%Y"

# Override the index_interval to be year
_APP_RDM_STATS_AGGREGATIONS["file-download-agg"]["params"]["index_interval"] = "year"
_APP_RDM_STATS_AGGREGATIONS["record-view-agg"]["params"]["index_interval"] = "year"

# don't generate logs for migration
AUDIT_LOGS_ENABLED = False
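
For context, a minimal sketch of the shape these overrides produce. Only the values set above come from this change; the surrounding keys and any other entries follow the invenio-app-rdm defaults and are elided here.

    # Sketch of the effective stats config after the overrides above
    # (assumed shape; other default keys such as event builders omitted):
    STATS_EVENTS = {
        "file-download": {
            # custom index template adding the fields migrated events need
            "templates": "cds_rdm.stats.templates.events.file_download",
            "params": {"suffix": "%Y"},  # one event index per year
        },
        "record-view": {
            "templates": "cds_rdm.stats.templates.events.record_view",
            "params": {"suffix": "%Y"},
        },
    }
    STATS_AGGREGATIONS = {
        "file-download-agg": {"params": {"index_interval": "year"}},
        "record-view-agg": {"params": {"index_interval": "year"}},
    }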
135 changes: 66 additions & 69 deletions cds_migrator_kit/rdm/records/load/load.py
@@ -18,17 +18,18 @@
from cds_rdm.legacy.models import CDSMigrationLegacyRecord
from cds_rdm.legacy.resolver import get_pid_by_legacy_recid
from cds_rdm.minters import legacy_recid_minter
from cds_rdm.tasks import sync_alternate_identifiers
from flask import current_app
from invenio_access.permissions import system_identity
from invenio_accounts.models import User
from invenio_db import db
from invenio_db.uow import UnitOfWork
from invenio_pidstore.errors import PIDAlreadyExists
from invenio_pidstore.models import PersistentIdentifier
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_rdm_migrator.load.base import Load
from invenio_rdm_records.proxies import current_rdm_records_service
from invenio_records.systemfields.relations import InvalidRelationValue
from marshmallow import ValidationError
from sqlalchemy.orm.exc import StaleDataError

from cds_migrator_kit.errors import (
CDSMigrationException,
@@ -181,7 +182,7 @@ def _load_communities(self, draft, entry):
parent.communities.default = entry["parent"]["json"]["communities"]["default"]
parent.commit()

def _after_publish_update_dois(self, identity, record, entry):
def _after_publish_update_dois(self, identity, record, entry, uow):
"""Update migrated DOIs post publish."""
migrated_pids = entry["record"]["json"]["pids"]
for pid_type, identifier in migrated_pids.items():
@@ -190,8 +191,13 @@ def _after_publish_update_dois(self, identity, record, entry):
# will return a warning that "This DOI has already been taken"
# In that case, we edit and republish to force an update of the doi with
# the new published metadata as in the new system we have more information available
_draft = current_rdm_records_service.edit(identity, record["id"])
current_rdm_records_service.publish(identity, _draft["id"])
_draft = current_rdm_records_service.edit(
identity, record["id"], uow=uow
)
record = current_rdm_records_service.publish(
identity, _draft["id"], uow=uow
)
return record

def _after_publish_load_parent_access_grants(self, draft, version, entry):
"""Load access grants from metadata and record grants efficiently."""
@@ -340,7 +346,6 @@ def _create_grant(subject_type, subject_id, permission):
# raise error for missing user
missing_emails = emails - existing_users.keys()
if missing_emails:

raise GrantCreationError(
message=f"Users not found for emails: {', '.join(missing_emails)}",
stage="load",
@@ -381,17 +386,8 @@ def _after_publish_update_created(self, record, entry, version):
tzinfo=None
)

record_obj = record._record.model
for attempt in range(3):
try:
with db.session.begin_nested():
record_obj.created = creation_date
record._record.commit()
except StaleDataError as e:
db.session.rollback()
record_obj = db.session.merge(record_obj, load=True)
if attempt == 2:
raise e
record._record.model.created = creation_date
record._record.commit()

def _after_publish_mint_recid(self, record, entry, version):
"""Mint legacy ids for redirections assigned to the parent."""
@@ -416,16 +412,46 @@ def _after_publish_update_files_created(record, entry, version):
)
file.commit()

def _after_publish(self, identity, published_record, entry, version):
def _after_publish(self, identity, published_record, entry, version, uow):
"""Run fixes after record publish."""
self._after_publish_update_dois(identity, published_record, entry)
record = self._after_publish_update_dois(identity, published_record, entry, uow)
if record:
published_record = record
self._after_publish_update_created(published_record, entry, version)
self._after_publish_mint_recid(published_record, entry, version)
self._after_publish_update_files_created(published_record, entry, version)
self._after_publish_load_parent_access_grants(published_record, version, entry)
db.session.commit()
# db.session.commit()

def _assign_rep_numbers(self, draft):
draft_report_nums = {}
for index, id in enumerate(draft.data["metadata"].get("identifiers", [])):
if id["scheme"] == "cdsrn":
draft_report_nums[id["identifier"]] = index

if not draft_report_nums:
# If no mintable identifiers, return early
return

for report_number, index in draft_report_nums.items():
try:
PersistentIdentifier.create(
pid_type="cdsrn",
pid_value=report_number,
object_type="rec",
object_uuid=draft._record.parent.id,
status=PIDStatus.REGISTERED,
)
except PIDAlreadyExists as e:
pid = PersistentIdentifier.get(pid_type="cdsrn", pid_value=report_number)
if pid.object_uuid != draft._record.parent.id:
# raise only if different parent uuid found, meaning they are 2
# different records and the repnum is duplicated
raise ManualImportRequired(
f"Report number {report_number} already exists."
)

def _pre_publish(self, identity, entry, version, draft):
def _pre_publish(self, identity, entry, version, draft, uow):
"""Create and process draft before publish."""
versions = entry["versions"]
files = versions[version]["files"]
@@ -440,6 +466,7 @@ def _pre_publish(self, identity, entry, version, draft):
draft = current_rdm_records_service.create(
identity, data=entry["record"]["json"]
)
self._assign_rep_numbers(draft)
except Exception as e:

raise ManualImportRequired(message=str(e))
@@ -456,7 +483,6 @@ def _pre_publish(self, identity, entry, version, draft):
# TODO we can use unit of work when it is moved to invenio-db module
self._load_parent_access(draft, entry)
self._load_communities(draft, entry)
db.session.commit()
else:
draft = current_rdm_records_service.new_version(identity, draft["id"])
draft_dict = draft.to_dict()
@@ -473,12 +499,13 @@ def _pre_publish(self, identity, entry, version, draft):
draft = current_rdm_records_service.update_draft(
identity, draft["id"], data=missing_data
)

self._load_record_access(draft, access)
self._load_files(draft, entry, files)

return draft

def _load_versions(self, entry):
def _load_versions(self, entry, uow):
"""Load other versions of the record."""
versions = entry["versions"]
legacy_recid = entry["record"]["recid"]
@@ -491,15 +518,15 @@ def _load_versions(self, entry):
draft = None
for version in versions.keys():
# Create and prepare draft
draft = self._pre_publish(identity, entry, version, draft)
draft = self._pre_publish(identity, entry, version, draft, uow)

# Publish draft
published_record = current_rdm_records_service.publish(
identity, draft["id"]
identity, draft["id"], uow=uow
)
# Run after publish fixes
self._after_publish(identity, published_record, entry, version)
records.append(published_record._record)
self._after_publish(identity, published_record, entry, version, uow)
records.append(published_record._record)

if records:
record_state_context = self._load_record_state(legacy_recid, records)
@@ -602,17 +629,7 @@ def _save_original_dumped_record(self, entry, recid_state):
migrated_record_object_uuid=recid_state["latest_version_object_uuid"],
legacy_recid=entry["record"]["recid"],
)

for attempt in range(3):
try:
with db.session.begin_nested():
db.session.add(_original_dump_model)
db.session.commit()
except StaleDataError as e:
db.session.rollback()
_original_dump_model = db.session.merge(_original_dump_model, load=True)
if attempt == 2:
raise e
db.session.add(_original_dump_model)

def _have_migrated_recid(self, recid):
"""Check if we have minted `lrecid` pid."""
@@ -636,55 +653,35 @@ def _after_load_clc_sync(self, record_state):
auto_sync=False,
)
db.session.add(sync)
db.session.commit()

def _load(self, entry):
"""Use the services to load the entries."""
if entry:
recid = entry.get("record", {}).get("recid", {})
if self._should_skip_recid(recid):
return

self.clc_sync = deepcopy(entry.get("_clc_sync", False))
if "_clc_sync" in entry:
del entry["_clc_sync"]

recid = entry.get("record", {}).get("recid", {})

if self._should_skip_recid(recid):
return

try:
if self.dry_run:
self._dry_load(entry)
else:
recid_state_after_load = self._load_versions(
entry,
)
if recid_state_after_load:
self._save_original_dumped_record(
entry,
recid_state_after_load,
)
self._after_load_clc_sync(recid_state_after_load)
with UnitOfWork(db.session) as uow:
recid_state_after_load = self._load_versions(entry, uow)
if recid_state_after_load:
self._save_original_dumped_record(
entry, recid_state_after_load
)
self._after_load_clc_sync(recid_state_after_load)
uow.commit()
self.migration_logger.finalise_record(recid)
except ManualImportRequired as e:
self.migration_logger.add_log(e, record=entry)
except GrantCreationError as e:
self.migration_logger.add_log(e, record=entry)
except PIDAlreadyExists as e:
# TODO remove when there is a way of cleaning local environment from
# previous run of migration
exc = ManualImportRequired(
message=str(e),
field="validation",
stage="load",
description="RECORD Already exists.",
recid=recid,
priority="warning",
value=e.pid_value,
subfield="PID",
)
self.migration_logger.add_log(exc, record=entry)
except GrantCreationError as e:
self.migration_logger.add_log(e, record=entry)
except (CDSMigrationException, ValidationError, InvalidRelationValue) as e:

exc = ManualImportRequired(
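
Taken together, the load.py changes replace per-step db.session.commit() calls with a single unit of work per record, folding the post-publish DOI fix-up into the same transaction. A condensed sketch of the pattern, using only calls that appear in the diff above (illustrative flow, not the migrator's exact code):

    from invenio_db import db
    from invenio_db.uow import UnitOfWork
    from invenio_rdm_records.proxies import current_rdm_records_service as service

    def load_record(identity, data):
        with UnitOfWork(db.session) as uow:
            draft = service.create(identity, data=data)
            record = service.publish(identity, draft["id"], uow=uow)
            # For DOIs already registered on legacy, edit and republish so the
            # richer migrated metadata is pushed to the DOI provider as an update.
            _draft = service.edit(identity, record["id"], uow=uow)
            record = service.publish(identity, _draft["id"], uow=uow)
            # Single commit: a failure anywhere rolls back the whole record,
            # avoiding the StaleDataError retry loops this PR removes.
            uow.commit()
            return record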
4 changes: 2 additions & 2 deletions cds_migrator_kit/rdm/records/transform/config.py
@@ -16,8 +16,7 @@
PID_SCHEMES_TO_STORE_IN_IDENTIFIERS = [
"ARXIV",
"HDL",
"HAL"
"HANDLE",
"HAL" "HANDLE",
"URN",
"INIS",
"CERCER",
@@ -51,6 +50,7 @@
"in2p3",
"eucard",
"inspec",
"desy",
]
KEYWORD_SCHEMES_TO_DROP = ["proquest", "disxa"]

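
One detail worth noting in the PID_SCHEMES_TO_STORE_IN_IDENTIFIERS hunk above: Python concatenates adjacent string literals, so both the old two-line form and the reformatted one-line entry evaluate to a single merged item rather than two schemes:

    # Adjacent string literals concatenate:
    ["HAL" "HANDLE", "URN"]  # == ["HALHANDLE", "URN"]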
1 change: 1 addition & 0 deletions cds_migrator_kit/rdm/records/transform/models/it.py
@@ -73,6 +73,7 @@ class ITModel(CdsOverdo):
"773__a", # Duplicate DOI
"773__o", # Duplicate meeting title
"773__u", # Duplicate meeting url
"773__t", # CNL articles - duplicate info
"785__t", # Related works platform
"785__x", # Related works type
"7870_r", # detailed description of record relation
27 changes: 13 additions & 14 deletions cds_migrator_kit/rdm/records/transform/transform.py
@@ -10,6 +10,7 @@
import logging
from collections import OrderedDict
from copy import deepcopy
from flask import current_app
from pathlib import Path

import arrow
@@ -349,7 +350,7 @@ def lookup_person_id(creator):
name.json = json_copy

db.session.add(name)
db.session.commit()
# db.session.commit()

def creators(json, key="creators"):
_creators = deepcopy(json.get(key, []))
@@ -976,19 +977,17 @@ def should_skip(self, entry):

def run(self, entries):
"""Run transformation step."""
if self._workers is None:
for entry in entries:
if self.should_skip(entry):
continue
try:
yield self._transform(entry)
except Exception:
self.logger.exception(entry, exc_info=True)
if self._throw:
raise
continue
else:
yield from self._multiprocess_transform(entries)
for entry in entries:
if self.should_skip(entry):
current_app.logger.warning(f"Skipping entry {entry['recid']}")
continue
try:
yield self._transform(entry)
except Exception:
self.logger.exception(entry, exc_info=True)
if self._throw:
raise
continue

#
#
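
With the multiprocessing branch removed, run() is now a plain sequential generator. A hedged sketch of how a caller might drive it (illustrative names; the real wiring lives in the migration streams):

    # entries come from the extract/dump step; each transformed entry is
    # produced lazily and handed straight to the load step
    for transformed in transform.run(entries):   # transform: the class above
        load.run([transformed])                  # hypothetical loader interface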