Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/sweets/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@

from __future__ import annotations

import asyncio
from concurrent.futures import ThreadPoolExecutor
from datetime import date, datetime
from pathlib import Path
from typing import Any, Literal, Optional
from typing import Any, Callable, Literal, Optional, TypeVar

from dateutil.parser import parse as parse_date
from dolphin.workflows.config import YamlModel
Expand All @@ -43,6 +45,27 @@

from ._log import log_runtime

_T = TypeVar("_T")


def _call_off_running_loop(fn: Callable[..., _T], *args: Any, **kwargs: Any) -> _T:
"""Call `fn` even when this thread already owns a running event loop.

`burst2safe` downloads via `asyncio.run()`, which raises `RuntimeError`
when invoked from a thread that already has a running loop (a Jupyter or
IPython kernel, `jupyter execute`, etc.). In that case `fn` is run in a
dedicated worker thread so its `asyncio.run()` gets a fresh loop. With no
running loop (the normal script/CLI path) `fn` is called directly.
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return fn(*args, **kwargs)

with ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(fn, *args, **kwargs).result()


FlightDirection = Literal["ASCENDING", "DESCENDING"]


Expand Down Expand Up @@ -225,7 +248,8 @@ def download(self) -> list[Path]:
self.out_dir.mkdir(parents=True, exist_ok=True)
logger.info(self.summary())

result = burst2stack(
result = _call_off_running_loop(
burst2stack,
rel_orbit=self.track,
start_date=self.start,
end_date=self.end,
Expand Down
Loading