Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions digital_land/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import os
import sys
import csv
import logging
from importlib.metadata import version, PackageNotFoundError

# On macOS, fork() after loading C extensions (numpy, pandas, pyproj, etc.)
# can segfault because the Objective-C runtime's post-fork safety checks abort
# the child process. Setting this before any C extension is imported ensures
# it is inherited by any subprocess spawned later (e.g. ogr2ogr via gdal_utils).
# NOTE: setdefault() respects a value already present in the environment.
os.environ.setdefault("OBJC_DISABLE_INITIALIZE_FORK_SAFETY", "YES")

# Module-level logger for the package.
logger = logging.getLogger(__name__)
# Distribution name as published on PyPI; presumably used to look up the
# installed version via importlib.metadata below — confirm against the
# try/except that follows.
PACKAGE_NAME = "digital-land"
try:
Expand Down
18 changes: 9 additions & 9 deletions digital_land/expectations/utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
"""
Utility functions to support functions in the expectation module

notes:
- might want to remove QueryRunner at a future date as it loads spatialite which may
not be useful for everything
"""

import spatialite
import sqlite3
import yaml
import pandas as pd

Expand Down Expand Up @@ -49,10 +45,14 @@ def run_query(self, sql_query: str, return_only_first_col_as_set: bool = False):
would mean having to develop thread-local connection pools. For more
info see: https://stackoverflow.com/a/14520670
"""
with spatialite.connect(self.tested_dataset_path) as con:
cursor = con.execute(sql_query)
cols = [column[0] for column in cursor.description]
results = pd.DataFrame.from_records(data=cursor.fetchall(), columns=cols)
con = sqlite3.connect(self.tested_dataset_path)
con.enable_load_extension(True)
con.load_extension("mod_spatialite")
con.enable_load_extension(False)
cursor = con.execute(sql_query)
cols = [column[0] for column in cursor.description]
results = pd.DataFrame.from_records(data=cursor.fetchall(), columns=cols)
con.close()

if return_only_first_col_as_set:
return transform_df_first_column_into_set(results)
Expand Down
55 changes: 53 additions & 2 deletions digital_land/phase/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
import json_stream
import os
import os.path
import signal
import sqlite3
import subprocess
import tempfile
import threading
import time
import zipfile
from packaging.version import Version
Expand Down Expand Up @@ -61,16 +63,65 @@ def load_csv(path, encoding="UTF-8", log=None):

def execute(command, env=os.environ):
    """Run *command* as a subprocess and return ``(returncode, stdout, stderr)``.

    ``command`` is an argv-style list; stdout/stderr are captured, decoded as
    UTF-8 and returned as strings.  A returncode of ``-N`` means the child was
    killed by signal ``N`` (matching the ``subprocess`` convention).
    """
    logging.debug("execute: %s", command)

    # On macOS, subprocess.Popen uses fork() which crashes when C extensions
    # that initialise GCD/libdispatch (e.g. spatialite) are already loaded.
    # os.posix_spawnp avoids fork entirely, so it is safe in all cases.
    # TODO once we have migrated to python3.13+ we can use sub process again
    if hasattr(os, "posix_spawnp"):
        # One pipe each for the child's stdout and stderr.
        stdout_r, stdout_w = os.pipe()
        stderr_r, stderr_w = os.pipe()
        # Performed inside the child before exec: dup the write ends onto
        # fds 1/2, then close all four pipe fds so the child keeps no stray
        # references to the read ends (which would prevent EOF).
        file_actions = [
            (os.POSIX_SPAWN_DUP2, stdout_w, 1),
            (os.POSIX_SPAWN_DUP2, stderr_w, 2),
            (os.POSIX_SPAWN_CLOSE, stdout_r),
            (os.POSIX_SPAWN_CLOSE, stderr_r),
            (os.POSIX_SPAWN_CLOSE, stdout_w),
            (os.POSIX_SPAWN_CLOSE, stderr_w),
        ]
        try:
            pid = os.posix_spawnp(command[0], command, env, file_actions=file_actions)
        except BaseException:
            # Spawn failed: close every pipe fd so nothing leaks, then re-raise.
            for fd in (stdout_r, stdout_w, stderr_r, stderr_w):
                os.close(fd)
            raise
        # The parent must close its copies of the write ends, otherwise the
        # reader threads below would never observe EOF.
        os.close(stdout_w)
        os.close(stderr_w)
        stdout_data, stderr_data = [], []

        def _read(fd, buf):
            # Reads until EOF; os.fdopen takes ownership of fd and closes it.
            with os.fdopen(fd, "rb") as f:
                buf.append(f.read())

        # Drain stdout and stderr concurrently so a child that fills one pipe
        # while we block on the other cannot deadlock us.
        t1 = threading.Thread(target=_read, args=(stdout_r, stdout_data), daemon=True)
        t2 = threading.Thread(target=_read, args=(stderr_r, stderr_data), daemon=True)
        t1.start()
        t2.start()
        # NOTE(review): each join gets its own 600s budget, so the worst case
        # here is ~1200s, versus the single 600s total of the Popen path below.
        t1.join(timeout=600)
        t2.join(timeout=600)
        if t1.is_alive() or t2.is_alive():
            # Timed out: kill the child; its pipe ends close, the reads hit
            # EOF, and both threads terminate, so the unbounded joins return.
            os.kill(pid, signal.SIGKILL)
            t1.join()
            t2.join()
        # Reap the child and translate the raw wait status into the
        # subprocess-style returncode (negative value == killed by a signal).
        _, raw_status = os.waitpid(pid, 0)
        if os.WIFEXITED(raw_status):
            returncode = os.WEXITSTATUS(raw_status)
        elif os.WIFSIGNALED(raw_status):
            returncode = -os.WTERMSIG(raw_status)
        else:
            returncode = -1
        # Each buffer holds at most one element (a single read-to-EOF blob);
        # it is empty only if the reader thread never got to append.
        outs = stdout_data[0] if stdout_data else b""
        errs = stderr_data[0] if stderr_data else b""
        return returncode, outs.decode("utf-8"), errs.decode("utf-8")

    # Fallback for platforms without posix_spawnp (e.g. Windows): plain
    # subprocess with the same 600-second timeout.
    proc = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env
    )

    try:
        outs, errs = proc.communicate(timeout=600)
    except subprocess.TimeoutExpired:
        proc.kill()
        # communicate() after kill() collects whatever output was produced
        # before the timeout and reaps the process.
        outs, errs = proc.communicate()

    return proc.returncode, outs.decode("utf-8"), errs.decode("utf-8")


Expand Down
4 changes: 3 additions & 1 deletion digital_land/utils/gdal_utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import re
import subprocess
from functools import lru_cache
from packaging.version import Version


@lru_cache(maxsize=None)
def get_gdal_version():
    """Return the installed GDAL version parsed from ``ogr2ogr --version``.

    The result is cached, so the ``ogr2ogr`` subprocess is spawned at most
    once per interpreter (this also lets callers pre-warm the cache before
    anything that makes fork() unsafe is loaded).

    Returns:
        packaging.version.Version: the GDAL version, e.g. ``Version("3.4.1")``.

    Raises:
        RuntimeError: if the version string cannot be parsed from the output.
    """
    out, _ = subprocess.Popen(
        ["ogr2ogr", "--version"],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    ).communicate()

    # No trailing comma in the pattern: newer GDAL builds do not always emit
    # "GDAL x.y.z, released ...", so match only the leading version digits.
    match = re.match(r"GDAL\s([0-9.]+)", out.decode("ascii"))
    if match is None:
        # Fail loudly with the raw output rather than an opaque
        # AttributeError from calling .group() on None.
        raise RuntimeError(f"unable to parse GDAL version from {out!r}")
    return Version(match.group(1))
13 changes: 13 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
def pytest_configure(config):
    """Pytest hook run once before test collection.

    Pre-warm the gdal version cache before any test runs. On macOS, calling
    fork() (via subprocess) after a spatial library such as mod_spatialite has
    been loaded causes a segmentation fault. Caching the result here ensures
    get_gdal_version() never needs to fork after that point.
    """
    try:
        # Imported lazily so a broken/missing package import cannot abort
        # the whole pytest session.
        from digital_land.utils.gdal_utils import get_gdal_version

        get_gdal_version()
    except Exception:
        # Deliberate best-effort: if ogr2ogr is not installed (or the import
        # fails) the tests that need it will fail on their own; warming the
        # cache must never block the run.
        pass


class FakeDictReader:
# Can be used in place of load.py::DictReaderInjectResource (a subclass of
# csv.DictReader). Simply returns values from the passed in list of rows.
Expand Down