diff --git a/.gitignore b/.gitignore index 6c798e0..6073ca5 100644 --- a/.gitignore +++ b/.gitignore @@ -161,3 +161,6 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# vim +*.swp diff --git a/stactask/__init__.py b/stactask/__init__.py index b223835..d0cfd1c 100644 --- a/stactask/__init__.py +++ b/stactask/__init__.py @@ -1,3 +1,4 @@ +from .cli import CLI from .config import DownloadConfig from .payload import Payload from .task import Task @@ -11,6 +12,7 @@ __all__ = [ "__version__", "__version_tuple__", + "CLI", "Task", "Payload", "DownloadConfig", diff --git a/stactask/cli.py b/stactask/cli.py new file mode 100644 index 0000000..17e0998 --- /dev/null +++ b/stactask/cli.py @@ -0,0 +1,243 @@ +import argparse +import json +import logging +import subprocess +import sys +import warnings +from pathlib import Path +from typing import Any, cast + +import fsspec + +from stactask.task import Task + + +class DeprecatedStoreTrueAction(argparse._StoreTrueAction): + def __call__(self, parser, namespace, values, option_string=None) -> None: # type: ignore + warnings.warn(f"Argument {self.option_strings} is deprecated.", stacklevel=2) + super().__call__(parser, namespace, values, option_string) + + +class CLI: + tasks: dict[str, Task] + + def __init__(self) -> None: + self.tasks = {} + + self._build_argparser() + + def _build_argparser(self) -> None: + self._parser = argparse.ArgumentParser(description="STAC Task management tools") + self._parser.add_argument( + "--version", + help="Print version and exit", + action="version", + version="???", + ) + self._parser.add_argument( + "--logging", + default="INFO", + help="DEBUG, INFO, WARN, ERROR, CRITICAL", + ) + self._parser.add_argument( + "--output", + default=None, + help="Write output payload to this URL", + ) + + pparser = argparse.ArgumentParser(add_help=False) + subparsers = self._parser.add_subparsers(dest="command") + + # run + run_parser = subparsers.add_parser( + "run", + parents=[pparser], + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + help="Process STAC Item Collection", + ) + run_parser.add_argument( + "input", + nargs="?", + help="Full path of item collection to process (s3 or local)", + ) + + run_parser.add_argument( + "--task", + required=True, + help="Name of the task you wish to run", + ) + + + # additional options + run_parser.add_argument( + "--workdir", + default=None, + type=Path, + help="Use this as work directory. 
Will be created.", + ) + + run_parser.add_argument( + "--save-workdir", + dest="save_workdir", + action="store_true", + default=False, + help="Save workdir after completion", + ) + + # skips are deprecated in favor of boolean optionals + run_parser.add_argument( + "--skip-upload", + dest="skip_upload", + action=DeprecatedStoreTrueAction, + default=False, + help="DEPRECATED: Skip uploading of generated assets and STAC Items", + ) + run_parser.add_argument( + "--skip-validation", + dest="skip_validation", + action=DeprecatedStoreTrueAction, + default=False, + help="DEPRECATED: Skip validation of input payload", + ) + + run_parser.add_argument( + "--upload", + dest="upload", + action="store_true", + default=True, + help="Upload generated assets and resulting STAC Items", + ) + run_parser.add_argument( + "--no-upload", + dest="upload", + action="store_false", + help="Don't upload generated assets and resulting STAC Items", + ) + run_parser.add_argument( + "--validate", + dest="validate", + action="store_true", + default=True, + help="Validate input payload", + ) + run_parser.add_argument( + "--no-validate", + dest="validate", + action="store_false", + help="Don't validate input payload", + ) + + run_parser.add_argument( + "--local", + action="store_true", + default=False, + help=""" Run local mode +(save-workdir = True, upload = False, +workdir = 'local-output', output = 'local-output/output-payload.json') """, + ) + + # metadata + build_parser = subparsers.add_parser( + "metadata", + parents=[pparser], + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + help="Output metadata document for all registered STAC Tasks", + ) + + build_parser.add_argument( + "--output", + default=None, + help="Write output task metadata to this URL", + ) + + def register_task(self, task: Task) -> None: + self.tasks[task.name] = task + + def execute(self) -> None: + args = self._parse_args(sys.argv[1:]) + cmd = args.pop("command") + + loglevel = args.pop("logging") + logging.basicConfig(level=loglevel) + + href_out = args.pop("output", None) + + # quiet these loud loggers + for ql in [ + "botocore", + "s3transfer", + "urllib3", + "fsspec", + "asyncio", + "aiobotocore", + ]: + logging.getLogger(ql).propagate = False + + match cmd: + case "run": + output = self._run_task(args) + case "metadata": + output = self._task_metadata() + + self._write_output(output, href_out) + + def _parse_args(self, args: list[str]) -> dict[str, Any]: + # turn Namespace into dictionary + pargs = vars(self._parser.parse_args(args)) + # only keep keys that are not None + pargs = {k: v for k, v in pargs.items() if v is not None} + + if pargs.pop("skip_validation", False): + pargs["validate"] = False + if pargs.pop("skip_upload", False): + pargs["upload"] = False + + if pargs.pop("local", False): + pargs["save_workdir"] = True + pargs["upload"] = False + if pargs.get("workdir") is None: + pargs["workdir"] = "local-output" + if pargs.get("output") is None: + pargs["output"] = Path(pargs["workdir"]) / "output-payload.json" + + if pargs.get("command", None) is None: + self._parser.print_help() + sys.exit(0) + + return pargs + + def _task_metadata(self) -> dict[str, Any]: + # create task metadata document + from stactask import __version__ + + metadata: dict[str, Any] = { + "stactask_version": __version__, + "tasks": {}, + } + for name, task in self.tasks.items(): + metadata["tasks"][name] = task.metadata() + + return metadata + + def _run_task(self, args: dict[str, Any]) -> dict[str, Any]: + href = args.pop("input", None) + + # read input + 
if href is None: + payload = json.load(sys.stdin) + else: + with fsspec.open(href) as f: + payload = json.loads(f.read()) + + # run task handler + task_name = args.pop("task") + payload_out = self.tasks[task_name].handler(payload, **args) + + return payload_out + + def _write_output(self, output: dict[str, Any], href_out: str | None = None) -> None: + if href_out is None: + json.dump(output, sys.stdout) + else: + with fsspec.open(href_out, "w") as f: + f.write(json.dumps(output)) diff --git a/stactask/task.py b/stactask/task.py index 7ae8762..1b0343d 100644 --- a/stactask/task.py +++ b/stactask/task.py @@ -15,6 +15,7 @@ import fsspec from boto3utils import s3 +from pydantic import BaseModel from pystac import Asset, Item, ItemCollection, Link from pystac.layout import LayoutTemplate from pystac.utils import datetime_to_str @@ -93,6 +94,9 @@ class Task(ABC): description = "A task for doing things" version = "0.1.0" + input_model: BaseModel | None = None + output_model: BaseModel | None = None + def __init__( self: "Task", payload: dict[str, Any], @@ -151,6 +155,22 @@ def _payload(self, value: dict[str, Any]) -> None: ) self.payload = Payload(value) + @classmethod + def metadata(cls) -> dict[str, str]: + input_schema = output_schema = None + if cls.input_model: + input_schema = cls.input_model.model_json_schema() + if cls.output_model: + output_schema = cls.output_model.model_json_schema() + + return { + "name": cls.name, + "version": cls.version, + "description": cls.description, + "input_schema": input_schema, + "output_schema": output_schema, + } + @property def process_definition(self) -> dict[str, Any]: warnings.warn( @@ -571,178 +591,6 @@ def handler(cls, payload: dict[str, Any], **kwargs: Any) -> dict[str, Any]: if task: task.cleanup_workdir() - @classmethod - def parse_args(cls, args: list[str]) -> dict[str, Any]: - dhf = argparse.ArgumentDefaultsHelpFormatter - parser0 = argparse.ArgumentParser(description=cls.description) - parser0.add_argument( - "--version", - help="Print version and exit", - action="version", - version=cls.version, - ) - - pparser = argparse.ArgumentParser(add_help=False) - pparser.add_argument( - "--logging", - default="INFO", - help="DEBUG, INFO, WARN, ERROR, CRITICAL", - ) - - subparsers = parser0.add_subparsers(dest="command") - - # run - parser = subparsers.add_parser( - "run", - parents=[pparser], - formatter_class=dhf, - help="Process STAC Item Collection", - ) - parser.add_argument( - "input", - nargs="?", - help="Full path of item collection to process (s3 or local)", - ) - - parser.add_argument( - "--output", - default=None, - help="Write output payload to this URL", - ) - - # additional options - parser.add_argument( - "--workdir", - default=None, - type=Path, - help="Use this as work directory. 
Will be created.", - ) - - parser.add_argument( - "--save-workdir", - dest="save_workdir", - action="store_true", - default=False, - help="Save workdir after completion", - ) - - # skips are deprecated in favor of boolean optionals - parser.add_argument( - "--skip-upload", - dest="skip_upload", - action=DeprecatedStoreTrueAction, - default=False, - help="DEPRECATED: Skip uploading of generated assets and STAC Items", - ) - parser.add_argument( - "--skip-validation", - dest="skip_validation", - action=DeprecatedStoreTrueAction, - default=False, - help="DEPRECATED: Skip validation of input payload", - ) - - parser.add_argument( - "--upload", - dest="upload", - action="store_true", - default=True, - help="Upload generated assets and resulting STAC Items", - ) - parser.add_argument( - "--no-upload", - dest="upload", - action="store_false", - help="Don't upload generated assets and resulting STAC Items", - ) - parser.add_argument( - "--validate", - dest="validate", - action="store_true", - default=True, - help="Validate input payload", - ) - parser.add_argument( - "--no-validate", - dest="validate", - action="store_false", - help="Don't validate input payload", - ) - - parser.add_argument( - "--local", - action="store_true", - default=False, - help=""" Run local mode -(save-workdir = True, upload = False, -workdir = 'local-output', output = 'local-output/output-payload.json') """, - ) - - # turn Namespace into dictionary - pargs = vars(parser0.parse_args(args)) - # only keep keys that are not None - pargs = {k: v for k, v in pargs.items() if v is not None} - - if pargs.pop("skip_validation", False): - pargs["validate"] = False - if pargs.pop("skip_upload", False): - pargs["upload"] = False - - if pargs.pop("local", False): - pargs["save_workdir"] = True - pargs["upload"] = False - if pargs.get("workdir") is None: - pargs["workdir"] = "local-output" - if pargs.get("output") is None: - pargs["output"] = Path(pargs["workdir"]) / "output-payload.json" - - if pargs.get("command", None) is None: - parser.print_help() - sys.exit(0) - - return pargs - - @classmethod - def cli(cls) -> None: - args = cls.parse_args(sys.argv[1:]) - cmd = args.pop("command") - - # logging - loglevel = args.pop("logging") - logging.basicConfig(level=loglevel) - - # quiet these loud loggers - for ql in [ - "botocore", - "s3transfer", - "urllib3", - "fsspec", - "asyncio", - "aiobotocore", - ]: - logging.getLogger(ql).propagate = False - - if cmd == "run": - href = args.pop("input", None) - href_out = args.pop("output", None) - - # read input - if href is None: - payload = json.load(sys.stdin) - else: - with fsspec.open(href) as f: - payload = json.loads(f.read()) - - # run task handler - payload_out = cls.handler(payload, **args) - - # write output - if href_out is None: - json.dump(payload_out, sys.stdout) - else: - with fsspec.open(href_out, "w") as f: - f.write(json.dumps(payload_out)) - if sys.platform == "win32": from asyncio.proactor_events import _ProactorBasePipeTransport diff --git a/tests/tasks.py b/tests/tasks.py index fb094e3..2b42da2 100644 --- a/tests/tasks.py +++ b/tests/tasks.py @@ -1,5 +1,7 @@ from typing import Any +from pydantic import BaseModel + from stactask import Task from stactask.exceptions import FailedValidation @@ -32,3 +34,24 @@ class DerivedItemTask(Task): def process(self, **kwargs: Any) -> list[dict[str, Any]]: assert kwargs["parameter"] == "value" return [self.create_item_from_item(self.items_as_dicts[0])] + + +class InputModel(BaseModel): + a: int + b: str + + +class OutputModel(BaseModel): 
+ c: float + + +class SchemaTask(Task): + name = "schema-task" + description = "this task defines input and output models" + version = "0.2.0" + + input_model = InputModel + output_model = OutputModel + + def process(self, **kwargs: Any) -> list[dict[str, Any]]: + return [OutputModel(c=2.7)] diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..d619f21 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,102 @@ +from tempfile import TemporaryDirectory + +import pytest + +from stactask import __version__, CLI + +from .tasks import NothingTask + + +@pytest.fixture +def cli() -> CLI: + cli = CLI() + cli.register_task(NothingTask) + return cli + + +def test_parse_no_args(cli) -> None: + with pytest.raises(SystemExit): + cli._parse_args([]) + + +def test_parse_args(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --save-workdir".split() + ) + assert args["command"] == "run" + assert args["logging"] == "INFO" + assert args["input"] == "input" + assert args["save_workdir"] is True + assert args["upload"] is True + assert args["validate"] is True + + +def test_parse_args_deprecated_skip(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --skip-upload --skip-validation".split() + ) + assert args["upload"] is False + assert args["validate"] is False + + +def test_parse_args_no_upload_and_no_validation(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --no-upload --no-validate".split() + ) + assert args["upload"] is False + assert args["validate"] is False + + +def test_parse_args_no_upload_and_validation(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --no-upload --validate".split() + ) + assert args["upload"] is False + assert args["validate"] is True + + +def test_parse_args_upload_and_no_validation(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --upload --no-validate".split() + ) + assert args["upload"] is True + assert args["validate"] is False + + +def test_parse_args_upload_and_validation(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --upload --validate".split() + ) + assert args["upload"] is True + assert args["validate"] is True + + +def test_run_task_command(cli) -> None: + with TemporaryDirectory() as tmpdir: + tmpfile_name = f"{tmpdir}/input" + with open(tmpfile_name, 'w') as fp: + fp.write('{}') + fp.close() + + args = cli._parse_args( + f"run {tmpfile_name} --task nothing-task".split() + ) + args.pop("command") + args.pop("logging") + output = cli._run_task(args) + assert output == {"features": []} + + +def test_metadata_command(cli) -> None: + assert cli._task_metadata() == { + "stactask_version": __version__, + "tasks": { + "nothing-task": { + "name": "nothing-task", + "description": "this task does nothing", + "version": "0.1.0", + "input_schema": None, + "output_schema": None, + } + } + } diff --git a/tests/test_task.py b/tests/test_task.py index c6d7b48..5af4f5b 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -13,7 +13,7 @@ from stactask.exceptions import FailedValidation from stactask.task import Task -from .tasks import DerivedItemTask, FailValidateTask, NothingTask +from .tasks import DerivedItemTask, FailValidateTask, NothingTask, SchemaTask testpath = Path(__file__).parent cassettepath = testpath / "fixtures" / "cassettes" @@ -38,6 +38,11 @@ def derived_item_task(payload: dict[str, Any]) -> Task: return DerivedItemTask(payload) +@pytest.fixture +def schema_task(payload: dict[str, Any]) -> 
Task: + return SchemaTask(payload) + + @pytest.fixture def mock_s3_client() -> Callable[[], s3]: """Recreate the global S3 client within mock context to avoid state pollution. @@ -197,51 +202,6 @@ def test_task_handler(payload: dict[str, Any]) -> None: assert derived_link["href"] == self_link["href"] -def test_parse_no_args() -> None: - with pytest.raises(SystemExit): - NothingTask.parse_args([]) - - -def test_parse_args() -> None: - args = NothingTask.parse_args("run input --save-workdir".split()) - assert args["command"] == "run" - assert args["logging"] == "INFO" - assert args["input"] == "input" - assert args["save_workdir"] is True - assert args["upload"] is True - assert args["validate"] is True - - -def test_parse_args_deprecated_skip() -> None: - args = NothingTask.parse_args("run input --skip-upload --skip-validation".split()) - assert args["upload"] is False - assert args["validate"] is False - - -def test_parse_args_no_upload_and_no_validation() -> None: - args = NothingTask.parse_args("run input --no-upload --no-validate".split()) - assert args["upload"] is False - assert args["validate"] is False - - -def test_parse_args_no_upload_and_validation() -> None: - args = NothingTask.parse_args("run input --no-upload --validate".split()) - assert args["upload"] is False - assert args["validate"] is True - - -def test_parse_args_upload_and_no_validation() -> None: - args = NothingTask.parse_args("run input --upload --no-validate".split()) - assert args["upload"] is True - assert args["validate"] is False - - -def test_parse_args_upload_and_validation() -> None: - args = NothingTask.parse_args("run input --upload --validate".split()) - assert args["upload"] is True - assert args["validate"] is True - - def test_collection_mapping(nothing_task: Task) -> None: assert nothing_task.collection_mapping == { "sentinel-2-l2a": "$[?(@.id =~ 'S2[AB].*')]", @@ -275,5 +235,35 @@ def test_s3_upload(nothing_task: Task, mock_s3_client: Callable[[], s3]) -> None ) -if __name__ == "__main__": - output = NothingTask.cli() +def test_task_metadata(schema_task: Task) -> None: + assert schema_task.metadata() == { + "name": "schema-task", + "version": "0.2.0", + "description": "this task defines input and output models", + "input_schema": { + "properties": { + "a": { + "title": "A", + "type": "integer", + }, + "b": { + "title": "B", + "type": "string", + }, + }, + "required": ["a", "b"], + "title": "InputModel", + "type": "object", + }, + "output_schema": { + "properties": { + "c": { + "title": "C", + "type": "number", + }, + }, + "required": ["c"], + "title": "OutputModel", + "type": "object", + }, + }
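
Not part of the diff, but for reviewers: a minimal sketch, assuming a hypothetical `my_package` project with its own task modules, of how tasks would now be exposed through the new `CLI` class instead of the removed `Task.cli()` / `Task.parse_args()` entry point.

```python
# my_package/__main__.py -- hypothetical module wiring project tasks into the new CLI
from stactask import CLI

from .tasks import NothingTask, SchemaTask  # hypothetical import path


def main() -> None:
    cli = CLI()
    # register_task() stores each task class under its class-level `name`,
    # which is what the run subcommand's required --task option selects
    cli.register_task(NothingTask)
    cli.register_task(SchemaTask)
    # execute() parses sys.argv, dispatches to the `run` or `metadata`
    # subcommand, and writes the resulting payload to stdout or to --output
    cli.execute()


if __name__ == "__main__":
    main()
```

Typical invocations under that assumption: `python -m my_package run payload.json --task nothing-task --no-upload` and `python -m my_package metadata --output tasks.json`.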
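
Likewise, a minimal sketch (hypothetical model and task names) of how the new `input_model` / `output_model` class attributes feed the `Task.metadata()` classmethod added in `task.py`: each declared pydantic model is serialized with `model_json_schema()` and surfaced as `input_schema` / `output_schema` in the metadata document.

```python
from typing import Any

from pydantic import BaseModel

from stactask import Task


class ResampleParams(BaseModel):  # hypothetical input model
    resolution: int
    method: str = "bilinear"


class ResampleTask(Task):  # hypothetical task
    name = "resample"
    description = "resample assets to a target resolution"
    version = "0.1.0"

    # either attribute may be left as None; its schema is then reported as None
    input_model = ResampleParams
    output_model = None

    def process(self, **kwargs: Any) -> list[dict[str, Any]]:
        return self.items_as_dicts


# metadata() is a classmethod, so no payload is needed to inspect the schemas
print(ResampleTask.metadata()["input_schema"]["title"])  # -> "ResampleParams"
```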