diff --git a/digital_land/__init__.py b/digital_land/__init__.py index b21843faa..d8171bebc 100644 --- a/digital_land/__init__.py +++ b/digital_land/__init__.py @@ -1,8 +1,15 @@ +import os import sys import csv import logging from importlib.metadata import version, PackageNotFoundError +# On macOS, fork() after loading C extensions (numpy, pandas, pyproj, etc.) +# can segfault because the Objective-C runtime's post-fork safety checks abort +# the child process. Setting this before any C extension is imported ensures +# it is inherited by any subprocess spawned later (e.g. ogr2ogr via gdal_utils). +os.environ.setdefault("OBJC_DISABLE_INITIALIZE_FORK_SAFETY", "YES") + logger = logging.getLogger(__name__) PACKAGE_NAME = "digital-land" try: diff --git a/digital_land/expectations/utils.py b/digital_land/expectations/utils.py index 554e67905..935ef032a 100644 --- a/digital_land/expectations/utils.py +++ b/digital_land/expectations/utils.py @@ -1,12 +1,8 @@ """ Utility functions to support functions in the expectation module - -notes: -- might want to remove QueryRunner at a future date as it loads spatialite which may -not be useful for everything """ -import spatialite +import sqlite3 import yaml import pandas as pd @@ -49,10 +45,16 @@ def run_query(self, sql_query: str, return_only_first_col_as_set: bool = False): would mean having to dev thread-lcoal connection pools. 
For more info see: https://stackoverflow.com/a/14520670 """ - with spatialite.connect(self.tested_dataset_path) as con: - cursor = con.execute(sql_query) - cols = [column[0] for column in cursor.description] - results = pd.DataFrame.from_records(data=cursor.fetchall(), columns=cols) + con = sqlite3.connect(self.tested_dataset_path) + try: + con.enable_load_extension(True) + con.load_extension("mod_spatialite") + con.enable_load_extension(False) + cursor = con.execute(sql_query) + cols = [column[0] for column in cursor.description] + results = pd.DataFrame.from_records(data=cursor.fetchall(), columns=cols) + finally: + con.close() if return_only_first_col_as_set: return transform_df_first_column_into_set(results) diff --git a/digital_land/phase/convert.py b/digital_land/phase/convert.py index ebb5c93e1..771c117d2 100644 --- a/digital_land/phase/convert.py +++ b/digital_land/phase/convert.py @@ -4,9 +4,11 @@ import json_stream import os import os.path +import signal import sqlite3 import subprocess import tempfile +import threading import time import zipfile from packaging.version import Version @@ -61,16 +63,65 @@ def load_csv(path, encoding="UTF-8", log=None): def execute(command, env=os.environ): logging.debug("execute: %s", command) + + # On macOS, subprocess.Popen uses fork() which crashes when C extensions + # that initialise GCD/libdispatch (e.g. spatialite) are already loaded. + # os.posix_spawnp avoids fork entirely, so it is safe in all cases. 
+ # TODO once we have migrated to python3.13+ we can use subprocess again + if hasattr(os, "posix_spawnp"): + stdout_r, stdout_w = os.pipe() + stderr_r, stderr_w = os.pipe() + file_actions = [ + (os.POSIX_SPAWN_DUP2, stdout_w, 1), + (os.POSIX_SPAWN_DUP2, stderr_w, 2), + (os.POSIX_SPAWN_CLOSE, stdout_r), + (os.POSIX_SPAWN_CLOSE, stderr_r), + (os.POSIX_SPAWN_CLOSE, stdout_w), + (os.POSIX_SPAWN_CLOSE, stderr_w), + ] + try: + pid = os.posix_spawnp(command[0], command, env, file_actions=file_actions) + except BaseException: + for fd in (stdout_r, stdout_w, stderr_r, stderr_w): + os.close(fd) + raise + os.close(stdout_w) + os.close(stderr_w) + stdout_data, stderr_data = [], [] + + def _read(fd, buf): + with os.fdopen(fd, "rb") as f: + buf.append(f.read()) + + t1 = threading.Thread(target=_read, args=(stdout_r, stdout_data), daemon=True) + t2 = threading.Thread(target=_read, args=(stderr_r, stderr_data), daemon=True) + t1.start() + t2.start() + t1.join(timeout=600) + t2.join(timeout=600) + if t1.is_alive() or t2.is_alive(): + os.kill(pid, signal.SIGKILL) + t1.join() + t2.join() + _, raw_status = os.waitpid(pid, 0) + if os.WIFEXITED(raw_status): + returncode = os.WEXITSTATUS(raw_status) + elif os.WIFSIGNALED(raw_status): + returncode = -os.WTERMSIG(raw_status) + else: + returncode = -1 + outs = stdout_data[0] if stdout_data else b"" + errs = stderr_data[0] if stderr_data else b"" + return returncode, outs.decode("utf-8"), errs.decode("utf-8") + proc = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env ) - try: outs, errs = proc.communicate(timeout=600) except subprocess.TimeoutExpired: proc.kill() outs, errs = proc.communicate() - return proc.returncode, outs.decode("utf-8"), errs.decode("utf-8") diff --git a/digital_land/utils/gdal_utils.py b/digital_land/utils/gdal_utils.py index 0c1c02ae2..690383df6 100644 --- a/digital_land/utils/gdal_utils.py +++ b/digital_land/utils/gdal_utils.py @@ -1,8 +1,10 @@ import re import subprocess +from 
functools import lru_cache from packaging.version import Version +@lru_cache(maxsize=None) def get_gdal_version(): out, _ = subprocess.Popen( ["ogr2ogr", "--version"], @@ -10,4 +12,4 @@ def get_gdal_version(): stderr=subprocess.DEVNULL, ).communicate() - return Version(re.compile(r"GDAL\s([0-9.]+),").match(out.decode("ascii")).group(1)) + return Version(re.compile(r"GDAL\s([0-9.]+)").match(out.decode("ascii")).group(1)) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 3a34b023b..8e03af873 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -1,3 +1,16 @@ +def pytest_configure(config): + # Pre-warm the gdal version cache before any test runs. On macOS, calling + # fork() (via subprocess) after a spatial library such as mod_spatialite has + # been loaded causes a segmentation fault. Caching the result here ensures + # get_gdal_version() never needs to fork after that point. + try: + from digital_land.utils.gdal_utils import get_gdal_version + + get_gdal_version() + except Exception: + pass + + class FakeDictReader: # Can be used in place of load.py::DictReaderInjectResource (a subclass of # csv.DictReader). Simply returns values from the passed in list of rows.