diff --git a/digital_land/commands.py b/digital_land/commands.py
index 3eb03b615..ac5ae4ca3 100644
--- a/digital_land/commands.py
+++ b/digital_land/commands.py
@@ -39,6 +39,7 @@
 from digital_land.phase.organisation import OrganisationPhase
 from digital_land.phase.parse import ParsePhase
 from digital_land.phase.patch import PatchPhase
+from digital_land.phase.priority import PriorityPhase
 from digital_land.phase.pivot import PivotPhase
 from digital_land.phase.prefix import EntityPrefixPhase
 from digital_land.phase.prune import FieldPrunePhase, EntityPrunePhase, FactPrunePhase
@@ -262,6 +263,7 @@ def pipeline_run(
             enabled=save_harmonised,
         ),
         EntityPrunePhase(dataset_resource_log=dataset_resource_log),
+        PriorityPhase(),
         PivotPhase(),
         FactCombinePhase(issue_log=issue_log, fields=combine_fields),
         FactorPhase(),
diff --git a/digital_land/package/dataset.py b/digital_land/package/dataset.py
index c161a3f8d..90166f968 100644
--- a/digital_land/package/dataset.py
+++ b/digital_land/package/dataset.py
@@ -155,7 +155,7 @@ def load_entities(self):
         self.execute(
             "select entity, field, value from fact"
             " where value != '' or field == 'end-date'"
-            " order by entity, field, entry_date"
+            " order by entity, field, priority desc, entry_date"
         )
         results = self.cursor.fetchall()
 
diff --git a/digital_land/phase/pivot.py b/digital_land/phase/pivot.py
index dce6b4d8b..5ae206dd6 100644
--- a/digital_land/phase/pivot.py
+++ b/digital_land/phase/pivot.py
@@ -20,6 +20,7 @@ def process(self, stream):
                             "field": field,
                             "value": value,
                             # entry
+                            "priority": block["priority"],
                             "resource": block["resource"],
                             "line-number": block["line-number"],
                             "entry-number": block["entry-number"],
diff --git a/digital_land/phase/priority.py b/digital_land/phase/priority.py
new file mode 100644
index 000000000..6a981aad9
--- /dev/null
+++ b/digital_land/phase/priority.py
@@ -0,0 +1,44 @@
+import sqlite3
+from .phase import Phase
+
+
+class PriorityPhase(Phase):
+    """
+    Deduce priority of the entry when assembling facts
+    """
+
+    def __init__(self, connection=None):
+        # fall back to the pipeline cache database when no connection is given
+        if not connection:
+            connection = sqlite3.connect("var/cache/pipeline.sqlite3")
+        self.cursor = connection.cursor()
+
+    def entity_organisation(self, entity):
+        """
+        Return the organisation whose entity range contains entity, or None
+        """
+        # bind the entity as an integer parameter instead of formatting it
+        # into the SQL text, so a malformed value cannot alter the query
+        number = int(entity)
+        self.cursor.execute(
+            "select organisation from entity_organisation"
+            " where entity_minimum <= ? and entity_maximum >= ?",
+            (number, number),
+        )
+        row = self.cursor.fetchone()
+        return row[0] if row else None
+
+    def priority(self, entity, organisation):
+        """
+        Return 1 when organisation matches the entity's range, otherwise 2
+        """
+        return 1 if self.entity_organisation(entity) == organisation else 2
+
+    def process(self, stream):
+        """
+        Set block["priority"] from the row's entity and organisation
+        """
+        for block in stream:
+            row = block["row"]
+            block["priority"] = self.priority(row["entity"], row["organisation"])
+            yield block
diff --git a/setup.py b/setup.py
index be56f7459..39187e5c6 100644
--- a/setup.py
+++ b/setup.py
@@ -37,6 +37,7 @@ def get_long_description():
         "canonicaljson",
         "click",
         "cchardet",
+        "dask[dataframe]",
         "esridump",
         "pandas",
         "pyproj",