From cfdfacf9b244c4d04b9e7cab5bd47bdb94485063 Mon Sep 17 00:00:00 2001 From: Nate Rubin Date: Tue, 16 Dec 2025 14:32:16 -0500 Subject: [PATCH 1/5] add build command to cli --- stactask/task.py | 99 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 80 insertions(+), 19 deletions(-) diff --git a/stactask/task.py b/stactask/task.py index 7ae8762..2adb5b3 100644 --- a/stactask/task.py +++ b/stactask/task.py @@ -1,5 +1,6 @@ import argparse import asyncio +import inspect import json import logging import sys @@ -10,11 +11,13 @@ from datetime import datetime, timezone from pathlib import Path from shutil import rmtree +import subprocess from tempfile import mkdtemp from typing import Any import fsspec from boto3utils import s3 +from pydantic import BaseModel from pystac import Asset, Item, ItemCollection, Link from pystac.layout import LayoutTemplate from pystac.utils import datetime_to_str @@ -27,7 +30,7 @@ upload_item_json_to_s3, ) from .config import DownloadConfig -from .exceptions import FailedValidation +from .exceptions import FailedValidation, InvalidOutputSchema from .logging import TaskLoggerAdapter from .payload import Payload from .utils import find_collection as utils_find_collection @@ -678,6 +681,26 @@ def parse_args(cls, args: list[str]) -> dict[str, Any]: workdir = 'local-output', output = 'local-output/output-payload.json') """, ) + # build + parser = subparsers.add_parser( + "build", + parents=[pparser], + formatter_class=dhf, + help="Build STAC Task", + ) + + parser.add_argument( + "--image-uri", + required=True, + help="URI to push docker image to", + ) + + parser.add_argument( + "--output", + default=None, + help="Write output task metadata to this URL", + ) + # turn Namespace into dictionary pargs = vars(parser0.parse_args(args)) # only keep keys that are not None @@ -722,26 +745,64 @@ def cli(cls) -> None: ]: logging.getLogger(ql).propagate = False - if cmd == "run": - href = args.pop("input", None) - href_out = args.pop("output", None) + match cmd: + case "run": + href = args.pop("input", None) + href_out = args.pop("output", None) - # read input - if href is None: - payload = json.load(sys.stdin) - else: - with fsspec.open(href) as f: - payload = json.loads(f.read()) - - # run task handler - payload_out = cls.handler(payload, **args) + # read input + if href is None: + payload = json.load(sys.stdin) + else: + with fsspec.open(href) as f: + payload = json.loads(f.read()) + + # run task handler + payload_out = cls.handler(payload, **args) + + # write output + if href_out is None: + json.dump(payload_out, sys.stdout) + else: + with fsspec.open(href_out, "w") as f: + f.write(json.dumps(payload_out)) + case "build": + # build docker image + subprocess.check_call([ + 'docker', 'buildx', 'build', + '-t', f"{args['image_uri']}:{cls.version}", + '-f', 'Dockerfile', '.', + ]) + + # push docker image + def _push_image(): + return f"{args['image_uri']}:{cls.version}", + + uri = _push_image() + + # create task metadata document + input_schema = output_schema = None + if cls.input_model: + input_schema = cls.input_model.model_json_schema() + if cls.output_model: + output_schema = cls.output_model.model_json_schema() + + task_metadata = { + "name": cls.name, + "version": cls.version, + "description": cls.description, + "input_schema": input_schema, + "output_schema": output_schema, + "uri": uri, + } - # write output - if href_out is None: - json.dump(payload_out, sys.stdout) - else: - with fsspec.open(href_out, "w") as f: - f.write(json.dumps(payload_out)) + # write output + href_out = args.pop("output", None) + if href_out is None: + json.dump(task_metadata, sys.stdout) + else: + with fsspec.open(href_out, "w") as f: + f.write(json.dumps(task_metadata)) if sys.platform == "win32": From 5ada4d67beead14d8a2a555d1d236fdb3ac7dd3e Mon Sep 17 00:00:00 2001 From: Nate Rubin Date: Wed, 17 Dec 2025 14:22:01 -0500 Subject: [PATCH 2/5] factor out CLI --- stactask/__init__.py | 2 + stactask/cli.py | 266 +++++++++++++++++++++++++++++++++++++++++++ stactask/task.py | 230 ------------------------------------- 3 files changed, 268 insertions(+), 230 deletions(-) create mode 100644 stactask/cli.py diff --git a/stactask/__init__.py b/stactask/__init__.py index b223835..d0cfd1c 100644 --- a/stactask/__init__.py +++ b/stactask/__init__.py @@ -1,3 +1,4 @@ +from .cli import CLI from .config import DownloadConfig from .payload import Payload from .task import Task @@ -11,6 +12,7 @@ __all__ = [ "__version__", "__version_tuple__", + "CLI", "Task", "Payload", "DownloadConfig", diff --git a/stactask/cli.py b/stactask/cli.py new file mode 100644 index 0000000..099236a --- /dev/null +++ b/stactask/cli.py @@ -0,0 +1,266 @@ +import argparse +import json +import logging +import subprocess +import sys +import warnings +from pathlib import Path +from typing import Any + +import fsspec + +from stactask.task import Task + + +class DeprecatedStoreTrueAction(argparse._StoreTrueAction): + def __call__(self, parser, namespace, values, option_string=None) -> None: # type: ignore + warnings.warn(f"Argument {self.option_strings} is deprecated.", stacklevel=2) + super().__call__(parser, namespace, values, option_string) + + +class CLI: + tasks: dict[str, Task] = {} + + def __init__(self) -> None: + self._build_argparser() + + def _build_argparser(self) -> None: + self._parser = argparse.ArgumentParser(description="?!?!?!?!") + self._parser.add_argument( + "--version", + help="Print version and exit", + action="version", + version="???", + ) + pparser = argparse.ArgumentParser(add_help=False) + self._parser.add_argument( + "--logging", + default="INFO", + help="DEBUG, INFO, WARN, ERROR, CRITICAL", + ) + + subparsers = self._parser.add_subparsers(dest="command") + + # run + run_parser = subparsers.add_parser( + "run", + parents=[pparser], + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + help="Process STAC Item Collection", + ) + run_parser.add_argument( + "input", + nargs="?", + help="Full path of item collection to process (s3 or local)", + ) + + run_parser.add_argument( + "--task", + required=True, + help="Name of the task you wish to run", + ) + + run_parser.add_argument( + "--output", + default=None, + help="Write output payload to this URL", + ) + + # additional options + run_parser.add_argument( + "--workdir", + default=None, + type=Path, + help="Use this as work directory. Will be created.", + ) + + run_parser.add_argument( + "--save-workdir", + dest="save_workdir", + action="store_true", + default=False, + help="Save workdir after completion", + ) + + # skips are deprecated in favor of boolean optionals + run_parser.add_argument( + "--skip-upload", + dest="skip_upload", + action=DeprecatedStoreTrueAction, + default=False, + help="DEPRECATED: Skip uploading of generated assets and STAC Items", + ) + run_parser.add_argument( + "--skip-validation", + dest="skip_validation", + action=DeprecatedStoreTrueAction, + default=False, + help="DEPRECATED: Skip validation of input payload", + ) + + run_parser.add_argument( + "--upload", + dest="upload", + action="store_true", + default=True, + help="Upload generated assets and resulting STAC Items", + ) + run_parser.add_argument( + "--no-upload", + dest="upload", + action="store_false", + help="Don't upload generated assets and resulting STAC Items", + ) + run_parser.add_argument( + "--validate", + dest="validate", + action="store_true", + default=True, + help="Validate input payload", + ) + run_parser.add_argument( + "--no-validate", + dest="validate", + action="store_false", + help="Don't validate input payload", + ) + + run_parser.add_argument( + "--local", + action="store_true", + default=False, + help=""" Run local mode +(save-workdir = True, upload = False, +workdir = 'local-output', output = 'local-output/output-payload.json') """, + ) + + # build + build_parser = subparsers.add_parser( + "build", + parents=[pparser], + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + help="Build all STAC Tasks", + ) + + build_parser.add_argument( + "--image-tag", + required=True, + help="URI to push docker image to", + ) + + build_parser.add_argument( + "--output", + default=None, + help="Write output task metadata to this URL", + ) + + def register_task(self, task: Task) -> None: + self.tasks[task.name] = task + + def execute(self) -> None: + args = self._parse_args(sys.argv[1:]) + cmd = args.pop("command") + + loglevel = args.pop("logging") + logging.basicConfig(level=loglevel) + + # quiet these loud loggers + for ql in [ + "botocore", + "s3transfer", + "urllib3", + "fsspec", + "asyncio", + "aiobotocore", + ]: + logging.getLogger(ql).propagate = False + + match cmd: + case "run": + self._run_task(args) + case "build": + self._build_tasks(args) + + def _parse_args(self, args: list[str]) -> dict[str, Any]: + # turn Namespace into dictionary + pargs = vars(self._parser.parse_args(args)) + # only keep keys that are not None + pargs = {k: v for k, v in pargs.items() if v is not None} + + if pargs.pop("skip_validation", False): + pargs["validate"] = False + if pargs.pop("skip_upload", False): + pargs["upload"] = False + + if pargs.pop("local", False): + pargs["save_workdir"] = True + pargs["upload"] = False + if pargs.get("workdir") is None: + pargs["workdir"] = "local-output" + if pargs.get("output") is None: + pargs["output"] = Path(pargs["workdir"]) / "output-payload.json" + + if pargs.get("command", None) is None: + self._parser.print_help() + sys.exit(0) + + return pargs + + def _build_tasks(self, args: dict[str, Any]) -> None: + # create task metadata document + from stactask import __version__ + metadata = { + "stactask_version": __version__, + "tasks": {}, + } + for name, task in self.tasks.items(): + input_schema = output_schema = None + if task.input_model: + input_schema = task.input_model.model_json_schema() + if task.output_model: + output_schema = task.output_model.model_json_schema() + + task_metadata = { + "name": name, + "version": task.version, + "description": task.description, + "input_schema": input_schema, + "output_schema": output_schema, + } + metadata["tasks"][name] = task_metadata + + # build docker image + subprocess.check_call([ + 'docker', 'buildx', 'build', + '-t', args['image_tag'], + '--label', f"stactask_metadata={json.dumps(metadata)}", + '-f', 'Dockerfile', '.', + ]) + + # push docker image + def _push_image(): + return args['image_tag'] + + _push_image() + + def _run_task(self, args: dict[str, Any]) -> None: + href = args.pop("input", None) + href_out = args.pop("output", None) + + # read input + if href is None: + payload = json.load(sys.stdin) + else: + with fsspec.open(href) as f: + payload = json.loads(f.read()) + + # run task handler + task_name = args.pop("task") + payload_out = self.tasks[task_name].handler(payload, **args) + + # write output + if href_out is None: + json.dump(payload_out, sys.stdout) + else: + with fsspec.open(href_out, "w") as f: + f.write(json.dumps(payload_out)) diff --git a/stactask/task.py b/stactask/task.py index 2adb5b3..39b3db2 100644 --- a/stactask/task.py +++ b/stactask/task.py @@ -574,236 +574,6 @@ def handler(cls, payload: dict[str, Any], **kwargs: Any) -> dict[str, Any]: if task: task.cleanup_workdir() - @classmethod - def parse_args(cls, args: list[str]) -> dict[str, Any]: - dhf = argparse.ArgumentDefaultsHelpFormatter - parser0 = argparse.ArgumentParser(description=cls.description) - parser0.add_argument( - "--version", - help="Print version and exit", - action="version", - version=cls.version, - ) - - pparser = argparse.ArgumentParser(add_help=False) - pparser.add_argument( - "--logging", - default="INFO", - help="DEBUG, INFO, WARN, ERROR, CRITICAL", - ) - - subparsers = parser0.add_subparsers(dest="command") - - # run - parser = subparsers.add_parser( - "run", - parents=[pparser], - formatter_class=dhf, - help="Process STAC Item Collection", - ) - parser.add_argument( - "input", - nargs="?", - help="Full path of item collection to process (s3 or local)", - ) - - parser.add_argument( - "--output", - default=None, - help="Write output payload to this URL", - ) - - # additional options - parser.add_argument( - "--workdir", - default=None, - type=Path, - help="Use this as work directory. Will be created.", - ) - - parser.add_argument( - "--save-workdir", - dest="save_workdir", - action="store_true", - default=False, - help="Save workdir after completion", - ) - - # skips are deprecated in favor of boolean optionals - parser.add_argument( - "--skip-upload", - dest="skip_upload", - action=DeprecatedStoreTrueAction, - default=False, - help="DEPRECATED: Skip uploading of generated assets and STAC Items", - ) - parser.add_argument( - "--skip-validation", - dest="skip_validation", - action=DeprecatedStoreTrueAction, - default=False, - help="DEPRECATED: Skip validation of input payload", - ) - - parser.add_argument( - "--upload", - dest="upload", - action="store_true", - default=True, - help="Upload generated assets and resulting STAC Items", - ) - parser.add_argument( - "--no-upload", - dest="upload", - action="store_false", - help="Don't upload generated assets and resulting STAC Items", - ) - parser.add_argument( - "--validate", - dest="validate", - action="store_true", - default=True, - help="Validate input payload", - ) - parser.add_argument( - "--no-validate", - dest="validate", - action="store_false", - help="Don't validate input payload", - ) - - parser.add_argument( - "--local", - action="store_true", - default=False, - help=""" Run local mode -(save-workdir = True, upload = False, -workdir = 'local-output', output = 'local-output/output-payload.json') """, - ) - - # build - parser = subparsers.add_parser( - "build", - parents=[pparser], - formatter_class=dhf, - help="Build STAC Task", - ) - - parser.add_argument( - "--image-uri", - required=True, - help="URI to push docker image to", - ) - - parser.add_argument( - "--output", - default=None, - help="Write output task metadata to this URL", - ) - - # turn Namespace into dictionary - pargs = vars(parser0.parse_args(args)) - # only keep keys that are not None - pargs = {k: v for k, v in pargs.items() if v is not None} - - if pargs.pop("skip_validation", False): - pargs["validate"] = False - if pargs.pop("skip_upload", False): - pargs["upload"] = False - - if pargs.pop("local", False): - pargs["save_workdir"] = True - pargs["upload"] = False - if pargs.get("workdir") is None: - pargs["workdir"] = "local-output" - if pargs.get("output") is None: - pargs["output"] = Path(pargs["workdir"]) / "output-payload.json" - - if pargs.get("command", None) is None: - parser.print_help() - sys.exit(0) - - return pargs - - @classmethod - def cli(cls) -> None: - args = cls.parse_args(sys.argv[1:]) - cmd = args.pop("command") - - # logging - loglevel = args.pop("logging") - logging.basicConfig(level=loglevel) - - # quiet these loud loggers - for ql in [ - "botocore", - "s3transfer", - "urllib3", - "fsspec", - "asyncio", - "aiobotocore", - ]: - logging.getLogger(ql).propagate = False - - match cmd: - case "run": - href = args.pop("input", None) - href_out = args.pop("output", None) - - # read input - if href is None: - payload = json.load(sys.stdin) - else: - with fsspec.open(href) as f: - payload = json.loads(f.read()) - - # run task handler - payload_out = cls.handler(payload, **args) - - # write output - if href_out is None: - json.dump(payload_out, sys.stdout) - else: - with fsspec.open(href_out, "w") as f: - f.write(json.dumps(payload_out)) - case "build": - # build docker image - subprocess.check_call([ - 'docker', 'buildx', 'build', - '-t', f"{args['image_uri']}:{cls.version}", - '-f', 'Dockerfile', '.', - ]) - - # push docker image - def _push_image(): - return f"{args['image_uri']}:{cls.version}", - - uri = _push_image() - - # create task metadata document - input_schema = output_schema = None - if cls.input_model: - input_schema = cls.input_model.model_json_schema() - if cls.output_model: - output_schema = cls.output_model.model_json_schema() - - task_metadata = { - "name": cls.name, - "version": cls.version, - "description": cls.description, - "input_schema": input_schema, - "output_schema": output_schema, - "uri": uri, - } - - # write output - href_out = args.pop("output", None) - if href_out is None: - json.dump(task_metadata, sys.stdout) - else: - with fsspec.open(href_out, "w") as f: - f.write(json.dumps(task_metadata)) - if sys.platform == "win32": from asyncio.proactor_events import _ProactorBasePipeTransport From d4550a99a4f6311026d8b1be238089aee8fcd779 Mon Sep 17 00:00:00 2001 From: Nate Rubin Date: Wed, 17 Dec 2025 14:25:30 -0500 Subject: [PATCH 3/5] ignore swp files --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 6c798e0..6073ca5 100644 --- a/.gitignore +++ b/.gitignore @@ -161,3 +161,6 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + +# vim +*.swp From c5dd6175b66abefb0fd72a590f4a9cbbca0e3fe7 Mon Sep 17 00:00:00 2001 From: Nate Rubin Date: Wed, 17 Dec 2025 15:10:00 -0500 Subject: [PATCH 4/5] move cli tests to new test module --- stactask/cli.py | 38 +++++++++++++++++-------- stactask/task.py | 7 +++-- tests/test_cli.py | 69 ++++++++++++++++++++++++++++++++++++++++++++++ tests/test_task.py | 49 -------------------------------- 4 files changed, 99 insertions(+), 64 deletions(-) create mode 100644 tests/test_cli.py diff --git a/stactask/cli.py b/stactask/cli.py index 099236a..8c43d6e 100644 --- a/stactask/cli.py +++ b/stactask/cli.py @@ -5,7 +5,7 @@ import sys import warnings from pathlib import Path -from typing import Any +from typing import Any, cast import fsspec @@ -19,9 +19,11 @@ def __call__(self, parser, namespace, values, option_string=None) -> None: # ty class CLI: - tasks: dict[str, Task] = {} + tasks: dict[str, Task] def __init__(self) -> None: + self.tasks = {} + self._build_argparser() def _build_argparser(self) -> None: @@ -209,7 +211,8 @@ def _parse_args(self, args: list[str]) -> dict[str, Any]: def _build_tasks(self, args: dict[str, Any]) -> None: # create task metadata document from stactask import __version__ - metadata = { + + metadata: dict[str, Any] = { "stactask_version": __version__, "tasks": {}, } @@ -230,18 +233,29 @@ def _build_tasks(self, args: dict[str, Any]) -> None: metadata["tasks"][name] = task_metadata # build docker image - subprocess.check_call([ - 'docker', 'buildx', 'build', - '-t', args['image_tag'], - '--label', f"stactask_metadata={json.dumps(metadata)}", - '-f', 'Dockerfile', '.', - ]) + subprocess.check_call( # noqa: S603 + [ # noqa: S607 + "docker", + "buildx", + "build", + "-t", + args["image_tag"], + "--label", + f"stactask_metadata={json.dumps(metadata)}", + "-f", + "Dockerfile", + ".", + ], + ) # push docker image - def _push_image(): - return args['image_tag'] + def _push_image(image_tag: str) -> None: + # noop, replace later + return None - _push_image() + image_tag = args["image_tag"] + cast(str, image_tag) + _push_image(args["image_tag"]) def _run_task(self, args: dict[str, Any]) -> None: href = args.pop("input", None) diff --git a/stactask/task.py b/stactask/task.py index 39b3db2..c908ea9 100644 --- a/stactask/task.py +++ b/stactask/task.py @@ -1,6 +1,5 @@ import argparse import asyncio -import inspect import json import logging import sys @@ -11,7 +10,6 @@ from datetime import datetime, timezone from pathlib import Path from shutil import rmtree -import subprocess from tempfile import mkdtemp from typing import Any @@ -30,7 +28,7 @@ upload_item_json_to_s3, ) from .config import DownloadConfig -from .exceptions import FailedValidation, InvalidOutputSchema +from .exceptions import FailedValidation from .logging import TaskLoggerAdapter from .payload import Payload from .utils import find_collection as utils_find_collection @@ -96,6 +94,9 @@ class Task(ABC): description = "A task for doing things" version = "0.1.0" + input_model: BaseModel | None = None + output_model: BaseModel | None = None + def __init__( self: "Task", payload: dict[str, Any], diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..aeca9a4 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,69 @@ +import pytest + +from stactask import CLI + +from .tasks import NothingTask + + +@pytest.fixture +def cli() -> CLI: + cli = CLI() + cli.register_task(NothingTask) + return cli + + +def test_parse_no_args(cli) -> None: + with pytest.raises(SystemExit): + cli._parse_args([]) + + +def test_parse_args(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --save-workdir".split() + ) + assert args["command"] == "run" + assert args["logging"] == "INFO" + assert args["input"] == "input" + assert args["save_workdir"] is True + assert args["upload"] is True + assert args["validate"] is True + + +def test_parse_args_deprecated_skip(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --skip-upload --skip-validation".split() + ) + assert args["upload"] is False + assert args["validate"] is False + + +def test_parse_args_no_upload_and_no_validation(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --no-upload --no-validate".split() + ) + assert args["upload"] is False + assert args["validate"] is False + + +def test_parse_args_no_upload_and_validation(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --no-upload --validate".split() + ) + assert args["upload"] is False + assert args["validate"] is True + + +def test_parse_args_upload_and_no_validation(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --upload --no-validate".split() + ) + assert args["upload"] is True + assert args["validate"] is False + + +def test_parse_args_upload_and_validation(cli) -> None: + args = cli._parse_args( + "run input --task nothing-task --upload --validate".split() + ) + assert args["upload"] is True + assert args["validate"] is True diff --git a/tests/test_task.py b/tests/test_task.py index c6d7b48..7302fcf 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -197,51 +197,6 @@ def test_task_handler(payload: dict[str, Any]) -> None: assert derived_link["href"] == self_link["href"] -def test_parse_no_args() -> None: - with pytest.raises(SystemExit): - NothingTask.parse_args([]) - - -def test_parse_args() -> None: - args = NothingTask.parse_args("run input --save-workdir".split()) - assert args["command"] == "run" - assert args["logging"] == "INFO" - assert args["input"] == "input" - assert args["save_workdir"] is True - assert args["upload"] is True - assert args["validate"] is True - - -def test_parse_args_deprecated_skip() -> None: - args = NothingTask.parse_args("run input --skip-upload --skip-validation".split()) - assert args["upload"] is False - assert args["validate"] is False - - -def test_parse_args_no_upload_and_no_validation() -> None: - args = NothingTask.parse_args("run input --no-upload --no-validate".split()) - assert args["upload"] is False - assert args["validate"] is False - - -def test_parse_args_no_upload_and_validation() -> None: - args = NothingTask.parse_args("run input --no-upload --validate".split()) - assert args["upload"] is False - assert args["validate"] is True - - -def test_parse_args_upload_and_no_validation() -> None: - args = NothingTask.parse_args("run input --upload --no-validate".split()) - assert args["upload"] is True - assert args["validate"] is False - - -def test_parse_args_upload_and_validation() -> None: - args = NothingTask.parse_args("run input --upload --validate".split()) - assert args["upload"] is True - assert args["validate"] is True - - def test_collection_mapping(nothing_task: Task) -> None: assert nothing_task.collection_mapping == { "sentinel-2-l2a": "$[?(@.id =~ 'S2[AB].*')]", @@ -273,7 +228,3 @@ def test_s3_upload(nothing_task: Task, mock_s3_client: Callable[[], s3]) -> None item_after_upload.assets["key1"].href == "https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-2-l2a/52/H/GH/2022/10/S2A_52HGH_20221007_0_L2A/foo.txt" ) - - -if __name__ == "__main__": - output = NothingTask.cli() From aa3449463d6a0543644422284f577d3199f97422 Mon Sep 17 00:00:00 2001 From: Nate Rubin Date: Thu, 18 Dec 2025 15:37:06 -0500 Subject: [PATCH 5/5] add more CLI and task tests --- stactask/cli.py | 89 ++++++++++++++-------------------------------- stactask/task.py | 16 +++++++++ tests/tasks.py | 23 ++++++++++++ tests/test_cli.py | 35 +++++++++++++++++- tests/test_task.py | 41 ++++++++++++++++++++- 5 files changed, 139 insertions(+), 65 deletions(-) diff --git a/stactask/cli.py b/stactask/cli.py index 8c43d6e..17e0998 100644 --- a/stactask/cli.py +++ b/stactask/cli.py @@ -27,20 +27,25 @@ def __init__(self) -> None: self._build_argparser() def _build_argparser(self) -> None: - self._parser = argparse.ArgumentParser(description="?!?!?!?!") + self._parser = argparse.ArgumentParser(description="STAC Task management tools") self._parser.add_argument( "--version", help="Print version and exit", action="version", version="???", ) - pparser = argparse.ArgumentParser(add_help=False) self._parser.add_argument( "--logging", default="INFO", help="DEBUG, INFO, WARN, ERROR, CRITICAL", ) + self._parser.add_argument( + "--output", + default=None, + help="Write output payload to this URL", + ) + pparser = argparse.ArgumentParser(add_help=False) subparsers = self._parser.add_subparsers(dest="command") # run @@ -62,11 +67,6 @@ def _build_argparser(self) -> None: help="Name of the task you wish to run", ) - run_parser.add_argument( - "--output", - default=None, - help="Write output payload to this URL", - ) # additional options run_parser.add_argument( @@ -136,18 +136,12 @@ def _build_argparser(self) -> None: workdir = 'local-output', output = 'local-output/output-payload.json') """, ) - # build + # metadata build_parser = subparsers.add_parser( - "build", + "metadata", parents=[pparser], formatter_class=argparse.ArgumentDefaultsHelpFormatter, - help="Build all STAC Tasks", - ) - - build_parser.add_argument( - "--image-tag", - required=True, - help="URI to push docker image to", + help="Output metadata document for all registered STAC Tasks", ) build_parser.add_argument( @@ -166,6 +160,8 @@ def execute(self) -> None: loglevel = args.pop("logging") logging.basicConfig(level=loglevel) + href_out = args.pop("output", None) + # quiet these loud loggers for ql in [ "botocore", @@ -179,9 +175,11 @@ def execute(self) -> None: match cmd: case "run": - self._run_task(args) - case "build": - self._build_tasks(args) + output = self._run_task(args) + case "metadata": + output = self._task_metadata() + + self._write_output(output, href_out) def _parse_args(self, args: list[str]) -> dict[str, Any]: # turn Namespace into dictionary @@ -208,7 +206,7 @@ def _parse_args(self, args: list[str]) -> dict[str, Any]: return pargs - def _build_tasks(self, args: dict[str, Any]) -> None: + def _task_metadata(self) -> dict[str, Any]: # create task metadata document from stactask import __version__ @@ -217,49 +215,12 @@ def _build_tasks(self, args: dict[str, Any]) -> None: "tasks": {}, } for name, task in self.tasks.items(): - input_schema = output_schema = None - if task.input_model: - input_schema = task.input_model.model_json_schema() - if task.output_model: - output_schema = task.output_model.model_json_schema() - - task_metadata = { - "name": name, - "version": task.version, - "description": task.description, - "input_schema": input_schema, - "output_schema": output_schema, - } - metadata["tasks"][name] = task_metadata - - # build docker image - subprocess.check_call( # noqa: S603 - [ # noqa: S607 - "docker", - "buildx", - "build", - "-t", - args["image_tag"], - "--label", - f"stactask_metadata={json.dumps(metadata)}", - "-f", - "Dockerfile", - ".", - ], - ) - - # push docker image - def _push_image(image_tag: str) -> None: - # noop, replace later - return None + metadata["tasks"][name] = task.metadata() - image_tag = args["image_tag"] - cast(str, image_tag) - _push_image(args["image_tag"]) + return metadata - def _run_task(self, args: dict[str, Any]) -> None: + def _run_task(self, args: dict[str, Any]) -> dict[str, Any]: href = args.pop("input", None) - href_out = args.pop("output", None) # read input if href is None: @@ -272,9 +233,11 @@ def _run_task(self, args: dict[str, Any]) -> None: task_name = args.pop("task") payload_out = self.tasks[task_name].handler(payload, **args) - # write output + return payload_out + + def _write_output(self, output: dict[str, Any], href_out: str | None = None) -> None: if href_out is None: - json.dump(payload_out, sys.stdout) + json.dump(output, sys.stdout) else: with fsspec.open(href_out, "w") as f: - f.write(json.dumps(payload_out)) + f.write(json.dumps(output)) diff --git a/stactask/task.py b/stactask/task.py index c908ea9..1b0343d 100644 --- a/stactask/task.py +++ b/stactask/task.py @@ -155,6 +155,22 @@ def _payload(self, value: dict[str, Any]) -> None: ) self.payload = Payload(value) + @classmethod + def metadata(cls) -> dict[str, str]: + input_schema = output_schema = None + if cls.input_model: + input_schema = cls.input_model.model_json_schema() + if cls.output_model: + output_schema = cls.output_model.model_json_schema() + + return { + "name": cls.name, + "version": cls.version, + "description": cls.description, + "input_schema": input_schema, + "output_schema": output_schema, + } + @property def process_definition(self) -> dict[str, Any]: warnings.warn( diff --git a/tests/tasks.py b/tests/tasks.py index fb094e3..2b42da2 100644 --- a/tests/tasks.py +++ b/tests/tasks.py @@ -1,5 +1,7 @@ from typing import Any +from pydantic import BaseModel + from stactask import Task from stactask.exceptions import FailedValidation @@ -32,3 +34,24 @@ class DerivedItemTask(Task): def process(self, **kwargs: Any) -> list[dict[str, Any]]: assert kwargs["parameter"] == "value" return [self.create_item_from_item(self.items_as_dicts[0])] + + +class InputModel(BaseModel): + a: int + b: str + + +class OutputModel(BaseModel): + c: float + + +class SchemaTask(Task): + name = "schema-task" + description = "this task defines input and output models" + version = "0.2.0" + + input_model = InputModel + output_model = OutputModel + + def process(self, **kwargs: Any) -> list[dict[str, Any]]: + return [OutputModel(c=2.7)] diff --git a/tests/test_cli.py b/tests/test_cli.py index aeca9a4..d619f21 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,6 +1,8 @@ +from tempfile import TemporaryDirectory + import pytest -from stactask import CLI +from stactask import __version__, CLI from .tasks import NothingTask @@ -67,3 +69,34 @@ def test_parse_args_upload_and_validation(cli) -> None: ) assert args["upload"] is True assert args["validate"] is True + + +def test_run_task_command(cli) -> None: + with TemporaryDirectory() as tmpdir: + tmpfile_name = f"{tmpdir}/input" + with open(tmpfile_name, 'w') as fp: + fp.write('{}') + fp.close() + + args = cli._parse_args( + f"run {tmpfile_name} --task nothing-task".split() + ) + args.pop("command") + args.pop("logging") + output = cli._run_task(args) + assert output == {"features": []} + + +def test_metadata_command(cli) -> None: + assert cli._task_metadata() == { + "stactask_version": __version__, + "tasks": { + "nothing-task": { + "name": "nothing-task", + "description": "this task does nothing", + "version": "0.1.0", + "input_schema": None, + "output_schema": None, + } + } + } diff --git a/tests/test_task.py b/tests/test_task.py index 7302fcf..5af4f5b 100644 --- a/tests/test_task.py +++ b/tests/test_task.py @@ -13,7 +13,7 @@ from stactask.exceptions import FailedValidation from stactask.task import Task -from .tasks import DerivedItemTask, FailValidateTask, NothingTask +from .tasks import DerivedItemTask, FailValidateTask, NothingTask, SchemaTask testpath = Path(__file__).parent cassettepath = testpath / "fixtures" / "cassettes" @@ -38,6 +38,11 @@ def derived_item_task(payload: dict[str, Any]) -> Task: return DerivedItemTask(payload) +@pytest.fixture +def schema_task(payload: dict[str, Any]) -> Task: + return SchemaTask(payload) + + @pytest.fixture def mock_s3_client() -> Callable[[], s3]: """Recreate the global S3 client within mock context to avoid state pollution. @@ -228,3 +233,37 @@ def test_s3_upload(nothing_task: Task, mock_s3_client: Callable[[], s3]) -> None item_after_upload.assets["key1"].href == "https://sentinel-cogs.s3.us-west-2.amazonaws.com/sentinel-2-l2a/52/H/GH/2022/10/S2A_52HGH_20221007_0_L2A/foo.txt" ) + + +def test_task_metadata(schema_task: Task) -> None: + assert schema_task.metadata() == { + "name": "schema-task", + "version": "0.2.0", + "description": "this task defines input and output models", + "input_schema": { + "properties": { + "a": { + "title": "A", + "type": "integer", + }, + "b": { + "title": "B", + "type": "string", + }, + }, + "required": ["a", "b"], + "title": "InputModel", + "type": "object", + }, + "output_schema": { + "properties": { + "c": { + "title": "C", + "type": "number", + }, + }, + "required": ["c"], + "title": "OutputModel", + "type": "object", + }, + }