From 2ff95f8e679b03a66d027fd68b72fcdc97caf6cc Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Tue, 27 Jan 2026 11:45:59 -0800 Subject: [PATCH 01/14] start the script --- .../project_attribution/apply_attribution.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 scripts/utility/project_attribution/apply_attribution.py diff --git a/scripts/utility/project_attribution/apply_attribution.py b/scripts/utility/project_attribution/apply_attribution.py new file mode 100644 index 0000000..4152f51 --- /dev/null +++ b/scripts/utility/project_attribution/apply_attribution.py @@ -0,0 +1,42 @@ +"""Bulk apply project attribution to projects in Data Exchange +issue https://github.com/Riverscapes/rs-web-monorepo/issues/861 + +Lorin Gaertner +January 2026 +""" +import argparse + +from rsxml import ProgressBar, dotenv, Logger +from pydex import RiverscapesAPI + + +def build_attribution_params(): + """Assemble: + * list of projects IDs to apply attribution to + * ProjectAttribution Object (Organization ID, list of AttributionRoleEnum) + """ + return (['73cc1ada-c82b-499e-b3b2-5dc70393e340'], 'cc4fff44-f470-4f4f-ada2-99f741d56b28', ['CO_FUNDER', 'DESIGNER']) + + +def apply_attribution(api: RiverscapesAPI, stage: str, attribution_params: tuple[list[str], str, list[str]]): + # ProjectAttribution is organization: Organization! , role [AttributionRoleEnum!] + # Project.attribution is an array of [ProjectATtribution!]! 
+ print("hello") + + +if __name__ == '__main__': + """Main entry point - process arguments""" + parser = argparse.ArgumentParser() + parser.add_argument('stage', + help='Production or staging Data Exchange', + type=str, + choices=['production', 'staging'], + default='staging') + args = dotenv.parse_args_env(parser) + log = Logger('Setup') + # log_path = output_path / 'report.log' + # log.setup(log_path=log_path, log_level=logging.DEBUG) + log.info(f'Connecting to {args.stage} environment') + with RiverscapesAPI(stage=args.stage) as api: + attribution_params = build_attribution_params() + apply_attribution(api, args.stage, attribution_params) From bd1a6367d6c075e61bf40c4b6e3043c80bd15bf2 Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Tue, 27 Jan 2026 11:53:00 -0800 Subject: [PATCH 02/14] explain that code includes the API --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 779685f..54be7c0 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,10 @@ This project is designed to simplify interaction with the Riverscapes GraphQL API. It uses modern Python packaging standards, including a `pyproject.toml` file for configuration and dependency management. +### Data Exchange API (GraphQL definitions) + +This project includes a static local copy of the riverscapes data exchange API. We do this as we've had trouble getting linter/VSCode introspection to work with the online version. But it means if the API changes, this code needs to be updated. + ## Using UV for Environment Management This project uses [uv](https://github.com/astral-sh/uv) to manage Python virtual environments and dependencies. `uv` is an alternative to tools like `pipenv` and `poetry`. 
From b6fb1e07c49488348fd920bb410f6e917023fe32 Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Tue, 27 Jan 2026 12:19:02 -0800 Subject: [PATCH 03/14] progress --- .../project_attribution/apply_attribution.py | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/scripts/utility/project_attribution/apply_attribution.py b/scripts/utility/project_attribution/apply_attribution.py index 4152f51..3c89bb1 100644 --- a/scripts/utility/project_attribution/apply_attribution.py +++ b/scripts/utility/project_attribution/apply_attribution.py @@ -1,6 +1,14 @@ """Bulk apply project attribution to projects in Data Exchange issue https://github.com/Riverscapes/rs-web-monorepo/issues/861 +A project's attribution consists of a list of attribution objects, +each of which is an Organization and a list of Roles from the AttributionRoleEnum + +There are three ways someone might want to change attribution: +1. add (do not change existing, apply new on top of it) +2. replace (remove any existing attribution and apply new) +3. remove (remove specific attribution but leave all others in place) + Lorin Gaertner January 2026 """ @@ -10,18 +18,33 @@ from pydex import RiverscapesAPI -def build_attribution_params(): +def build_attribution_params() -> tuple[list[str], str, list[str]]: """Assemble: * list of projects IDs to apply attribution to - * ProjectAttribution Object (Organization ID, list of AttributionRoleEnum) + * ProjectAttribution Object Organization ID + * ProjectAttribution Object list of AttributionRoleEnum """ return (['73cc1ada-c82b-499e-b3b2-5dc70393e340'], 'cc4fff44-f470-4f4f-ada2-99f741d56b28', ['CO_FUNDER', 'DESIGNER']) def apply_attribution(api: RiverscapesAPI, stage: str, attribution_params: tuple[list[str], str, list[str]]): + # Project.attribution is an array of [ProjectAttribution!]! # ProjectAttribution is organization: Organization! , role [AttributionRoleEnum!] - # Project.attribution is an array of [ProjectATtribution!]! 
- print("hello") + log = Logger('Apply attribution') + log.title('Apply attribution') + mutation = api.load_mutation('updateProject') + project_ids, org_id, roles = attribution_params + + attribution_item = { + "organizationId": org_id, + "roles": roles + } + + prg = ProgressBar(total=len(project_ids), text='Attributing projects') + for i, project_id in enumerate(project_ids): + variables = { + "projectId": project_id, + } if __name__ == '__main__': From a0a0f2a0ca1012dfccdc46332b697cc1917b52f2 Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Tue, 27 Jan 2026 12:38:22 -0800 Subject: [PATCH 04/14] new script to make python TypedDict classes from the API --- ...enerate_python_classes_from_graphql_api.py | 114 ++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 pydex/generate_python_classes_from_graphql_api.py diff --git a/pydex/generate_python_classes_from_graphql_api.py b/pydex/generate_python_classes_from_graphql_api.py new file mode 100644 index 0000000..069ab75 --- /dev/null +++ b/pydex/generate_python_classes_from_graphql_api.py @@ -0,0 +1,114 @@ +""" +Generate Python TypedDict definitions from a GraphQL schema. + +This script reads the project's 'graphql.config.json' to locate the schema file, +parses it, and generates Python `TypedDict` classes for all InputObjects. +This allows for type-safe construction of GraphQL mutation payloads. +""" + +import argparse +from pathlib import Path + +from graphql import ( + InputObjectTypeDefinitionNode, + ListTypeNode, + NamedTypeNode, + NonNullTypeNode, + TypeNode, + parse, +) + + +def get_python_type(type_node: TypeNode) -> str: + """ + Recursively resolve GraphQL types to modern Python type strings. + + Args: + type_node: The GraphQL AST node representing the type. + + Returns: + A string representing the Python type (e.g., 'list[str]', 'int'). 
+ """ + if isinstance(type_node, NonNullTypeNode): + return get_python_type(type_node.type) + + if isinstance(type_node, ListTypeNode): + inner_type = get_python_type(type_node.type) + return f"list[{inner_type}]" + + if isinstance(type_node, NamedTypeNode): + name = type_node.name.value + mapping = { + 'String': 'str', + 'ID': 'str', + 'Boolean': 'bool', + 'Int': 'int', + 'Float': 'float' + } + # Use quotes for forward references to other classes + return mapping.get(name, f"'{name}'") + + return "Any" + + +def generate_types(schema_path: Path, output_path: Path) -> None: + """ + Parse the schema and write Python TypedDict definitions to a file. + + Args: + schema_path: Path to the .graphql schema file. + output_path: Path to the output .py file. + """ + if not schema_path.exists(): + print(f"Error: Schema file not found at {schema_path}") + return + + print(f"Reading schema from: {schema_path}") + print(f"Writing types to: {output_path}") + + with open(schema_path, 'r', encoding='utf-8') as f: + schema_content = f.read() + + doc = parse(schema_content) + + # Ensure output directory exists + output_path.parent.mkdir(parents=True, exist_ok=True) + + with open(output_path, 'w', encoding='utf-8') as f: + f.write(f'"""\nGenerated from {schema_path.name} using {Path(__file__).name}\n"""\n') + f.write("from typing import TypedDict, Any\n\n") + + count = 0 + for definition in doc.definitions: + # We focus on Input types as they are critical for constructing mutation payloads + if isinstance(definition, InputObjectTypeDefinitionNode): + count += 1 + name = definition.name.value + f.write(f"class {name}(TypedDict, total=False):\n") + + if not definition.fields: + f.write(" pass\n\n") + continue + + for field in definition.fields: + field_name = field.name.value + python_type = get_python_type(field.type) + f.write(f" {field_name}: {python_type}\n") + f.write("\n") + + print(f"Successfully generated {count} types.") + + +if __name__ == "__main__": + parser = 
argparse.ArgumentParser(description="Generate Python TypedDicts from GraphQL Schema") + + default_schema = Path("pydex/graphql/riverscapes.schema.graphql") + default_output = Path("pydex/graphql/generated_types.py") + + parser.add_argument('--schema', type=Path, default=default_schema, + help='Path to riverscapes.schema.graphql') + parser.add_argument('--output', type=Path, default=default_output, + help='Path to output .py file') + + args = parser.parse_args() + generate_types(args.schema, args.output) From 069050844c666961890b08d64252d5c1f3234f15 Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Tue, 27 Jan 2026 14:23:11 -0800 Subject: [PATCH 05/14] first working version --- ...enerate_python_classes_from_graphql_api.py | 40 +- pydex/generated_types.py | 542 ++++++++++++++++++ .../project_attribution/apply_attribution.py | 64 ++- 3 files changed, 632 insertions(+), 14 deletions(-) create mode 100644 pydex/generated_types.py diff --git a/pydex/generate_python_classes_from_graphql_api.py b/pydex/generate_python_classes_from_graphql_api.py index 069ab75..e1cbe32 100644 --- a/pydex/generate_python_classes_from_graphql_api.py +++ b/pydex/generate_python_classes_from_graphql_api.py @@ -4,12 +4,18 @@ This script reads the project's 'graphql.config.json' to locate the schema file, parses it, and generates Python `TypedDict` classes for all InputObjects. This allows for type-safe construction of GraphQL mutation payloads. 
+ +Quickly built with copilot/gemini 3 pro (preview) 2026-01-27 by Lorin +NOTE: If we want to go deeper, there are established libraries for this: +* ariadne https://github.com/mirumee/ariadne-codegen/ +* https://github.com/sauldom102/gql_schema_codegen """ import argparse from pathlib import Path from graphql import ( + EnumTypeDefinitionNode, InputObjectTypeDefinitionNode, ListTypeNode, NamedTypeNode, @@ -76,13 +82,34 @@ def generate_types(schema_path: Path, output_path: Path) -> None: with open(output_path, 'w', encoding='utf-8') as f: f.write(f'"""\nGenerated from {schema_path.name} using {Path(__file__).name}\n"""\n') - f.write("from typing import TypedDict, Any\n\n") + f.write("from typing import TypedDict\n") + f.write("from enum import Enum\n\n\n") + + enum_count = 0 + input_count = 0 + + # Pass 1: generate Enums + for definition in doc.definitions: + if isinstance(definition, EnumTypeDefinitionNode): + enum_count += 1 + name = definition.name.value + f.write(f"class {name}(str, Enum):\n") + if not definition.values: + f.write(" pass\n\n") + continue - count = 0 + for value_def in definition.values: + val = value_def.name.value + # Handle Python reserved keywords or invalid identifiers if necessary + # For now assume schema values are safe or valid python identifiers + f.write(f" {val} = '{val}'\n") + f.write("\n") + + # Pass 2: generate Input Objects for definition in doc.definitions: # We focus on Input types as they are critical for constructing mutation payloads if isinstance(definition, InputObjectTypeDefinitionNode): - count += 1 + input_count += 1 name = definition.name.value f.write(f"class {name}(TypedDict, total=False):\n") @@ -95,15 +122,15 @@ def generate_types(schema_path: Path, output_path: Path) -> None: python_type = get_python_type(field.type) f.write(f" {field_name}: {python_type}\n") f.write("\n") - - print(f"Successfully generated {count} types.") + + print(f"Successfully generated {enum_count} Enums and {input_count} Input Types.") if 
__name__ == "__main__": parser = argparse.ArgumentParser(description="Generate Python TypedDicts from GraphQL Schema") default_schema = Path("pydex/graphql/riverscapes.schema.graphql") - default_output = Path("pydex/graphql/generated_types.py") + default_output = Path("pydex/generated_types.py") parser.add_argument('--schema', type=Path, default=default_schema, help='Path to riverscapes.schema.graphql') @@ -112,3 +139,4 @@ def generate_types(schema_path: Path, output_path: Path) -> None: args = parser.parse_args() generate_types(args.schema, args.output) + print('DONE.') diff --git a/pydex/generated_types.py b/pydex/generated_types.py new file mode 100644 index 0000000..76277d0 --- /dev/null +++ b/pydex/generated_types.py @@ -0,0 +1,542 @@ +""" +Generated from riverscapes.schema.graphql using generate_python_classes_from_graphql_api.py +""" +from typing import TypedDict +from enum import Enum + + +class AttributionRoleEnum(str, Enum): + ANALYST = 'ANALYST' + CONTRIBUTOR = 'CONTRIBUTOR' + CO_FUNDER = 'CO_FUNDER' + DESIGNER = 'DESIGNER' + FUNDER = 'FUNDER' + OWNER = 'OWNER' + QA_QC = 'QA_QC' + SUPPORTER = 'SUPPORTER' + + +class DatasetContainerTypesEnum(str, Enum): + CommonDatasets = 'CommonDatasets' + Configuration = 'Configuration' + Datasets = 'Datasets' + Inputs = 'Inputs' + Intermediates = 'Intermediates' + Logs = 'Logs' + Outputs = 'Outputs' + Products = 'Products' + + +class DatasetTypeEnum(str, Enum): + AuxInstrumentFile = 'AuxInstrumentFile' + CSV = 'CSV' + ConfigFile = 'ConfigFile' + DEM = 'DEM' + DataTable = 'DataTable' + Database = 'Database' + File = 'File' + Geopackage = 'Geopackage' + HTMLFile = 'HTMLFile' + HillShade = 'HillShade' + Image = 'Image' + InstrumentFile = 'InstrumentFile' + LogFile = 'LogFile' + MSAccessDB = 'MSAccessDB' + PDF = 'PDF' + Raster = 'Raster' + SQLiteDB = 'SQLiteDB' + SurveyQualityDB = 'SurveyQualityDB' + TIN = 'TIN' + Vector = 'Vector' + Video = 'Video' + ZipFile = 'ZipFile' + + +class DateWithinEnum(str, Enum): + ONE_DAY = 
'ONE_DAY' + ONE_MONTH = 'ONE_MONTH' + ONE_WEEK = 'ONE_WEEK' + SIX_MONTHS = 'SIX_MONTHS' + + +class EntitiesWithImagesEnum(str, Enum): + COLLECTION = 'COLLECTION' + ORGANIZATION = 'ORGANIZATION' + PROJECT = 'PROJECT' + PROJECT_TYPE = 'PROJECT_TYPE' + SAVED_SEARCH = 'SAVED_SEARCH' + USER = 'USER' + + +class EntityDeleteActionsEnum(str, Enum): + DELETE = 'DELETE' + DELETE_COMPLETE = 'DELETE_COMPLETE' + MAKE_PUBLIC = 'MAKE_PUBLIC' + REQUEST_TRANSFER = 'REQUEST_TRANSFER' + + +class ImageTypeEnum(str, Enum): + AVATAR = 'AVATAR' + HERO = 'HERO' + LOGO = 'LOGO' + + +class JobStatusEnum(str, Enum): + FAILED = 'FAILED' + PROCESSING = 'PROCESSING' + READY = 'READY' + SUCCESS = 'SUCCESS' + UNKNOWN = 'UNKNOWN' + + +class MetaDataExtEnum(str, Enum): + DATASET = 'DATASET' + PROJECT = 'PROJECT' + WAREHOUSE = 'WAREHOUSE' + + +class MetaDataTypeEnum(str, Enum): + BOOLEAN = 'BOOLEAN' + FILEPATH = 'FILEPATH' + FLOAT = 'FLOAT' + GUID = 'GUID' + HIDDEN = 'HIDDEN' + IMAGE = 'IMAGE' + INT = 'INT' + ISODATE = 'ISODATE' + JSON = 'JSON' + MARKDOWN = 'MARKDOWN' + RICHTEXT = 'RICHTEXT' + STRING = 'STRING' + TIMESTAMP = 'TIMESTAMP' + URL = 'URL' + VIDEO = 'VIDEO' + + +class NotificationActionsEnum(str, Enum): + CREATED = 'CREATED' + DELETED = 'DELETED' + RENAMED = 'RENAMED' + TRANSFERRED = 'TRANSFERRED' + UPDATED = 'UPDATED' + + +class NotificationOperationEnum(str, Enum): + DELETE = 'DELETE' + MARK_READ = 'MARK_READ' + MARK_UNREAD = 'MARK_UNREAD' + + +class NotificationTypesEnum(str, Enum): + COLLECTION = 'COLLECTION' + ORGANIZATION = 'ORGANIZATION' + PROJECT = 'PROJECT' + SAVED_SEARCH = 'SAVED_SEARCH' + USER = 'USER' + + +class OrganizationInviteRoleEnum(str, Enum): + ADMIN = 'ADMIN' + CONTRIBUTOR = 'CONTRIBUTOR' + VIEWER = 'VIEWER' + + +class OrganizationInviteStateEnum(str, Enum): + ACCEPTED = 'ACCEPTED' + EXPIRED = 'EXPIRED' + INVITED = 'INVITED' + REJECTED = 'REJECTED' + REQUESTED = 'REQUESTED' + + +class OrganizationRoleEnum(str, Enum): + ADMIN = 'ADMIN' + CONTRIBUTOR = 'CONTRIBUTOR' + 
NONE = 'NONE' + OWNER = 'OWNER' + VIEWER = 'VIEWER' + + +class OwnerInputTypesEnum(str, Enum): + ORGANIZATION = 'ORGANIZATION' + USER = 'USER' + + +class ProjectDeleteChoicesEnum(str, Enum): + DELETE = 'DELETE' + DELETE_COMPLETE = 'DELETE_COMPLETE' + + +class ProjectGroupVisibilityEnum(str, Enum): + PUBLIC = 'PUBLIC' + SECRET = 'SECRET' + + +class ProjectTreeLayerTypeEnum(str, Enum): + FILE = 'FILE' + LINE = 'LINE' + POINT = 'POINT' + POLYGON = 'POLYGON' + RASTER = 'RASTER' + REPORT = 'REPORT' + TIN = 'TIN' + + +class ProjectTypeStateEnum(str, Enum): + ACTIVE = 'ACTIVE' + DELETED = 'DELETED' + SUGGESTED = 'SUGGESTED' + + +class ProjectVisibilityEnum(str, Enum): + PRIVATE = 'PRIVATE' + PUBLIC = 'PUBLIC' + SECRET = 'SECRET' + + +class QAQCStateEnum(str, Enum): + FAILED = 'FAILED' + PASSED = 'PASSED' + PROVISIONAL = 'PROVISIONAL' + + +class RampTypeEnum(str, Enum): + DISCRETE = 'DISCRETE' + EXACT = 'EXACT' + INTERPOLATED = 'INTERPOLATED' + + +class SearchSortEnum(str, Enum): + AREA_DESC = 'AREA_DESC' + DATE_CREATED_ASC = 'DATE_CREATED_ASC' + DATE_CREATED_DESC = 'DATE_CREATED_DESC' + DATE_UPDATED_ASC = 'DATE_UPDATED_ASC' + DATE_UPDATED_DESC = 'DATE_UPDATED_DESC' + MINE = 'MINE' + MODEL_VERSION_ASC = 'MODEL_VERSION_ASC' + MODEL_VERSION_DESC = 'MODEL_VERSION_DESC' + NAME_ASC = 'NAME_ASC' + NAME_DESC = 'NAME_DESC' + + +class SearchableTypesEnum(str, Enum): + COLLECTION = 'COLLECTION' + ORGANIZATION = 'ORGANIZATION' + PROJECT = 'PROJECT' + SAVED_SEARCH = 'SAVED_SEARCH' + USER = 'USER' + + +class SeverityEnum(str, Enum): + CRITICAL = 'CRITICAL' + DEBUG = 'DEBUG' + ERROR = 'ERROR' + INFO = 'INFO' + WARNING = 'WARNING' + + +class StarrableTypesEnum(str, Enum): + COLLECTION = 'COLLECTION' + ORGANIZATION = 'ORGANIZATION' + PROJECT = 'PROJECT' + SAVED_SEARCH = 'SAVED_SEARCH' + USER = 'USER' + + +class SymbologyStateEnum(str, Enum): + ERROR = 'ERROR' + FETCHING = 'FETCHING' + FOUND = 'FOUND' + MISSING = 'MISSING' + NOT_APPLICABLE = 'NOT_APPLICABLE' + UNKNOWN = 'UNKNOWN' + + 
+class TileTypesEnum(str, Enum): + HTML = 'HTML' + RASTER = 'RASTER' + VECTOR_GPKG = 'VECTOR_GPKG' + VECTOR_SHP = 'VECTOR_SHP' + + +class TilingStateEnum(str, Enum): + CREATING = 'CREATING' + FETCHING = 'FETCHING' + FETCH_ERROR = 'FETCH_ERROR' + INDEX_NOT_FOUND = 'INDEX_NOT_FOUND' + LAYER_NOT_FOUND = 'LAYER_NOT_FOUND' + NOT_APPLICABLE = 'NOT_APPLICABLE' + NO_GEOMETRIES = 'NO_GEOMETRIES' + QUEUED = 'QUEUED' + SUCCESS = 'SUCCESS' + TILING_ERROR = 'TILING_ERROR' + TIMEOUT = 'TIMEOUT' + UNKNOWN = 'UNKNOWN' + + +class TransferStateEnum(str, Enum): + ACCEPTED = 'ACCEPTED' + EXPIRED = 'EXPIRED' + IN_PROGRESS = 'IN_PROGRESS' + PROPOSED = 'PROPOSED' + REJECTED = 'REJECTED' + + +class TransferrableTypesEnum(str, Enum): + COLLECTION = 'COLLECTION' + ORGANIZATION = 'ORGANIZATION' + PROJECT = 'PROJECT' + USER = 'USER' + + +class CollectionInput(TypedDict, total=False): + citation: str + clearContact: bool + clearHeroImage: bool + contact: 'OwnerInput' + description: str + heroImageToken: str + meta: list['MetaDataInput'] + name: str + summary: str + tags: list[str] + visibility: 'ProjectGroupVisibilityEnum' + + +class DBObjNotificationsInput(TypedDict, total=False): + createdById: str + createdByName: str + createdOn: 'DateTime' + id: str + name: str + summary: str + updatedById: str + updatedByName: str + updatedOn: 'DateTime' + + +class DatasetInput(TypedDict, total=False): + citation: str + description: str + extRef: str + layers: list['DatasetLayerInput'] + localPath: str + meta: list['MetaDataInput'] + name: str + rsXPath: str + summary: str + + +class DatasetLayerInput(TypedDict, total=False): + citation: str + description: str + extRef: str + lyrName: str + meta: list['MetaDataInput'] + name: str + summary: str + + +class DatasetLayerUpdate(TypedDict, total=False): + citation: str + description: str + meta: list['MetaDataInput'] + name: str + summary: str + + +class DatasetUpdate(TypedDict, total=False): + citation: str + description: str + dsId: str + meta: 
list['MetaDataInput'] + name: str + summary: str + + +class EntityDeletionOptions(TypedDict, total=False): + totalDelete: bool + transfer: 'TransferEntityItemsInput' + + +class FileDownloadMetaInput(TypedDict, total=False): + contentType: str + localPath: str + md5: str + size: 'BigInt' + + +class LinkInput(TypedDict, total=False): + alt: str + href: 'URL' + text: str + + +class MetaDataInput(TypedDict, total=False): + ext: 'MetaDataExtEnum' + key: str + locked: bool + type: 'MetaDataTypeEnum' + value: str + + +class NotificationInput(TypedDict, total=False): + object: 'DBObjNotificationsInput' + subject: 'DBObjNotificationsInput' + type: 'NotificationTypesEnum' + verb: 'NotificationActionsEnum' + + +class OrganizationInput(TypedDict, total=False): + clearLogo: bool + description: str + logoToken: str + meta: list['MetaDataInput'] + name: str + preferences: 'JSONObject' + social: 'SocialLinksInput' + summary: str + + +class OwnerInput(TypedDict, total=False): + id: str + type: 'OwnerInputTypesEnum' + + +class ProfileInput(TypedDict, total=False): + affiliations: list['UserAffiliationInput'] + avatarToken: str + clearAvatar: bool + description: str + jobTitle: str + location: str + meta: list['MetaDataInput'] + name: str + preferences: 'JSONObject' + socialLinks: 'SocialLinksInput' + summary: str + + +class ProjectAttributionInput(TypedDict, total=False): + organizationId: str + roles: list['AttributionRoleEnum'] + + +class ProjectInput(TypedDict, total=False): + archived: bool + attribution: list['ProjectAttributionInput'] + boundsToken: str + citation: str + clearBounds: bool + clearHeroImage: bool + datasets: list['DatasetInput'] + deleteDatasets: list[str] + description: str + heroImageToken: str + meta: list['MetaDataInput'] + name: str + qaqc: list['QAQCEventInput'] + summary: str + tags: list[str] + totalSize: 'BigInt' + visibility: 'ProjectVisibilityEnum' + + +class ProjectSearchParamsInput(TypedDict, total=False): + attributedOrgId: str + bbox: list[float] 
+ boundsId: str + collection: str + createdOn: 'SearchDateInput' + createdWithin: 'DateWithinEnum' + editableOnly: bool + excludeArchived: bool + keywords: str + meta: list['MetaDataInput'] + name: str + ownedBy: 'OwnerInput' + projectTypeId: str + tags: list[str] + updatedOn: 'SearchDateInput' + visibility: 'ProjectVisibilityEnum' + + +class ProjectTypeInput(TypedDict, total=False): + clearLogo: bool + description: str + logoToken: str + meta: list['MetaDataInput'] + name: str + summary: str + url: 'URL' + + +class QAQCEventInput(TypedDict, total=False): + datePerformed: 'DateTime' + description: str + meta: list['MetaDataInput'] + name: str + performedBy: str + state: 'QAQCStateEnum' + summary: str + supportingLinks: list['LinkInput'] + + +class SavedSearchInput(TypedDict, total=False): + citation: str + clearHeroImage: bool + defaultSort: list['SearchSortEnum'] + description: str + heroImageToken: str + meta: list['MetaDataInput'] + name: str + searchParams: 'ProjectSearchParamsInput' + summary: str + tags: list[str] + visibility: 'ProjectGroupVisibilityEnum' + + +class SearchDateInput(TypedDict, total=False): + from: 'DateTime' + to: 'DateTime' + + +class SearchParamsInput(TypedDict, total=False): + createdOn: 'SearchDateInput' + createdWithin: 'DateWithinEnum' + editableOnly: bool + keywords: str + meta: list['MetaDataInput'] + name: str + ownedBy: 'OwnerInput' + tags: list[str] + updatedOn: 'SearchDateInput' + visibility: 'ProjectGroupVisibilityEnum' + + +class SocialLinksInput(TypedDict, total=False): + facebook: str + instagram: str + linkedIn: str + tiktok: str + twitter: str + website: 'URL' + + +class TransferEntityItemsInput(TypedDict, total=False): + note: str + transferTo: 'OwnerInput' + + +class TransferInput(TypedDict, total=False): + includeProjects: bool + note: str + objectIds: list[str] + transferTo: 'OwnerInput' + transferType: 'TransferrableTypesEnum' + + +class UserAffiliationInput(TypedDict, total=False): + affiliationRole: str + name: str + 
url: 'URL' diff --git a/scripts/utility/project_attribution/apply_attribution.py b/scripts/utility/project_attribution/apply_attribution.py index 3c89bb1..2441b31 100644 --- a/scripts/utility/project_attribution/apply_attribution.py +++ b/scripts/utility/project_attribution/apply_attribution.py @@ -1,7 +1,7 @@ """Bulk apply project attribution to projects in Data Exchange -issue https://github.com/Riverscapes/rs-web-monorepo/issues/861 +issue https://github.com/Riverscapes/rs-web-monorepo/issues/861 -A project's attribution consists of a list of attribution objects, +A project's attribution consists of a list of attribution objects, each of which is an Organization and a list of Roles from the AttributionRoleEnum There are three ways someone might want to change attribution: @@ -16,6 +16,35 @@ from rsxml import ProgressBar, dotenv, Logger from pydex import RiverscapesAPI +# from pydex.generated_types import AttributionRoleEnum, ProjectAttributionInput, ProjectInput +from typing import TypedDict +from enum import Enum + + +class AttributionRoleEnum(str, Enum): + ANALYST = 'ANALYST' + CONTRIBUTOR = 'CONTRIBUTOR' + CO_FUNDER = 'CO_FUNDER' + DESIGNER = 'DESIGNER' + FUNDER = 'FUNDER' + OWNER = 'OWNER' + QA_QC = 'QA_QC' + SUPPORTER = 'SUPPORTER' + + +class ProjectAttributionInput(TypedDict, total=False): + organizationId: str + roles: list['AttributionRoleEnum'] + + +class ProjectInput(TypedDict, total=False): + archived: bool + attribution: list['ProjectAttributionInput'] + description: str + heroImageToken: str + name: str + summary: str + tags: list[str] def build_attribution_params() -> tuple[list[str], str, list[str]]: @@ -27,27 +56,42 @@ def build_attribution_params() -> tuple[list[str], str, list[str]]: return (['73cc1ada-c82b-499e-b3b2-5dc70393e340'], 'cc4fff44-f470-4f4f-ada2-99f741d56b28', ['CO_FUNDER', 'DESIGNER']) -def apply_attribution(api: RiverscapesAPI, stage: str, attribution_params: tuple[list[str], str, list[str]]): +def apply_attribution(rs_api: 
RiverscapesAPI, attribution_params: tuple[list[str], str, list[str]]): + """Apply attribution to a project + TODO: Add different modes + """ # Project.attribution is an array of [ProjectAttribution!]! # ProjectAttribution is organization: Organization! , role [AttributionRoleEnum!] log = Logger('Apply attribution') log.title('Apply attribution') - mutation = api.load_mutation('updateProject') + mutation = rs_api.load_mutation('updateProject') project_ids, org_id, roles = attribution_params - attribution_item = { + attribution_item: ProjectAttributionInput = { "organizationId": org_id, - "roles": roles + "roles": [AttributionRoleEnum(role) for role in roles] } + updated = 0 prg = ProgressBar(total=len(project_ids), text='Attributing projects') for i, project_id in enumerate(project_ids): + project_update: ProjectInput = { + 'attribution': [attribution_item] + } variables = { "projectId": project_id, + "project": project_update } + result = rs_api.run_query(mutation, variables) + if result is None: + raise Exception(f'Failed to update project {project_id}. Query returned: {result}') + updated += 1 + prg.update(i+1) + prg.finish() + print(f'Process complete. 
{updated} projects updated.') -if __name__ == '__main__': +def main(): """Main entry point - process arguments""" parser = argparse.ArgumentParser() parser.add_argument('stage', @@ -62,4 +106,8 @@ def apply_attribution(api: RiverscapesAPI, stage: str, attribution_params: tuple log.info(f'Connecting to {args.stage} environment') with RiverscapesAPI(stage=args.stage) as api: attribution_params = build_attribution_params() - apply_attribution(api, args.stage, attribution_params) + apply_attribution(api, attribution_params) + + +if __name__ == "__main__": + main() From 0e9b4954cda9616b98a75eb4bbb345cacddc5b3c Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Tue, 27 Jan 2026 15:32:34 -0800 Subject: [PATCH 06/14] refinement - update RiverscapesAPI code to allow script to use its own graphql rather than a shared one that has stuff we don't need and not what we do want --- pydex/classes/RiverscapesAPI.py | 17 +++++++++++------ .../project_attribution/apply_attribution.py | 11 +++++++++-- .../updateProjectAttribution.graphql | 11 +++++++++++ 3 files changed, 31 insertions(+), 8 deletions(-) create mode 100644 scripts/utility/project_attribution/updateProjectAttribution.graphql diff --git a/pydex/classes/RiverscapesAPI.py b/pydex/classes/RiverscapesAPI.py index 642a00e..c437f3b 100644 --- a/pydex/classes/RiverscapesAPI.py +++ b/pydex/classes/RiverscapesAPI.py @@ -1,4 +1,5 @@ import os +from pathlib import Path from typing import Dict, List, Generator, Tuple import webbrowser import re @@ -325,17 +326,21 @@ def load_query(self, query_name: str) -> str: with open(os.path.join(os.path.dirname(__file__), '..', 'graphql', 'queries', f'{query_name}.graphql'), 'r', encoding='utf-8') as queryFile: return queryFile.read() - def load_mutation(self, mutation_name: str) -> str: - """ Load a mutation file from the file system. + def load_mutation(self, mutation_name: str | Path) -> str: + """ Load a mutation file from the file system graphql/mutations folder or from a specific path. 
Args: - mutationName (str): _description_ + mutationName (str|Path): name of mutation in library, or Path to .graphql file Returns: - str: _description_ + str: the contents of the file """ - with open(os.path.join(os.path.dirname(__file__), '..', 'graphql', 'mutations', f'{mutation_name}.graphql'), 'r', encoding='utf-8') as queryFile: - return queryFile.read() + if Path(mutation_name).exists(): + mutation_file_path = Path(mutation_name) + else: + mutation_file_path = Path(__file__).parent.parent / 'graphql' / 'mutations' / f'{mutation_name}.graphql' + + return mutation_file_path.read_text(encoding='utf-8') def search(self, search_params: RiverscapesSearchParams, progress_bar: bool = False, page_size: int = 500, sort: List[str] = None, max_results: int = None, search_query_name: str = None) -> Generator[Tuple[RiverscapesProject, Dict, int], None, None]: """ A simple function to make a yielded search on the riverscapes API diff --git a/scripts/utility/project_attribution/apply_attribution.py b/scripts/utility/project_attribution/apply_attribution.py index 2441b31..33346ea 100644 --- a/scripts/utility/project_attribution/apply_attribution.py +++ b/scripts/utility/project_attribution/apply_attribution.py @@ -9,14 +9,19 @@ 2. replace (remove any existing attribution and apply new) 3. remove (remove specific attribution but leave all others in place) +* This currently implements MODE 2 ONLY. 
+* The project will show as having been UPDATED BY the user running the script + Lorin Gaertner January 2026 """ +from pathlib import Path import argparse from rsxml import ProgressBar, dotenv, Logger from pydex import RiverscapesAPI # from pydex.generated_types import AttributionRoleEnum, ProjectAttributionInput, ProjectInput +# ============================================================================================ from typing import TypedDict from enum import Enum @@ -45,6 +50,7 @@ class ProjectInput(TypedDict, total=False): name: str summary: str tags: list[str] +# ============================================================================================ def build_attribution_params() -> tuple[list[str], str, list[str]]: @@ -53,7 +59,7 @@ def build_attribution_params() -> tuple[list[str], str, list[str]]: * ProjectAttribution Object Organization ID * ProjectAttribution Object list of AttributionRoleEnum """ - return (['73cc1ada-c82b-499e-b3b2-5dc70393e340'], 'cc4fff44-f470-4f4f-ada2-99f741d56b28', ['CO_FUNDER', 'DESIGNER']) + return (['73cc1ada-c82b-499e-b3b2-5dc70393e340'], 'cc4fff44-f470-4f4f-ada2-99f741d56b28', ['ANALYST']) def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str], str, list[str]]): @@ -64,7 +70,8 @@ def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str # ProjectAttribution is organization: Organization! , role [AttributionRoleEnum!] 
log = Logger('Apply attribution') log.title('Apply attribution') - mutation = rs_api.load_mutation('updateProject') + mutation_file = Path(__file__).parent / 'updateProjectAttribution.graphql' + mutation = rs_api.load_mutation(mutation_file) project_ids, org_id, roles = attribution_params attribution_item: ProjectAttributionInput = { diff --git a/scripts/utility/project_attribution/updateProjectAttribution.graphql b/scripts/utility/project_attribution/updateProjectAttribution.graphql new file mode 100644 index 0000000..4cf7e21 --- /dev/null +++ b/scripts/utility/project_attribution/updateProjectAttribution.graphql @@ -0,0 +1,11 @@ +mutation updateProjectAttribution($projectId: ID!, $project: ProjectInput!) { + updateProject(project: $project, projectId: $projectId) { + id + attribution { + organization { + name + } + roles + } + } +} \ No newline at end of file From 1f242f83d8c9b5beb9bf3308fcef06a848de3dae Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Tue, 27 Jan 2026 18:15:32 -0800 Subject: [PATCH 07/14] adding 3 update modes WIP --- ...enerate_python_classes_from_graphql_api.py | 3 +- .../project_attribution/apply_attribution.py | 133 +++++++++++++++--- .../getProjectAttribution.graphql | 13 ++ 3 files changed, 130 insertions(+), 19 deletions(-) create mode 100644 scripts/utility/project_attribution/getProjectAttribution.graphql diff --git a/pydex/generate_python_classes_from_graphql_api.py b/pydex/generate_python_classes_from_graphql_api.py index e1cbe32..a0160f6 100644 --- a/pydex/generate_python_classes_from_graphql_api.py +++ b/pydex/generate_python_classes_from_graphql_api.py @@ -2,13 +2,14 @@ Generate Python TypedDict definitions from a GraphQL schema. This script reads the project's 'graphql.config.json' to locate the schema file, -parses it, and generates Python `TypedDict` classes for all InputObjects. +parses it, and generates Python `TypedDict` classes for all InputObjects. and enums. 
This allows for type-safe construction of GraphQL mutation payloads. Quickly built with copilot/gemini 3 pro (preview) 2026-01-27 by Lorin NOTE: If we want to go deeper, there are established libraries for this: * ariadne https://github.com/mirumee/ariadne-codegen/ * https://github.com/sauldom102/gql_schema_codegen +e.g. Could add types, could make Total=True if all fields are required """ import argparse diff --git a/scripts/utility/project_attribution/apply_attribution.py b/scripts/utility/project_attribution/apply_attribution.py index 33346ea..c982561 100644 --- a/scripts/utility/project_attribution/apply_attribution.py +++ b/scripts/utility/project_attribution/apply_attribution.py @@ -9,12 +9,18 @@ 2. replace (remove any existing attribution and apply new) 3. remove (remove specific attribution but leave all others in place) -* This currently implements MODE 2 ONLY. +* This currently implements all modes. * The project will show as having been UPDATED BY the user running the script Lorin Gaertner January 2026 + +examples - all tagged 2025conus - blm funder +huc metadata is in certain huc2 +pb really likes point to folder of csv files with projects +inquirer - get org GUID and multi-select roles """ +from typing import Any from pathlib import Path import argparse @@ -37,7 +43,7 @@ class AttributionRoleEnum(str, Enum): SUPPORTER = 'SUPPORTER' -class ProjectAttributionInput(TypedDict, total=False): +class ProjectAttributionInput(TypedDict, total=True): organizationId: str roles: list['AttributionRoleEnum'] @@ -46,13 +52,26 @@ class ProjectInput(TypedDict, total=False): archived: bool attribution: list['ProjectAttributionInput'] description: str - heroImageToken: str name: str summary: str tags: list[str] + # ============================================================================================ +class ProjectAttributionOutput(TypedDict): + """Model for what we get back from the API""" + organization: dict[str, Any] # e.g. 
{'id': '...', 'name': '...'} + roles: list[str] + + +class UpdateMode(str, Enum): + """Allowed options for attribution changes""" + ADD = 'ADD' + REPLACE = 'REPLACE' + REMOVE = 'REMOVE' + + def build_attribution_params() -> tuple[list[str], str, list[str]]: """Assemble: * list of projects IDs to apply attribution to @@ -62,7 +81,58 @@ def build_attribution_params() -> tuple[list[str], str, list[str]]: return (['73cc1ada-c82b-499e-b3b2-5dc70393e340'], 'cc4fff44-f470-4f4f-ada2-99f741d56b28', ['ANALYST']) -def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str], str, list[str]]): +def resolve_attribution_list(current_data: list['ProjectAttributionInput'], + target_attrib_item: ProjectAttributionInput, + mode: UpdateMode) -> list['ProjectAttributionInput']: + """given the a project attribution, the change mode and the change element, + return the new attribution + * For REMOVE - it ignores the input roles and removes all attribution for that organization + """ + + # 1. Transform the current data from the output format to the input format + normalized_list: list[ProjectAttributionInput] = [] + + for item in current_data: + # Safety check for malformed data + if not item.get('organization') or not item['organization'].get('id'): + continue + + normalized_list.append({ + "organizationId": item['organization']['id'], + # Cast strings back to Enums + "roles": [AttributionRoleEnum(r) for r in item.get('roles', [])] + }) + + target_org_id = target_attrib_item['organizationId'] + + # 2. 
Logic + if mode == UpdateMode.REPLACE: + # Override everything + return [target_attrib_item] + + elif mode == UpdateMode.REMOVE: + # Return list without this org + # TODO: Allow for more targetted removal of a specific role + return [x for x in normalized_list if x['organizationId'] != target_org_id] + + if mode == UpdateMode.ADD: + # check if org exists + existing_index = next((i for i, x in enumerate(normalized_list) if x['organizationId'] == target_org_id), -1) + if existing_index > -1: + # MERGE: Combine existing roles with new roles (using set to avoid duplicates) + existing_roles = set(normalized_list[existing_index]['roles']) + new_roles = set(target_attrib_item['roles']) + + # Update the existing entry with the Union of roles + normalized_list[existing_index]['roles'] = list(existing_roles.union(new_roles)) + else: + # APPEND: Add new entry to list + normalized_list.append(target_attrib_item) + + return normalized_list + + +def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str], str, list[str]], mode: UpdateMode): """Apply attribution to a project TODO: Add different modes """ @@ -72,9 +142,12 @@ def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str log.title('Apply attribution') mutation_file = Path(__file__).parent / 'updateProjectAttribution.graphql' mutation = rs_api.load_mutation(mutation_file) + get_current_attrib_query_file = Path(__file__).parent / 'getProjectAttribution.graphql' + get_current_attrib_query = rs_api.load_mutation(get_current_attrib_query_file) + project_ids, org_id, roles = attribution_params - attribution_item: ProjectAttributionInput = { + target_attrib_item: ProjectAttributionInput = { "organizationId": org_id, "roles": [AttributionRoleEnum(role) for role in roles] } @@ -82,17 +155,38 @@ def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str updated = 0 prg = ProgressBar(total=len(project_ids), text='Attributing projects') for i, project_id in 
enumerate(project_ids): - project_update: ProjectInput = { - 'attribution': [attribution_item] - } - variables = { - "projectId": project_id, - "project": project_update - } - result = rs_api.run_query(mutation, variables) - if result is None: - raise Exception(f'Failed to update project {project_id}. Query returned: {result}') - updated += 1 + + # Step 1 .Fetch Current attribution + current_attribution = [] + try: + resp = rs_api.run_query(get_current_attrib_query, {"id": project_id}) + if resp and 'data' in resp: + current_attribution = resp['data']['project'].get('attribution', []) + print(current_attribution) + except Exception as e: + log.error(f"Failed to fetch current attribution for {project_id}: {e}") + prg.update(i+1) + continue + + # Step 2: Calculate desired new attribution state + final_list = resolve_attribution_list(current_attribution, target_attrib_item, mode) + if current_attribution == final_list: + print("No change needed") + else: + project_update: ProjectInput = { + 'attribution': [target_attrib_item] + } + variables = { + "projectId": project_id, + "project": project_update + } + try: + result = rs_api.run_query(mutation, variables) + if result is None: + raise Exception(f'Failed to update project {project_id}. Query returned: {result}') + updated += 1 + except Exception as e: + log.error(f"Error executing mutation on {project_id}: {e}") prg.update(i+1) prg.finish() print(f'Process complete. 
{updated} projects updated.') @@ -101,19 +195,22 @@ def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str def main(): """Main entry point - process arguments""" parser = argparse.ArgumentParser() - parser.add_argument('stage', + parser.add_argument('--stage', help='Production or staging Data Exchange', type=str, choices=['production', 'staging'], default='staging') + parser.add_argument('--mode', type=str, choices=[m.value for m in UpdateMode], default='REPLACE', + help="ADD: Append/Merge, REPLACE: Overwrite, REMOVE: Delete specific org") args = dotenv.parse_args_env(parser) log = Logger('Setup') + mode_enum = UpdateMode(args.mode) # log_path = output_path / 'report.log' # log.setup(log_path=log_path, log_level=logging.DEBUG) log.info(f'Connecting to {args.stage} environment') with RiverscapesAPI(stage=args.stage) as api: attribution_params = build_attribution_params() - apply_attribution(api, attribution_params) + apply_attribution(api, attribution_params, mode=mode_enum) if __name__ == "__main__": diff --git a/scripts/utility/project_attribution/getProjectAttribution.graphql b/scripts/utility/project_attribution/getProjectAttribution.graphql new file mode 100644 index 0000000..6612526 --- /dev/null +++ b/scripts/utility/project_attribution/getProjectAttribution.graphql @@ -0,0 +1,13 @@ +query getProjectAttribution($id: ID!) { + project(id: $id) { + id + name + attribution { + organization { + id + name + } + roles + } + } +} \ No newline at end of file From 38f46b41adc618f6da21daee9c7c129202280aad Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Tue, 27 Jan 2026 18:42:39 -0800 Subject: [PATCH 08/14] 3 modes seem to be working. 
doesn't update if there is no change required --- .../project_attribution/apply_attribution.py | 84 +++++++++++++------ 1 file changed, 57 insertions(+), 27 deletions(-) diff --git a/scripts/utility/project_attribution/apply_attribution.py b/scripts/utility/project_attribution/apply_attribution.py index c982561..a436b92 100644 --- a/scripts/utility/project_attribution/apply_attribution.py +++ b/scripts/utility/project_attribution/apply_attribution.py @@ -78,20 +78,16 @@ def build_attribution_params() -> tuple[list[str], str, list[str]]: * ProjectAttribution Object Organization ID * ProjectAttribution Object list of AttributionRoleEnum """ - return (['73cc1ada-c82b-499e-b3b2-5dc70393e340'], 'cc4fff44-f470-4f4f-ada2-99f741d56b28', ['ANALYST']) + return (['73cc1ada-c82b-499e-b3b2-5dc70393e340'], 'c3addb86-a96d-4831-99eb-3899764924da', ['ANALYST', 'DESIGNER']) -def resolve_attribution_list(current_data: list['ProjectAttributionInput'], - target_attrib_item: ProjectAttributionInput, - mode: UpdateMode) -> list['ProjectAttributionInput']: - """given the a project attribution, the change mode and the change element, - return the new attribution - * For REMOVE - it ignores the input roles and removes all attribution for that organization - """ - - # 1. 
Transform the current data from the output format to the input format +def normalize_api_data(current_data: list[Any]) -> list[ProjectAttributionInput]: + """Helper: Convert raw API Output (Nested Dicts) to Input Format (TypedDict)""" normalized_list: list[ProjectAttributionInput] = [] + if not current_data: + return normalized_list + for item in current_data: # Safety check for malformed data if not item.get('organization') or not item['organization'].get('id'): @@ -99,42 +95,75 @@ def resolve_attribution_list(current_data: list['ProjectAttributionInput'], normalized_list.append({ "organizationId": item['organization']['id'], - # Cast strings back to Enums + # Convert string roles back to proper Enums "roles": [AttributionRoleEnum(r) for r in item.get('roles', [])] }) + return normalized_list - target_org_id = target_attrib_item['organizationId'] + +def is_attribution_equal(list_a: list[ProjectAttributionInput], list_b: list[ProjectAttributionInput]) -> bool: + """Compare two attribution lists. + * Checks length + * Checks Organization ID Match + * Checks Roles (Order agnostic using Sets) + """ + if len(list_a) != len(list_b): + return False + + # We assume the order of organizations matters (e.g. 
Primary first) + for a, b in zip(list_a, list_b): + if a['organizationId'] != b['organizationId']: + return False + + # Compare roles as sets to ignore order (['A', 'B'] == ['B', 'A']) + if set(a['roles']) != set(b['roles']): + return False + + return True + + +def resolve_attribution_list(current_data: list[ProjectAttributionInput], + target_attrib_item: ProjectAttributionInput, + mode: UpdateMode) -> list[ProjectAttributionInput]: + """ + Takes the normalized input list, applies logic, returns new list: + * for ADD - adds the specific attribution in target to existing + * for REPLACE - all existing attributions ignored, target returned + * For REMOVE - removes all attribution for the organization specified in target + # TODO: Allow for more targetted removal of a specific role + """ # 2. Logic if mode == UpdateMode.REPLACE: # Override everything return [target_attrib_item] - elif mode == UpdateMode.REMOVE: + target_org_id = target_attrib_item['organizationId'] + if mode == UpdateMode.REMOVE: # Return list without this org - # TODO: Allow for more targetted removal of a specific role - return [x for x in normalized_list if x['organizationId'] != target_org_id] + return [x for x in current_data if x['organizationId'] != target_org_id] + working_list = [x.copy() for x in current_data] if mode == UpdateMode.ADD: # check if org exists - existing_index = next((i for i, x in enumerate(normalized_list) if x['organizationId'] == target_org_id), -1) + existing_index = next((i for i, x in enumerate(working_list) if x['organizationId'] == target_org_id), -1) if existing_index > -1: # MERGE: Combine existing roles with new roles (using set to avoid duplicates) - existing_roles = set(normalized_list[existing_index]['roles']) + existing_roles = set(working_list[existing_index]['roles']) new_roles = set(target_attrib_item['roles']) - # Update the existing entry with the Union of roles - normalized_list[existing_index]['roles'] = list(existing_roles.union(new_roles)) + # Convert 
back to list and cast to Enum to satisfy TypedDict + merged_roles = [AttributionRoleEnum(r) for r in existing_roles.union(new_roles)] + working_list[existing_index]['roles'] = merged_roles else: # APPEND: Add new entry to list - normalized_list.append(target_attrib_item) + working_list.append(target_attrib_item) - return normalized_list + return working_list def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str], str, list[str]], mode: UpdateMode): """Apply attribution to a project - TODO: Add different modes """ # Project.attribution is an array of [ProjectAttribution!]! # ProjectAttribution is organization: Organization! , role [AttributionRoleEnum!] @@ -161,7 +190,8 @@ def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str try: resp = rs_api.run_query(get_current_attrib_query, {"id": project_id}) if resp and 'data' in resp: - current_attribution = resp['data']['project'].get('attribution', []) + raw_data = resp['data']['project'].get('attribution', []) + current_attribution = normalize_api_data(raw_data) print(current_attribution) except Exception as e: log.error(f"Failed to fetch current attribution for {project_id}: {e}") @@ -170,11 +200,11 @@ def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str # Step 2: Calculate desired new attribution state final_list = resolve_attribution_list(current_attribution, target_attrib_item, mode) - if current_attribution == final_list: - print("No change needed") + if is_attribution_equal(current_attribution, final_list): + log.debug("No change needed") else: project_update: ProjectInput = { - 'attribution': [target_attrib_item] + 'attribution': final_list } variables = { "projectId": project_id, @@ -200,7 +230,7 @@ def main(): type=str, choices=['production', 'staging'], default='staging') - parser.add_argument('--mode', type=str, choices=[m.value for m in UpdateMode], default='REPLACE', + parser.add_argument('--mode', type=str, choices=[m.value 
for m in UpdateMode], default='ADD', help="ADD: Append/Merge, REPLACE: Overwrite, REMOVE: Delete specific org") args = dotenv.parse_args_env(parser) log = Logger('Setup') From 6212b377fa47c9993e4c8c829009ead8dd98cd12 Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Wed, 28 Jan 2026 13:42:37 -0800 Subject: [PATCH 09/14] improve typing and doc for run_query --- pydex/classes/RiverscapesAPI.py | 267 +++++++++++++++----------------- 1 file changed, 125 insertions(+), 142 deletions(-) diff --git a/pydex/classes/RiverscapesAPI.py b/pydex/classes/RiverscapesAPI.py index c437f3b..8722f00 100644 --- a/pydex/classes/RiverscapesAPI.py +++ b/pydex/classes/RiverscapesAPI.py @@ -1,18 +1,18 @@ +import base64 +import concurrent.futures +import hashlib +import json +import logging import os -from pathlib import Path -from typing import Dict, List, Generator, Tuple -import webbrowser import re +import threading import time -import concurrent.futures +import webbrowser +from datetime import datetime, timedelta, timezone from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Dict, Generator, List, Tuple from urllib.parse import urlencode, urlparse, urlunparse -import json -import threading -import hashlib -import base64 -import logging -from datetime import datetime, timedelta, timezone # We want to make inquirer optional so that we can use this module in other contexts try: @@ -24,21 +24,19 @@ from dateutil.parser import parse as dateparse from rsxml import Logger, ProgressBar, calculate_etag from rsxml.util import safe_makedirs + from pydex.classes.riverscapes_helpers import RiverscapesProject, RiverscapesProjectType, RiverscapesSearchParams, format_date # Disable all the weird terminal noise from urllib3 logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3").propagate = False -CHARSET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~' +CHARSET = 
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~" LOCAL_PORT = 4721 ALT_PORT = 4723 -LOGIN_SCOPE = 'openid' +LOGIN_SCOPE = "openid" -AUTH_DETAILS = { - "domain": "auth.riverscapes.net", - "clientId": "pH1ADlGVi69rMozJS1cixkuL5DMVLhKC" -} +AUTH_DETAILS = {"domain": "auth.riverscapes.net", "clientId": "pH1ADlGVi69rMozJS1cixkuL5DMVLhKC"} class RiverscapesAPIException(Exception): @@ -63,7 +61,7 @@ class RiverscapesAPI: """ def __init__(self, stage: str = None, machine_auth: Dict[str, str] = None, dev_headers: Dict[str, str] = None): - self.log = Logger('API') + self.log = Logger("API") self.stage = stage.upper() if stage else self._get_stage_interactive() self.machine_auth = machine_auth @@ -73,14 +71,14 @@ def __init__(self, stage: str = None, machine_auth: Dict[str, str] = None, dev_h # If the RSAPI_ALTPORT environment variable is set then we use an alternative port for authentication # This is useful for keeping a local environment unblocked while also using this code inside a codespace - self.auth_port = LOCAL_PORT if not os.environ.get('RSAPI_ALTPORT') else ALT_PORT + self.auth_port = LOCAL_PORT if not os.environ.get("RSAPI_ALTPORT") else ALT_PORT - if self.stage.upper() == 'PRODUCTION': - self.uri = 'https://api.data.riverscapes.net' - elif self.stage.upper() == 'STAGING': - self.uri = 'https://api.data.riverscapes.net/staging' + if self.stage.upper() == "PRODUCTION": + self.uri = "https://api.data.riverscapes.net" + elif self.stage.upper() == "STAGING": + self.uri = "https://api.data.riverscapes.net/staging" else: - raise RiverscapesAPIException(f'Unknown stage: {stage}') + raise RiverscapesAPIException(f"Unknown stage: {stage}") def _get_stage_interactive(self): """_summary_ @@ -92,37 +90,35 @@ def _get_stage_interactive(self): raise RiverscapesAPIException("Inquirer is not installed so interactive stage choosing is not possible. 
Either install inquirer or specify the stage in the constructor.") questions = [ - inquirer.List('stage', message="Which Data Exchange stage?", choices=['production', 'staging'], default='production'), + inquirer.List("stage", message="Which Data Exchange stage?", choices=["production", "staging"], default="production"), ] answers = inquirer.prompt(questions) - return answers['stage'].upper() + return answers["stage"].upper() - def __enter__(self) -> 'RiverscapesAPI': - """ Allows us to use this class as a context manager - """ + def __enter__(self) -> "RiverscapesAPI": + """Allows us to use this class as a context manager""" self.refresh_token() return self def __exit__(self, _type, _value, _traceback): - """Behaviour on close when using the "with RiverscapesAPI():" Syntax - """ + """Behaviour on close when using the "with RiverscapesAPI():" Syntax""" # Make sure to shut down the token poll event so the process can exit normally self.shutdown() def _generate_challenge(self, code: str) -> str: - return self._base64_url(hashlib.sha256(code.encode('utf-8')).digest()) + return self._base64_url(hashlib.sha256(code.encode("utf-8")).digest()) def _generate_state(self, length: int) -> str: - result = '' + result = "" i = length - chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' + chars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" while i > 0: result += chars[int(round(os.urandom(1)[0] * (len(chars) - 1)))] i -= 1 return result def _base64_url(self, string: bytes) -> str: - """ Convert a string to a base64url string + """Convert a string to a base64url string Args: string (bytes): this is the string to convert @@ -130,10 +126,10 @@ def _base64_url(self, string: bytes) -> str: Returns: str: the base64url string """ - return base64.urlsafe_b64encode(string).decode('utf-8').replace('=', '').replace('+', '-').replace('/', '_') + return base64.urlsafe_b64encode(string).decode("utf-8").replace("=", "").replace("+", "-").replace("/", 
"_") def _generate_random(self, size: int) -> str: - """ Generate a random string of a given size + """Generate a random string of a given size Args: size (int): the size of the string to generate @@ -146,11 +142,10 @@ def _generate_random(self, size: int) -> str: for b in buffer: index = b % len(CHARSET) state.append(CHARSET[index]) - return ''.join(state) + return "".join(state) def shutdown(self): - """_summary_ - """ + """_summary_""" self.log.debug("Shutting down Riverscapes API") if self.token_timeout: self.token_timeout.cancel() @@ -179,28 +174,28 @@ def refresh_token(self, force: bool = False): # Step 1: Determine if we're machine code or user auth # If it's machine then we can fetch tokens much easier: if self.machine_auth: - token_uri = self.uri if self.uri.endswith('/') else self.uri + '/' - token_uri += 'token' + token_uri = self.uri if self.uri.endswith("/") else self.uri + "/" + token_uri += "token" options = { - 'method': 'POST', - 'url': token_uri, - 'headers': {'content-type': 'application/x-www-form-urlencoded'}, - 'data': { - 'audience': 'https://api.riverscapes.net', - 'grant_type': 'client_credentials', - 'scope': 'machine:admin', - 'client_id': self.machine_auth['clientId'], - 'client_secret': self.machine_auth['secretId'], + "method": "POST", + "url": token_uri, + "headers": {"content-type": "application/x-www-form-urlencoded"}, + "data": { + "audience": "https://api.riverscapes.net", + "grant_type": "client_credentials", + "scope": "machine:admin", + "client_id": self.machine_auth["clientId"], + "client_secret": self.machine_auth["secretId"], }, - 'timeout': 30 + "timeout": 30, } try: get_token_return = requests.request(**options).json() # NOTE: RETRY IS NOT NECESSARY HERE because we do our refresh on the API side of things # self.tokenTimeout = setTimeout(self.refreshToken, 1000 * getTokenReturn['expires_in'] - 20) - self.access_token = get_token_return['access_token'] + self.access_token = get_token_return["access_token"] 
self.log.info("SUCCESSFUL Machine Authentication") except Exception as error: self.log.info(f"Access Token error {error}") @@ -240,14 +235,13 @@ def refresh_token(self, force: bool = False): response = requests.post(authentication_url, headers={"content-type": "application/x-www-form-urlencoded"}, data=data, timeout=30) response.raise_for_status() res = response.json() - self.token_timeout = threading.Timer( - res["expires_in"] - 20, self.refresh_token) + self.token_timeout = threading.Timer(res["expires_in"] - 20, self.refresh_token) self.token_timeout.start() self.access_token = res["access_token"] self.log.info("SUCCESSFUL Browser Authentication") def _wait_for_auth_code(self): - """ Wait for the auth code to come back from the server using a simple HTTP server + """Wait for the auth code to come back from the server using a simple HTTP server Raises: Exception: _description_ @@ -255,6 +249,7 @@ def _wait_for_auth_code(self): Returns: _type_: _description_ """ + class AuthHandler(BaseHTTPRequestHandler): """_summary_ @@ -263,13 +258,11 @@ class AuthHandler(BaseHTTPRequestHandler): """ def stop(self): - """Stop the server - """ + """Stop the server""" self.server.shutdown() def do_GET(self): - """ Do all the server stuff here - """ + """Do all the server stuff here""" self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() @@ -292,12 +285,11 @@ def do_GET(self): """ - self.wfile.write(success_html_body.encode('utf-8')) + self.wfile.write(success_html_body.encode("utf-8")) query = urlparse(self.path).query if "=" in query and "code" in query: - self.server.auth_code = dict(x.split("=") - for x in query.split("&"))["code"] + self.server.auth_code = dict(x.split("=") for x in query.split("&"))["code"] # Now shut down the server and return self.stop() @@ -315,7 +307,7 @@ def do_GET(self): return auth_code def load_query(self, query_name: str) -> str: - """ Load a query file from the file system. 
+ """Load a query file from the file system. Args: queryName (str): _description_ @@ -323,11 +315,11 @@ def load_query(self, query_name: str) -> str: Returns: str: _description_ """ - with open(os.path.join(os.path.dirname(__file__), '..', 'graphql', 'queries', f'{query_name}.graphql'), 'r', encoding='utf-8') as queryFile: + with open(os.path.join(os.path.dirname(__file__), "..", "graphql", "queries", f"{query_name}.graphql"), "r", encoding="utf-8") as queryFile: return queryFile.read() def load_mutation(self, mutation_name: str | Path) -> str: - """ Load a mutation file from the file system graphql/mutations folder or from a specific path. + """Load a mutation file from the file system graphql/mutations folder or from a specific path. Args: mutationName (str|Path): name of mutation in library, or Path to .graphql file @@ -338,12 +330,14 @@ def load_mutation(self, mutation_name: str | Path) -> str: if Path(mutation_name).exists(): mutation_file_path = Path(mutation_name) else: - mutation_file_path = Path(__file__).parent.parent / 'graphql' / 'mutations' / f'{mutation_name}.graphql' + mutation_file_path = Path(__file__).parent.parent / "graphql" / "mutations" / f"{mutation_name}.graphql" - return mutation_file_path.read_text(encoding='utf-8') + return mutation_file_path.read_text(encoding="utf-8") - def search(self, search_params: RiverscapesSearchParams, progress_bar: bool = False, page_size: int = 500, sort: List[str] = None, max_results: int = None, search_query_name: str = None) -> Generator[Tuple[RiverscapesProject, Dict, int], None, None]: - """ A simple function to make a yielded search on the riverscapes API + def search( + self, search_params: RiverscapesSearchParams, progress_bar: bool = False, page_size: int = 500, sort: List[str] = None, max_results: int = None, search_query_name: str = None + ) -> Generator[Tuple[RiverscapesProject, Dict, int], None, None]: + """A simple function to make a yielded search on the riverscapes API This search has two modes: 
If the total number of records is less than 10,000 then it will do a single paginated query. If the total number of records is greater than 10,000 then it will do a date-partitioned search. @@ -358,12 +352,12 @@ def search(self, search_params: RiverscapesSearchParams, progress_bar: bool = Fa Yields: Tuple[project: RiverscapeProject, stats: Dict[str, any], total: int]: the project, the stats dictionary and the total number of records """ - qry = self.load_query(search_query_name if search_query_name else 'searchProjects') + qry = self.load_query(search_query_name if search_query_name else "searchProjects") stats = {} # NOTE: DO NOT CHANGE THE SORT ORDER HERE. IT WILL BREAK THE PAGINATION. # why not make this the default argument instead of None? LSG - sort = sort if sort else ['DATE_CREATED_DESC'] + sort = sort if sort else ["DATE_CREATED_DESC"] if not search_params or not isinstance(search_params, RiverscapesSearchParams): raise RiverscapesAPIException("search requires a valid RiverscapesSearchParams object") @@ -371,35 +365,32 @@ def search(self, search_params: RiverscapesSearchParams, progress_bar: bool = Fa # First make a quick query to get the total number of records search_params_gql = search_params.to_gql() stats_results = self.run_query(qry, {"searchParams": search_params_gql, "limit": 0, "offset": 0, "sort": sort}) - overall_total = stats_results['data']['searchProjects']['total'] - stats = stats_results['data']['searchProjects']['stats'] - _prg = ProgressBar(overall_total, 30, 'Search Progress') + overall_total = stats_results["data"]["searchProjects"]["total"] + stats = stats_results["data"]["searchProjects"]["stats"] + _prg = ProgressBar(overall_total, 30, "Search Progress") self.log.debug(f"Total records: {overall_total:,} .... starting retrieval...") if max_results and max_results > 0: self.log.debug(f" ... 
but max_results is set to {max_results:,} so we will stop there.") # Set initial to and from dates so that we can paginate through more than 10,000 recirds now_date = datetime.now(timezone.utc) - createdOn = search_params_gql.get('createdOn', {}) - search_to_date = dateparse(createdOn.get('to')) if createdOn.get('to') else now_date - search_from_date = dateparse(createdOn.get('from')) if createdOn.get('from') else None + createdOn = search_params_gql.get("createdOn", {}) + search_to_date = dateparse(createdOn.get("to")) if createdOn.get("to") else now_date + search_from_date = dateparse(createdOn.get("from")) if createdOn.get("from") else None num_results = 1 # Just to get the loop started outer_counter = 0 while outer_counter < overall_total and num_results > 0: - search_params_gql['createdOn'] = { - "to": format_date(search_to_date), - "from": format_date(search_from_date) if search_from_date else None - } + search_params_gql["createdOn"] = {"to": format_date(search_to_date), "from": format_date(search_from_date) if search_from_date else None} if progress_bar: _prg.update(outer_counter) # self.log.debug(f" Searching from {search_from_date} to {search_to_date}") results = self.run_query(qry, {"searchParams": search_params_gql, "limit": page_size, "offset": 0, "sort": sort}) - projects = results['data']['searchProjects']['results'] + projects = results["data"]["searchProjects"]["results"] num_results = len(projects) inner_counter = 0 project = None for search_result in projects: - project_raw = search_result['item'] + project_raw = search_result["item"] if progress_bar: _prg.update(outer_counter + inner_counter) project = RiverscapesProject(project_raw) @@ -425,15 +416,7 @@ def search(self, search_params: RiverscapesSearchParams, progress_bar: bool = Fa _prg.finish() self.log.debug(f"Search complete: retrieved {outer_counter:,} records") - def process_search_results_async(self, - callback: callable, - search_params: RiverscapesSearchParams, - progress_bar: bool = 
False, - page_size: int = 500, - sort: List[str] = None, - max_results: int = None, - max_workers=5 - ): + def process_search_results_async(self, callback: callable, search_params: RiverscapesSearchParams, progress_bar: bool = False, page_size: int = 500, sort: List[str] = None, max_results: int = None, max_workers=5): """ Considerations: @@ -455,7 +438,7 @@ def process_search_results_async(self, max_results (int, optional): SAME AS THE SEARCH FUNCTION max_workers (int, optional): Here is where you can set the number of workers for the ThreadPoolExecutor. Defaults to 5. """ - log = Logger('API') + log = Logger("API") with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [] for project, _stats, _total, _prg in self.search(search_params, progress_bar=progress_bar, page_size=page_size, sort=sort, max_results=max_results): @@ -475,10 +458,9 @@ def process_search_results_async(self, future.result() # Gather results or handle exceptions except Exception as e: log.error(f"Project {project.id} generated an exception: {e}") - return def get_project_full(self, project_id: str) -> RiverscapesProject: - """ This gets the full project record + """This gets the full project record This is a MUCH heavier query than what comes back from the search function. 
If all you need is the project metadata this is probably not the query for you @@ -489,12 +471,12 @@ def get_project_full(self, project_id: str) -> RiverscapesProject: Returns: _type_: _description_ """ - qry = self.load_query('getProjectFull') + qry = self.load_query("getProjectFull") results = self.run_query(qry, {"id": project_id}) - return RiverscapesProject(results['data']['project']) + return RiverscapesProject(results["data"]["project"]) def get_project_files(self, project_id: str) -> List[Dict[str, any]]: - """ This returns the file listing with everything you need to download project files + """This returns the file listing with everything you need to download project files Args: @@ -503,9 +485,9 @@ def get_project_files(self, project_id: str) -> List[Dict[str, any]]: Returns: _type_: _description_ """ - qry = self.load_query('projectFiles') + qry = self.load_query("projectFiles") results = self.run_query(qry, {"projectId": project_id}) - return results['data']['project']['files'] + return results["data"]["project"]["files"] def get_project_types(self) -> Dict[str, RiverscapesProjectType]: """_summary_ @@ -513,22 +495,22 @@ def get_project_types(self) -> Dict[str, RiverscapesProjectType]: Returns: _type_: _description_ """ - qry = self.load_query('projectTypes') + qry = self.load_query("projectTypes") offset = 0 limit = 100 total = -1 results = [] while total < 0 or offset < total: qry_results = self.run_query(qry, {"limit": limit, "offset": offset}) - total = qry_results['data']['projectTypes']['total'] + total = qry_results["data"]["projectTypes"]["total"] offset += limit - for x in qry_results['data']['projectTypes']['items']: + for x in qry_results["data"]["projectTypes"]["items"]: results.append(x) - return {x['machineName']: RiverscapesProjectType(x) for x in results} + return {x["machineName"]: RiverscapesProjectType(x) for x in results} def search_count(self, search_params: RiverscapesSearchParams): - """ Return the number of records that match the 
search parameters + """Return the number of records that match the search parameters Args: query (str): _description_ variables (Dict[str, str]): _description_ @@ -536,41 +518,38 @@ def search_count(self, search_params: RiverscapesSearchParams): Returns: Tuple[total: int, Dict[str, any]]: the total results and the stats dictionary """ - qry = self.load_query('searchCount') + qry = self.load_query("searchCount") if not search_params or not isinstance(search_params, RiverscapesSearchParams): raise RiverscapesAPIException("searchCount requires a valid RiverscapesSearchParams object") if search_params.keywords is not None or search_params.name is not None: raise RiverscapesAPIException("searchCount does not support keywords or name search parameters as you will always get a large, non-representative count because of low-scoring items") results = self.run_query(qry, {"searchParams": search_params.to_gql(), "limit": 0, "offset": 0}) - total = results['data']['searchProjects']['total'] - stats = results['data']['searchProjects']['stats'] + total = results["data"]["searchProjects"]["total"] + stats = results["data"]["searchProjects"]["stats"] return (total, stats) - def run_query(self, query, variables): - """ A simple function to use requests.post to make the API call. Note the json= section. + def run_query(self, query: str, variables: dict) -> dict: + """A simple function to use requests.post to make the API call. Note the json= section. 
Args: - query (_type_): _description_ - variables (_type_): _description_ + query (str): GraphQL query string + variables (dict): mapping variable names to values Raises: - Exception: _description_ + Exception: RiverscapesAPIException Returns: - _type_: _description_ + dict: parsed JSON response from the API """ headers = {"authorization": "Bearer " + self.access_token} if self.access_token else {} - request = requests.post(self.uri, json={ - 'query': query, - 'variables': variables - }, headers=headers, timeout=30) + request = requests.post(self.uri, json={"query": query, "variables": variables}, headers=headers, timeout=30) if request.status_code == 200: resp_json = request.json() - if 'errors' in resp_json and len(resp_json['errors']) > 0: + if "errors" in resp_json and len(resp_json["errors"]) > 0: # Authentication timeout: re-login and retry the query - if len(list(filter(lambda err: 'You must be authenticated' in err['message'], resp_json['errors']))) > 0: + if len(list(filter(lambda err: "You must be authenticated" in err["message"], resp_json["errors"]))) > 0: self.log.debug("Authentication timed out. Fetching new token...") self.refresh_token() self.log.debug(" done. Re-trying query...") @@ -586,7 +565,7 @@ def run_query(self, query, variables): raise RiverscapesAPIException(f"Query failed to run by returning code of {request.status_code}. {query} {json.dumps(variables)}") def download_files(self, project_id: str, download_dir: str, re_filter: List[str] = None, force=False): - """ From a project id get all relevant files and download them + """From a project id get all relevant files and download them Args: project_id (_type_): _description_ @@ -600,12 +579,19 @@ def download_files(self, project_id: str, download_dir: str, re_filter: List[str # Now filter the list of files to anything that remains after the regex filter filtered_files = [] for file in file_results: - if not 'localPath' in file: - self.log.warning('File has no localPath. 
Skipping') + if "localPath" not in file: + self.log.warning("File has no localPath. Skipping") continue # now filter the if re_filter is not None and len(re_filter) > 0: - if not any([re.compile(x, re.IGNORECASE).match(file['localPath'], ) for x in re_filter]): + if not any( + [ + re.compile(x, re.IGNORECASE).match( + file["localPath"], + ) + for x in re_filter + ] + ): continue filtered_files.append(file) @@ -614,11 +600,11 @@ def download_files(self, project_id: str, download_dir: str, re_filter: List[str return for file in filtered_files: - local_file_path = os.path.join(download_dir, file['localPath']) + local_file_path = os.path.join(download_dir, file["localPath"]) self.download_file(file, local_file_path, force) def download_file(self, api_file_obj: Dict[str, any], local_path: str, force=False): - """ NOTE: The directory for this file will be created if it doesn't exist + """NOTE: The directory for this file will be created if it doesn't exist Arguments: api_file_obj {[type]} -- The dictionary that the API returns. should include the name, md5, size etc @@ -628,7 +614,7 @@ def download_file(self, api_file_obj: Dict[str, any], local_path: str, force=Fal force {bool} -- if true we will download regardless """ file_is_there = os.path.exists(local_path) and os.path.isfile(local_path) - etag_match = file_is_there and calculate_etag(local_path) == api_file_obj['etag'] + etag_match = file_is_there and calculate_etag(local_path) == api_file_obj["etag"] file_directory = os.path.dirname(local_path) @@ -641,9 +627,9 @@ def download_file(self, api_file_obj: Dict[str, any], local_path: str, force=Fal if force is True or not file_is_there or not etag_match: if not etag_match and file_is_there: - self.log.info(f' File etag mismatch. Re-downloading: {local_path}') + self.log.info(f" File etag mismatch. 
Re-downloading: {local_path}") elif not file_is_there: - self.log.info(f' Downloading: {local_path}') + self.log.info(f" Downloading: {local_path}") max_retries = 3 # could parameterize for attempt in range(max_retries): @@ -651,11 +637,11 @@ def download_file(self, api_file_obj: Dict[str, any], local_path: str, force=Fal # errors occurred here once, so we try to catch it: # Exception has occurred: ConnectionError # ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)) - r = requests.get(api_file_obj['downloadUrl'], allow_redirects=True, stream=True, timeout=30) - total_length = r.headers.get('content-length') + r = requests.get(api_file_obj["downloadUrl"], allow_redirects=True, stream=True, timeout=30) + total_length = r.headers.get("content-length") dl = 0 - with open(local_path, 'wb') as f: + with open(local_path, "wb") as f: if total_length is None: # no content length header f.write(r.content) else: @@ -667,28 +653,25 @@ def download_file(self, api_file_obj: Dict[str, any], local_path: str, force=Fal progbar.erase() return True except requests.ConnectionError as e: - self.log.warning(f"Connection error on attempt {attempt+1}: {e}") + self.log.warning(f"Connection error on attempt {attempt + 1}: {e}") if attempt < max_retries - 1: - time.sleep(2 ** attempt) # Exponential backoff + time.sleep(2**attempt) # Exponential backoff else: raise else: - self.log.debug(f' File already exists (skipping): {local_path}') + self.log.debug(f" File already exists (skipping): {local_path}") return False -if __name__ == '__main__': - log = Logger('API') - gql = RiverscapesAPI(os.environ.get('RS_API_URL')) +if __name__ == "__main__": + log = Logger("API") + gql = RiverscapesAPI(os.environ.get("RS_API_URL")) gql.refresh_token() log.debug(gql.access_token) gql.shutdown() # remember to shutdown so the threaded timer doesn't keep the process alive - gql2 = RiverscapesAPI(os.environ.get('RS_API_URL'), { 
- 'clientId': os.environ['RS_CLIENT_ID'], - 'secretId': os.environ['RS_CLIENT_SECRET'] - }) + gql2 = RiverscapesAPI(os.environ.get("RS_API_URL"), {"clientId": os.environ["RS_CLIENT_ID"], "secretId": os.environ["RS_CLIENT_SECRET"]}) gql2.refresh_token() log.debug(gql2.access_token) gql2.shutdown() # remember to shutdown so the threaded timer doesn't keep the process alive From ec067a07b72b57a0f35c4af921ffe9f6a6647c5d Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Wed, 28 Jan 2026 13:43:01 -0800 Subject: [PATCH 10/14] adding ruff (should not affect users without ruff) --- pyproject.toml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index a0f50be..3700038 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,3 +46,19 @@ include-package-data = true [tool.setuptools.package-data] pydex = ["**/*.graphql", "**/*.json"] + +[tool.ruff] +line-length = 240 +target-version = "py312" + +[tool.ruff.lint] +# I = isort (import sorting fix) +# E, F = Flake8 (standard errors) +# PL = Pylint (replaces Pylint's rules) +# N = pep8 naming checks +select = ["I", "E", "F", "PL", "N"] + +[tool.ruff.lint.isort] +# Tells Ruff exactly which imports are "local" to your project +known-first-party = ["pydex"] +combine-as-imports = true \ No newline at end of file From 4805471f3d2e4a19cc114ee6936fb631c567578f Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Wed, 28 Jan 2026 14:53:05 -0800 Subject: [PATCH 11/14] worked, tested, and documented --- .vscode/launch.json | 28 +- .../project_attribution/apply_attribution.py | 308 ++++++++++++------ 2 files changed, 244 insertions(+), 92 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index c10120a..a450236 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -56,6 +56,32 @@ "console": "integratedTerminal", "justMyCode": true }, + { + "name": "Apply attribution to projects in bulk", + "type": "debugpy", + "request": "launch", + "program": 
"${workspaceFolder}/scripts/utility/project_attribution/apply_attribution.py", + "cwd": "${workspaceFolder}", + "console": "integratedTerminal", + "envFile": "${workspaceFolder}/.env", + "env": { + "PYTHONPATH": "${workspaceFolder}" + }, + "args": [ + "--stage", + "staging", // staging or production + "--mode", + "REMOVE", // ADD, REPLACE or REMOVE + "--csv-folder", + "{env:CSV_FOLDER}", + "--organization", + "70bc51f9-f630-43e8-ae33-88765fa611ba", + "--roles", + "OWNER", + "DESIGNER", + "--verbose" + ] + }, { "name": "📦 Merge Projects Tool", "type": "debugpy", @@ -179,7 +205,7 @@ "PYTHONPATH": "${workspaceFolder}" }, "args": [ - "production", + "staging", "{env:CSV_FOLDER}", ] }, diff --git a/scripts/utility/project_attribution/apply_attribution.py b/scripts/utility/project_attribution/apply_attribution.py index a436b92..c206bd2 100644 --- a/scripts/utility/project_attribution/apply_attribution.py +++ b/scripts/utility/project_attribution/apply_attribution.py @@ -1,75 +1,94 @@ """Bulk apply project attribution to projects in Data Exchange -issue https://github.com/Riverscapes/rs-web-monorepo/issues/861 + +The workhorse function is `apply_attribution`. The inputs are a list of projects, an organization ID and a list of roles. +The main function will help user select a csv file (from a specified folder) containing the list of projects. A project's attribution consists of a list of attribution objects, each of which is an Organization and a list of Roles from the AttributionRoleEnum -There are three ways someone might want to change attribution: -1. add (do not change existing, apply new on top of it) -2. replace (remove any existing attribution and apply new) -3. remove (remove specific attribution but leave all others in place) +There are three MODES for attribution change: +1. ADD (do not change existing, apply new on top of it) +2. REPLACE (remove any existing attribution and apply new) +3. 
REMOVE (remove specific attribution but leave all others in place) + +* This currently implements all modes +* in REMOVE mode, it removes _all_ attribution for that organization (leaving other organizations in place) +* It only updates project if there is a change. +* When a project is updated by the script, the project will show as having been UPDATED BY the user running the script (logging into Data Exchange) -* This currently implements all modes. -* The project will show as having been UPDATED BY the user running the script +## Example usage: +"Add BLM as funder to all the 2025 CONUS projects." +* Run Athena query to get the IDs of all projects tagged 2025CONUS. `SELECT project_id FROM conus_projects WHERE contains(tags,'2025CONUS')` +* download results as csv e.g. `conusprojects.csv` +* look up the BLM organization ID in Data Exchange: 876d3961-08f2-4db5-aff2-7ccfa391b984 +* Run `apply_attribution --stage production --csv-file conusprojects.csv --organization 876d3961-08f2-4db5-aff2-7ccfa391b984 --role FUNDER --mode ADD` Lorin Gaertner January 2026 -examples - all tagged 2025conus - blm funder -huc metadata is in certain huc2 -pb really likes point to folder of csv files with projects -inquirer - get org GUID and multi-select roles +These classes objects originally came from pydex.generated_types (and could be imported): + AttributionRoleEnum, ProjectAttributionInput, ProjectInput + +Possible enhancements: +* if Organization or roles not provided in command line, use inquirer option to get from user (with multi-select for roles) +* more selective removal option - to remove specific role for an organization """ -from typing import Any -from pathlib import Path + import argparse +import logging +import uuid +from datetime import datetime +from enum import Enum +from pathlib import Path +from typing import Any, TypedDict + +import inquirer +from rsxml import Logger, ProgressBar, dotenv -from rsxml import ProgressBar, dotenv, Logger from pydex import 
RiverscapesAPI -# from pydex.generated_types import AttributionRoleEnum, ProjectAttributionInput, ProjectInput -# ============================================================================================ -from typing import TypedDict -from enum import Enum class AttributionRoleEnum(str, Enum): - ANALYST = 'ANALYST' - CONTRIBUTOR = 'CONTRIBUTOR' - CO_FUNDER = 'CO_FUNDER' - DESIGNER = 'DESIGNER' - FUNDER = 'FUNDER' - OWNER = 'OWNER' - QA_QC = 'QA_QC' - SUPPORTER = 'SUPPORTER' + ANALYST = "ANALYST" + CONTRIBUTOR = "CONTRIBUTOR" + CO_FUNDER = "CO_FUNDER" + DESIGNER = "DESIGNER" + FUNDER = "FUNDER" + OWNER = "OWNER" + QA_QC = "QA_QC" + SUPPORTER = "SUPPORTER" class ProjectAttributionInput(TypedDict, total=True): organizationId: str - roles: list['AttributionRoleEnum'] + roles: list["AttributionRoleEnum"] class ProjectInput(TypedDict, total=False): archived: bool - attribution: list['ProjectAttributionInput'] + attribution: list["ProjectAttributionInput"] description: str name: str summary: str tags: list[str] + # ============================================================================================ class ProjectAttributionOutput(TypedDict): """Model for what we get back from the API""" + organization: dict[str, Any] # e.g. 
{'id': '...', 'name': '...'} roles: list[str] class UpdateMode(str, Enum): """Allowed options for attribution changes""" - ADD = 'ADD' - REPLACE = 'REPLACE' - REMOVE = 'REMOVE' + + ADD = "ADD" + REPLACE = "REPLACE" + REMOVE = "REMOVE" def build_attribution_params() -> tuple[list[str], str, list[str]]: @@ -78,7 +97,7 @@ def build_attribution_params() -> tuple[list[str], str, list[str]]: * ProjectAttribution Object Organization ID * ProjectAttribution Object list of AttributionRoleEnum """ - return (['73cc1ada-c82b-499e-b3b2-5dc70393e340'], 'c3addb86-a96d-4831-99eb-3899764924da', ['ANALYST', 'DESIGNER']) + return (["73cc1ada-c82b-499e-b3b2-5dc70393e340"], "c3addb86-a96d-4831-99eb-3899764924da", ["ANALYST", "DESIGNER"]) def normalize_api_data(current_data: list[Any]) -> list[ProjectAttributionInput]: @@ -90,14 +109,16 @@ def normalize_api_data(current_data: list[Any]) -> list[ProjectAttributionInput] for item in current_data: # Safety check for malformed data - if not item.get('organization') or not item['organization'].get('id'): + if not item.get("organization") or not item["organization"].get("id"): continue - normalized_list.append({ - "organizationId": item['organization']['id'], - # Convert string roles back to proper Enums - "roles": [AttributionRoleEnum(r) for r in item.get('roles', [])] - }) + normalized_list.append( + { + "organizationId": item["organization"]["id"], + # Convert string roles back to proper Enums + "roles": [AttributionRoleEnum(r) for r in item.get("roles", [])], + } + ) return normalized_list @@ -112,19 +133,17 @@ def is_attribution_equal(list_a: list[ProjectAttributionInput], list_b: list[Pro # We assume the order of organizations matters (e.g. 
Primary first) for a, b in zip(list_a, list_b): - if a['organizationId'] != b['organizationId']: + if a["organizationId"] != b["organizationId"]: return False # Compare roles as sets to ignore order (['A', 'B'] == ['B', 'A']) - if set(a['roles']) != set(b['roles']): + if set(a["roles"]) != set(b["roles"]): return False return True -def resolve_attribution_list(current_data: list[ProjectAttributionInput], - target_attrib_item: ProjectAttributionInput, - mode: UpdateMode) -> list[ProjectAttributionInput]: +def resolve_attribution_list(current_data: list[ProjectAttributionInput], target_attrib_item: ProjectAttributionInput, mode: UpdateMode) -> list[ProjectAttributionInput]: """ Takes the normalized input list, applies logic, returns new list: * for ADD - adds the specific attribution in target to existing @@ -138,23 +157,23 @@ def resolve_attribution_list(current_data: list[ProjectAttributionInput], # Override everything return [target_attrib_item] - target_org_id = target_attrib_item['organizationId'] + target_org_id = target_attrib_item["organizationId"] if mode == UpdateMode.REMOVE: # Return list without this org - return [x for x in current_data if x['organizationId'] != target_org_id] + return [x for x in current_data if x["organizationId"] != target_org_id] working_list = [x.copy() for x in current_data] if mode == UpdateMode.ADD: # check if org exists - existing_index = next((i for i, x in enumerate(working_list) if x['organizationId'] == target_org_id), -1) + existing_index = next((i for i, x in enumerate(working_list) if x["organizationId"] == target_org_id), -1) if existing_index > -1: # MERGE: Combine existing roles with new roles (using set to avoid duplicates) - existing_roles = set(working_list[existing_index]['roles']) - new_roles = set(target_attrib_item['roles']) + existing_roles = set(working_list[existing_index]["roles"]) + new_roles = set(target_attrib_item["roles"]) # Convert back to list and cast to Enum to satisfy TypedDict merged_roles = 
[AttributionRoleEnum(r) for r in existing_roles.union(new_roles)] - working_list[existing_index]['roles'] = merged_roles + working_list[existing_index]["roles"] = merged_roles else: # APPEND: Add new entry to list working_list.append(target_attrib_item) @@ -162,40 +181,34 @@ def resolve_attribution_list(current_data: list[ProjectAttributionInput], return working_list -def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str], str, list[str]], mode: UpdateMode): - """Apply attribution to a project - """ +def apply_attribution(rs_api: RiverscapesAPI, mode: UpdateMode, project_ids: list[str], org_id: str, roles: list[str]): + """Apply attribution to a project""" # Project.attribution is an array of [ProjectAttribution!]! # ProjectAttribution is organization: Organization! , role [AttributionRoleEnum!] - log = Logger('Apply attribution') - log.title('Apply attribution') - mutation_file = Path(__file__).parent / 'updateProjectAttribution.graphql' + log = Logger("Apply attribution") + log.title("Apply attribution") + mutation_file = Path(__file__).parent / "updateProjectAttribution.graphql" mutation = rs_api.load_mutation(mutation_file) - get_current_attrib_query_file = Path(__file__).parent / 'getProjectAttribution.graphql' + get_current_attrib_query_file = Path(__file__).parent / "getProjectAttribution.graphql" get_current_attrib_query = rs_api.load_mutation(get_current_attrib_query_file) - project_ids, org_id, roles = attribution_params - - target_attrib_item: ProjectAttributionInput = { - "organizationId": org_id, - "roles": [AttributionRoleEnum(role) for role in roles] - } + target_attrib_item: ProjectAttributionInput = {"organizationId": org_id, "roles": [AttributionRoleEnum(role) for role in roles]} updated = 0 - prg = ProgressBar(total=len(project_ids), text='Attributing projects') + prg = ProgressBar(total=len(project_ids), text="Attributing projects") for i, project_id in enumerate(project_ids): - + log.debug(f"Processing Project ID 
{project_id}") # Step 1 .Fetch Current attribution current_attribution = [] try: resp = rs_api.run_query(get_current_attrib_query, {"id": project_id}) - if resp and 'data' in resp: - raw_data = resp['data']['project'].get('attribution', []) + if resp and "data" in resp: + raw_data = resp["data"]["project"].get("attribution", []) current_attribution = normalize_api_data(raw_data) - print(current_attribution) + log.debug(f"Current attribution: {current_attribution}") except Exception as e: log.error(f"Failed to fetch current attribution for {project_id}: {e}") - prg.update(i+1) + prg.update(i + 1) continue # Step 2: Calculate desired new attribution state @@ -203,44 +216,157 @@ def apply_attribution(rs_api: RiverscapesAPI, attribution_params: tuple[list[str if is_attribution_equal(current_attribution, final_list): log.debug("No change needed") else: - project_update: ProjectInput = { - 'attribution': final_list - } - variables = { - "projectId": project_id, - "project": project_update - } + project_update: ProjectInput = {"attribution": final_list} + variables = {"projectId": project_id, "project": project_update} try: result = rs_api.run_query(mutation, variables) if result is None: - raise Exception(f'Failed to update project {project_id}. Query returned: {result}') + raise Exception(f"Failed to update project {project_id}. Query returned: {result}") updated += 1 + log.debug(f"New attribution: {final_list}") except Exception as e: log.error(f"Error executing mutation on {project_id}: {e}") - prg.update(i+1) + prg.update(i + 1) prg.finish() - print(f'Process complete. {updated} projects updated.') + log.info(f"Process complete. 
{updated} projects updated.") + + +def get_file_from_folder(folder: Path, ext: str = ".csv") -> Path | None: + """prompt user for csv file from within specified folder + returns: path to the chosen file or None otherwise + This could easily be adjusted to get files of + """ + log = Logger("get file from folder") + if not (folder.exists() and folder.is_dir()): + log.error(f"The path {folder} does not exist or is not a directory. Please provide a valid folder with CSV files.") + return + + # Get a list of all CSV files in the specified folder. Do not walk to subfolders. + matching_files = [f for f in folder.iterdir() if f.suffix == ext] + if not matching_files: + log.error(f"No `.{ext}` files found in {folder}. Please provide a valid folder with {ext} files.") + return + + answers = inquirer.prompt([inquirer.List("file_path", message=f"Select a {ext} file to use", choices=matching_files)]) + if not answers: + log.error("No file selected.") + return + csv_path = folder / answers["file_path"] + return csv_path + + +def load_ids_from_csv(csvfile: Path) -> list[str]: + """ + Load a list of GUIDs from a CSV file. + Assumes the file exists and is a valid path. + Ignores the first row if it looks like a header (non-GUID). + Strips whitespace from each entry. + Logs a warning if any non-GUID values are found. + Returns a list of GUID strings only. 
+ """ + log = Logger("load IDs from file") + project_ids = [] + non_guids = [] + lines = csvfile.read_text().splitlines() + for i, line in enumerate(lines): + value = line.strip().strip(",") + # Remove surrounding single or double quotes if present + if (value.startswith('"') and value.endswith('"')) or (value.startswith("'") and value.endswith("'")): + value = value[1:-1] + value = value.strip() + if not value: + continue + try: + # Try to parse as UUID + uuid_obj = uuid.UUID(value) + project_ids.append(str(uuid_obj)) + except (ValueError, AttributeError): + # Ignore first row if it looks like a header + if i == 0: + continue + non_guids.append(value) + if non_guids: + log.warning(f"Found {len(non_guids)} non-GUID values in CSV e.g. {non_guids[0]}. These will not be processed.") + return project_ids + + +def get_organization_name(rs_api: RiverscapesAPI, organization_id: str) -> str | None: + """Look up organization by ID and return its name or None if not found.""" + get_org_qry = """ +query getOrganization($id: ID!) 
{ + organization(id: $id) { + name + } +} +""" + log = Logger("Get organization name") + try: + resp = rs_api.run_query(get_org_qry, {"id": organization_id}) + if resp and "data" in resp and resp["data"].get("organization"): + return resp["data"]["organization"]["name"] + return None + except Exception as e: + log.error(f"No organization found for id {organization_id}: {e}") + return None def main(): """Main entry point - process arguments""" parser = argparse.ArgumentParser() - parser.add_argument('--stage', - help='Production or staging Data Exchange', - type=str, - choices=['production', 'staging'], - default='staging') - parser.add_argument('--mode', type=str, choices=[m.value for m in UpdateMode], default='ADD', - help="ADD: Append/Merge, REPLACE: Overwrite, REMOVE: Delete specific org") + parser.add_argument("--stage", help="Production or staging Data Exchange", type=str, choices=["production", "staging"], default="staging") + parser.add_argument("--mode", type=str, choices=[m.value for m in UpdateMode], default="ADD", help="ADD: Append/Merge, REPLACE: Overwrite, REMOVE: Delete specific org") + # because we use dotenv.parse_args_env we need to parser to get strings rather than path objects + parser.add_argument("--csv-file", help="path to specific csv file with projectIDs to process", type=str) + parser.add_argument("--csv-folder", help="Folder containing CSV files with project IDs, from which a file can be chosen interactively", type=str) + parser.add_argument("--organization", help="GUID for the organization whose attribution will be added or removed", type=str) + parser.add_argument( + "--roles", + nargs="+", + choices=[role.value for role in AttributionRoleEnum], + help="one or more roles to add or replace for the supplied organization and projects e.g. FUNDER OWNER. 
Ignored for REMOVE mode (all attributions are removed)", + type=str, + ) + parser.add_argument("--yes", "-y", help="Assume yes to all prompts and run without confirmation.", action="store_true") + parser.add_argument("--verbose", "-v", help="Verbose logging output", action="store_true") + # Parse arguments and inquire from user, as needed args = dotenv.parse_args_env(parser) - log = Logger('Setup') + log = Logger("Setup") + + datestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + if args.verbose: + log_level = logging.DEBUG + else: + log_level = logging.INFO + log_path = Path.cwd() / f"apply_attribution_{datestamp}.log" + log.setup(log_path=log_path, log_level=log_level) mode_enum = UpdateMode(args.mode) - # log_path = output_path / 'report.log' - # log.setup(log_path=log_path, log_level=logging.DEBUG) - log.info(f'Connecting to {args.stage} environment') + # get csv_file of projects + csv_file = None + if args.csv_file: + csv_path = Path(args.csv_file) + csv_file = csv_path if csv_path.exists() else None + elif args.csv_folder: + folder_path = Path(args.csv_folder) + csv_file = get_file_from_folder(folder_path) + if not csv_file: + log.error("No file of projects to process provided. 
Exiting.") + return + project_id_list = load_ids_from_csv(csv_file) + + log.info(f"Connecting to {args.stage} environment") with RiverscapesAPI(stage=args.stage) as api: - attribution_params = build_attribution_params() - apply_attribution(api, attribution_params, mode=mode_enum) + organization_id = args.organization + org_name = get_organization_name(api, organization_id) + if not org_name: + log.error(f"Invalid Organization ID: {organization_id}") + roles = args.roles + log.info(f"Ready to alter attribution using {mode_enum} for {organization_id} ({org_name}) (ROLES {roles}) to {len(project_id_list)} projects from {csv_file}.") + # final review for user + if not args.yes: + proceed = inquirer.prompt([inquirer.Confirm("proceed", message="Proceed?", default=True)]) + if not proceed or not proceed.get("proceed", False): + return + apply_attribution(api, mode_enum, project_id_list, organization_id, roles) if __name__ == "__main__": From 4041a176bdf1113428c46626969f71a846b3c09e Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Wed, 28 Jan 2026 15:01:14 -0800 Subject: [PATCH 12/14] tweak launch and logging --- .vscode/launch.json | 11 +++++------ .../utility/project_attribution/apply_attribution.py | 3 ++- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index a450236..392ec96 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -69,16 +69,15 @@ }, "args": [ "--stage", - "staging", // staging or production + "production", // staging or production "--mode", - "REMOVE", // ADD, REPLACE or REMOVE + "ADD", // ADD, REPLACE or REMOVE "--csv-folder", "{env:CSV_FOLDER}", "--organization", - "70bc51f9-f630-43e8-ae33-88765fa611ba", - "--roles", - "OWNER", - "DESIGNER", + "f1b8e6ae-d103-4ffb-b402-a9ee0eaf7607", //NAR on production + "--roles", // OWNER DESIGNER CO_FUNDER etc. 
+ "CO_FUNDER", "--verbose" ] }, diff --git a/scripts/utility/project_attribution/apply_attribution.py b/scripts/utility/project_attribution/apply_attribution.py index c206bd2..650edd2 100644 --- a/scripts/utility/project_attribution/apply_attribution.py +++ b/scripts/utility/project_attribution/apply_attribution.py @@ -338,6 +338,7 @@ def main(): else: log_level = logging.INFO log_path = Path.cwd() / f"apply_attribution_{datestamp}.log" + print(f"Logging to {log_path} with level {log_level}.") log.setup(log_path=log_path, log_level=log_level) mode_enum = UpdateMode(args.mode) # get csv_file of projects @@ -360,7 +361,7 @@ def main(): if not org_name: log.error(f"Invalid Organization ID: {organization_id}") roles = args.roles - log.info(f"Ready to alter attribution using {mode_enum} for {organization_id} ({org_name}) (ROLES {roles}) to {len(project_id_list)} projects from {csv_file}.") + log.info(f"Ready to alter attribution using {mode_enum} \n for {organization_id} ({org_name}) \n (ROLES {roles}) \n to {len(project_id_list)} projects \n from {csv_file}.") # final review for user if not args.yes: proceed = inquirer.prompt([inquirer.Confirm("proceed", message="Proceed?", default=True)]) From 6bb99c0c2f63f075decd08415f6007c81b826425 Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Wed, 28 Jan 2026 15:19:39 -0800 Subject: [PATCH 13/14] Revert "improve typing and doc for run_query" as it also changed quotes and line length etc This reverts commit 6212b377fa47c9993e4c8c829009ead8dd98cd12. 
--- pydex/classes/RiverscapesAPI.py | 267 +++++++++++++++++--------------- 1 file changed, 142 insertions(+), 125 deletions(-) diff --git a/pydex/classes/RiverscapesAPI.py b/pydex/classes/RiverscapesAPI.py index 8722f00..c437f3b 100644 --- a/pydex/classes/RiverscapesAPI.py +++ b/pydex/classes/RiverscapesAPI.py @@ -1,18 +1,18 @@ -import base64 -import concurrent.futures -import hashlib -import json -import logging import os +from pathlib import Path +from typing import Dict, List, Generator, Tuple +import webbrowser import re -import threading import time -import webbrowser -from datetime import datetime, timedelta, timezone +import concurrent.futures from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer -from pathlib import Path -from typing import Dict, Generator, List, Tuple from urllib.parse import urlencode, urlparse, urlunparse +import json +import threading +import hashlib +import base64 +import logging +from datetime import datetime, timedelta, timezone # We want to make inquirer optional so that we can use this module in other contexts try: @@ -24,19 +24,21 @@ from dateutil.parser import parse as dateparse from rsxml import Logger, ProgressBar, calculate_etag from rsxml.util import safe_makedirs - from pydex.classes.riverscapes_helpers import RiverscapesProject, RiverscapesProjectType, RiverscapesSearchParams, format_date # Disable all the weird terminal noise from urllib3 logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("urllib3").propagate = False -CHARSET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~" +CHARSET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~' LOCAL_PORT = 4721 ALT_PORT = 4723 -LOGIN_SCOPE = "openid" +LOGIN_SCOPE = 'openid' -AUTH_DETAILS = {"domain": "auth.riverscapes.net", "clientId": "pH1ADlGVi69rMozJS1cixkuL5DMVLhKC"} +AUTH_DETAILS = { + "domain": "auth.riverscapes.net", + "clientId": "pH1ADlGVi69rMozJS1cixkuL5DMVLhKC" +} class 
RiverscapesAPIException(Exception): @@ -61,7 +63,7 @@ class RiverscapesAPI: """ def __init__(self, stage: str = None, machine_auth: Dict[str, str] = None, dev_headers: Dict[str, str] = None): - self.log = Logger("API") + self.log = Logger('API') self.stage = stage.upper() if stage else self._get_stage_interactive() self.machine_auth = machine_auth @@ -71,14 +73,14 @@ def __init__(self, stage: str = None, machine_auth: Dict[str, str] = None, dev_h # If the RSAPI_ALTPORT environment variable is set then we use an alternative port for authentication # This is useful for keeping a local environment unblocked while also using this code inside a codespace - self.auth_port = LOCAL_PORT if not os.environ.get("RSAPI_ALTPORT") else ALT_PORT + self.auth_port = LOCAL_PORT if not os.environ.get('RSAPI_ALTPORT') else ALT_PORT - if self.stage.upper() == "PRODUCTION": - self.uri = "https://api.data.riverscapes.net" - elif self.stage.upper() == "STAGING": - self.uri = "https://api.data.riverscapes.net/staging" + if self.stage.upper() == 'PRODUCTION': + self.uri = 'https://api.data.riverscapes.net' + elif self.stage.upper() == 'STAGING': + self.uri = 'https://api.data.riverscapes.net/staging' else: - raise RiverscapesAPIException(f"Unknown stage: {stage}") + raise RiverscapesAPIException(f'Unknown stage: {stage}') def _get_stage_interactive(self): """_summary_ @@ -90,35 +92,37 @@ def _get_stage_interactive(self): raise RiverscapesAPIException("Inquirer is not installed so interactive stage choosing is not possible. 
Either install inquirer or specify the stage in the constructor.") questions = [ - inquirer.List("stage", message="Which Data Exchange stage?", choices=["production", "staging"], default="production"), + inquirer.List('stage', message="Which Data Exchange stage?", choices=['production', 'staging'], default='production'), ] answers = inquirer.prompt(questions) - return answers["stage"].upper() + return answers['stage'].upper() - def __enter__(self) -> "RiverscapesAPI": - """Allows us to use this class as a context manager""" + def __enter__(self) -> 'RiverscapesAPI': + """ Allows us to use this class as a context manager + """ self.refresh_token() return self def __exit__(self, _type, _value, _traceback): - """Behaviour on close when using the "with RiverscapesAPI():" Syntax""" + """Behaviour on close when using the "with RiverscapesAPI():" Syntax + """ # Make sure to shut down the token poll event so the process can exit normally self.shutdown() def _generate_challenge(self, code: str) -> str: - return self._base64_url(hashlib.sha256(code.encode("utf-8")).digest()) + return self._base64_url(hashlib.sha256(code.encode('utf-8')).digest()) def _generate_state(self, length: int) -> str: - result = "" + result = '' i = length - chars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ' while i > 0: result += chars[int(round(os.urandom(1)[0] * (len(chars) - 1)))] i -= 1 return result def _base64_url(self, string: bytes) -> str: - """Convert a string to a base64url string + """ Convert a string to a base64url string Args: string (bytes): this is the string to convert @@ -126,10 +130,10 @@ def _base64_url(self, string: bytes) -> str: Returns: str: the base64url string """ - return base64.urlsafe_b64encode(string).decode("utf-8").replace("=", "").replace("+", "-").replace("/", "_") + return base64.urlsafe_b64encode(string).decode('utf-8').replace('=', '').replace('+', '-').replace('/', 
'_') def _generate_random(self, size: int) -> str: - """Generate a random string of a given size + """ Generate a random string of a given size Args: size (int): the size of the string to generate @@ -142,10 +146,11 @@ def _generate_random(self, size: int) -> str: for b in buffer: index = b % len(CHARSET) state.append(CHARSET[index]) - return "".join(state) + return ''.join(state) def shutdown(self): - """_summary_""" + """_summary_ + """ self.log.debug("Shutting down Riverscapes API") if self.token_timeout: self.token_timeout.cancel() @@ -174,28 +179,28 @@ def refresh_token(self, force: bool = False): # Step 1: Determine if we're machine code or user auth # If it's machine then we can fetch tokens much easier: if self.machine_auth: - token_uri = self.uri if self.uri.endswith("/") else self.uri + "/" - token_uri += "token" + token_uri = self.uri if self.uri.endswith('/') else self.uri + '/' + token_uri += 'token' options = { - "method": "POST", - "url": token_uri, - "headers": {"content-type": "application/x-www-form-urlencoded"}, - "data": { - "audience": "https://api.riverscapes.net", - "grant_type": "client_credentials", - "scope": "machine:admin", - "client_id": self.machine_auth["clientId"], - "client_secret": self.machine_auth["secretId"], + 'method': 'POST', + 'url': token_uri, + 'headers': {'content-type': 'application/x-www-form-urlencoded'}, + 'data': { + 'audience': 'https://api.riverscapes.net', + 'grant_type': 'client_credentials', + 'scope': 'machine:admin', + 'client_id': self.machine_auth['clientId'], + 'client_secret': self.machine_auth['secretId'], }, - "timeout": 30, + 'timeout': 30 } try: get_token_return = requests.request(**options).json() # NOTE: RETRY IS NOT NECESSARY HERE because we do our refresh on the API side of things # self.tokenTimeout = setTimeout(self.refreshToken, 1000 * getTokenReturn['expires_in'] - 20) - self.access_token = get_token_return["access_token"] + self.access_token = get_token_return['access_token'] 
self.log.info("SUCCESSFUL Machine Authentication") except Exception as error: self.log.info(f"Access Token error {error}") @@ -235,13 +240,14 @@ def refresh_token(self, force: bool = False): response = requests.post(authentication_url, headers={"content-type": "application/x-www-form-urlencoded"}, data=data, timeout=30) response.raise_for_status() res = response.json() - self.token_timeout = threading.Timer(res["expires_in"] - 20, self.refresh_token) + self.token_timeout = threading.Timer( + res["expires_in"] - 20, self.refresh_token) self.token_timeout.start() self.access_token = res["access_token"] self.log.info("SUCCESSFUL Browser Authentication") def _wait_for_auth_code(self): - """Wait for the auth code to come back from the server using a simple HTTP server + """ Wait for the auth code to come back from the server using a simple HTTP server Raises: Exception: _description_ @@ -249,7 +255,6 @@ def _wait_for_auth_code(self): Returns: _type_: _description_ """ - class AuthHandler(BaseHTTPRequestHandler): """_summary_ @@ -258,11 +263,13 @@ class AuthHandler(BaseHTTPRequestHandler): """ def stop(self): - """Stop the server""" + """Stop the server + """ self.server.shutdown() def do_GET(self): - """Do all the server stuff here""" + """ Do all the server stuff here + """ self.send_response(200) self.send_header("Content-type", "text/html") self.end_headers() @@ -285,11 +292,12 @@ def do_GET(self): """ - self.wfile.write(success_html_body.encode("utf-8")) + self.wfile.write(success_html_body.encode('utf-8')) query = urlparse(self.path).query if "=" in query and "code" in query: - self.server.auth_code = dict(x.split("=") for x in query.split("&"))["code"] + self.server.auth_code = dict(x.split("=") + for x in query.split("&"))["code"] # Now shut down the server and return self.stop() @@ -307,7 +315,7 @@ def do_GET(self): return auth_code def load_query(self, query_name: str) -> str: - """Load a query file from the file system. 
+ """ Load a query file from the file system. Args: queryName (str): _description_ @@ -315,11 +323,11 @@ def load_query(self, query_name: str) -> str: Returns: str: _description_ """ - with open(os.path.join(os.path.dirname(__file__), "..", "graphql", "queries", f"{query_name}.graphql"), "r", encoding="utf-8") as queryFile: + with open(os.path.join(os.path.dirname(__file__), '..', 'graphql', 'queries', f'{query_name}.graphql'), 'r', encoding='utf-8') as queryFile: return queryFile.read() def load_mutation(self, mutation_name: str | Path) -> str: - """Load a mutation file from the file system graphql/mutations folder or from a specific path. + """ Load a mutation file from the file system graphql/mutations folder or from a specific path. Args: mutationName (str|Path): name of mutation in library, or Path to .graphql file @@ -330,14 +338,12 @@ def load_mutation(self, mutation_name: str | Path) -> str: if Path(mutation_name).exists(): mutation_file_path = Path(mutation_name) else: - mutation_file_path = Path(__file__).parent.parent / "graphql" / "mutations" / f"{mutation_name}.graphql" + mutation_file_path = Path(__file__).parent.parent / 'graphql' / 'mutations' / f'{mutation_name}.graphql' - return mutation_file_path.read_text(encoding="utf-8") + return mutation_file_path.read_text(encoding='utf-8') - def search( - self, search_params: RiverscapesSearchParams, progress_bar: bool = False, page_size: int = 500, sort: List[str] = None, max_results: int = None, search_query_name: str = None - ) -> Generator[Tuple[RiverscapesProject, Dict, int], None, None]: - """A simple function to make a yielded search on the riverscapes API + def search(self, search_params: RiverscapesSearchParams, progress_bar: bool = False, page_size: int = 500, sort: List[str] = None, max_results: int = None, search_query_name: str = None) -> Generator[Tuple[RiverscapesProject, Dict, int], None, None]: + """ A simple function to make a yielded search on the riverscapes API This search has two 
modes: If the total number of records is less than 10,000 then it will do a single paginated query. If the total number of records is greater than 10,000 then it will do a date-partitioned search. @@ -352,12 +358,12 @@ def search( Yields: Tuple[project: RiverscapeProject, stats: Dict[str, any], total: int]: the project, the stats dictionary and the total number of records """ - qry = self.load_query(search_query_name if search_query_name else "searchProjects") + qry = self.load_query(search_query_name if search_query_name else 'searchProjects') stats = {} # NOTE: DO NOT CHANGE THE SORT ORDER HERE. IT WILL BREAK THE PAGINATION. # why not make this the default argument instead of None? LSG - sort = sort if sort else ["DATE_CREATED_DESC"] + sort = sort if sort else ['DATE_CREATED_DESC'] if not search_params or not isinstance(search_params, RiverscapesSearchParams): raise RiverscapesAPIException("search requires a valid RiverscapesSearchParams object") @@ -365,32 +371,35 @@ def search( # First make a quick query to get the total number of records search_params_gql = search_params.to_gql() stats_results = self.run_query(qry, {"searchParams": search_params_gql, "limit": 0, "offset": 0, "sort": sort}) - overall_total = stats_results["data"]["searchProjects"]["total"] - stats = stats_results["data"]["searchProjects"]["stats"] - _prg = ProgressBar(overall_total, 30, "Search Progress") + overall_total = stats_results['data']['searchProjects']['total'] + stats = stats_results['data']['searchProjects']['stats'] + _prg = ProgressBar(overall_total, 30, 'Search Progress') self.log.debug(f"Total records: {overall_total:,} .... starting retrieval...") if max_results and max_results > 0: self.log.debug(f" ... 
but max_results is set to {max_results:,} so we will stop there.") # Set initial to and from dates so that we can paginate through more than 10,000 recirds now_date = datetime.now(timezone.utc) - createdOn = search_params_gql.get("createdOn", {}) - search_to_date = dateparse(createdOn.get("to")) if createdOn.get("to") else now_date - search_from_date = dateparse(createdOn.get("from")) if createdOn.get("from") else None + createdOn = search_params_gql.get('createdOn', {}) + search_to_date = dateparse(createdOn.get('to')) if createdOn.get('to') else now_date + search_from_date = dateparse(createdOn.get('from')) if createdOn.get('from') else None num_results = 1 # Just to get the loop started outer_counter = 0 while outer_counter < overall_total and num_results > 0: - search_params_gql["createdOn"] = {"to": format_date(search_to_date), "from": format_date(search_from_date) if search_from_date else None} + search_params_gql['createdOn'] = { + "to": format_date(search_to_date), + "from": format_date(search_from_date) if search_from_date else None + } if progress_bar: _prg.update(outer_counter) # self.log.debug(f" Searching from {search_from_date} to {search_to_date}") results = self.run_query(qry, {"searchParams": search_params_gql, "limit": page_size, "offset": 0, "sort": sort}) - projects = results["data"]["searchProjects"]["results"] + projects = results['data']['searchProjects']['results'] num_results = len(projects) inner_counter = 0 project = None for search_result in projects: - project_raw = search_result["item"] + project_raw = search_result['item'] if progress_bar: _prg.update(outer_counter + inner_counter) project = RiverscapesProject(project_raw) @@ -416,7 +425,15 @@ def search( _prg.finish() self.log.debug(f"Search complete: retrieved {outer_counter:,} records") - def process_search_results_async(self, callback: callable, search_params: RiverscapesSearchParams, progress_bar: bool = False, page_size: int = 500, sort: List[str] = None, max_results: int = 
None, max_workers=5): + def process_search_results_async(self, + callback: callable, + search_params: RiverscapesSearchParams, + progress_bar: bool = False, + page_size: int = 500, + sort: List[str] = None, + max_results: int = None, + max_workers=5 + ): """ Considerations: @@ -438,7 +455,7 @@ def process_search_results_async(self, callback: callable, search_params: Rivers max_results (int, optional): SAME AS THE SEARCH FUNCTION max_workers (int, optional): Here is where you can set the number of workers for the ThreadPoolExecutor. Defaults to 5. """ - log = Logger("API") + log = Logger('API') with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: futures = [] for project, _stats, _total, _prg in self.search(search_params, progress_bar=progress_bar, page_size=page_size, sort=sort, max_results=max_results): @@ -458,9 +475,10 @@ def process_search_results_async(self, callback: callable, search_params: Rivers future.result() # Gather results or handle exceptions except Exception as e: log.error(f"Project {project.id} generated an exception: {e}") + return def get_project_full(self, project_id: str) -> RiverscapesProject: - """This gets the full project record + """ This gets the full project record This is a MUCH heavier query than what comes back from the search function. 
If all you need is the project metadata this is probably not the query for you @@ -471,12 +489,12 @@ def get_project_full(self, project_id: str) -> RiverscapesProject: Returns: _type_: _description_ """ - qry = self.load_query("getProjectFull") + qry = self.load_query('getProjectFull') results = self.run_query(qry, {"id": project_id}) - return RiverscapesProject(results["data"]["project"]) + return RiverscapesProject(results['data']['project']) def get_project_files(self, project_id: str) -> List[Dict[str, any]]: - """This returns the file listing with everything you need to download project files + """ This returns the file listing with everything you need to download project files Args: @@ -485,9 +503,9 @@ def get_project_files(self, project_id: str) -> List[Dict[str, any]]: Returns: _type_: _description_ """ - qry = self.load_query("projectFiles") + qry = self.load_query('projectFiles') results = self.run_query(qry, {"projectId": project_id}) - return results["data"]["project"]["files"] + return results['data']['project']['files'] def get_project_types(self) -> Dict[str, RiverscapesProjectType]: """_summary_ @@ -495,22 +513,22 @@ def get_project_types(self) -> Dict[str, RiverscapesProjectType]: Returns: _type_: _description_ """ - qry = self.load_query("projectTypes") + qry = self.load_query('projectTypes') offset = 0 limit = 100 total = -1 results = [] while total < 0 or offset < total: qry_results = self.run_query(qry, {"limit": limit, "offset": offset}) - total = qry_results["data"]["projectTypes"]["total"] + total = qry_results['data']['projectTypes']['total'] offset += limit - for x in qry_results["data"]["projectTypes"]["items"]: + for x in qry_results['data']['projectTypes']['items']: results.append(x) - return {x["machineName"]: RiverscapesProjectType(x) for x in results} + return {x['machineName']: RiverscapesProjectType(x) for x in results} def search_count(self, search_params: RiverscapesSearchParams): - """Return the number of records that match the 
search parameters + """ Return the number of records that match the search parameters Args: query (str): _description_ variables (Dict[str, str]): _description_ @@ -518,38 +536,41 @@ def search_count(self, search_params: RiverscapesSearchParams): Returns: Tuple[total: int, Dict[str, any]]: the total results and the stats dictionary """ - qry = self.load_query("searchCount") + qry = self.load_query('searchCount') if not search_params or not isinstance(search_params, RiverscapesSearchParams): raise RiverscapesAPIException("searchCount requires a valid RiverscapesSearchParams object") if search_params.keywords is not None or search_params.name is not None: raise RiverscapesAPIException("searchCount does not support keywords or name search parameters as you will always get a large, non-representative count because of low-scoring items") results = self.run_query(qry, {"searchParams": search_params.to_gql(), "limit": 0, "offset": 0}) - total = results["data"]["searchProjects"]["total"] - stats = results["data"]["searchProjects"]["stats"] + total = results['data']['searchProjects']['total'] + stats = results['data']['searchProjects']['stats'] return (total, stats) - def run_query(self, query: str, variables: dict) -> dict: - """A simple function to use requests.post to make the API call. Note the json= section. + def run_query(self, query, variables): + """ A simple function to use requests.post to make the API call. Note the json= section. 
Args: - query (str): GraphQL query string - variables (dict): mapping variable names to values + query (_type_): _description_ + variables (_type_): _description_ Raises: - Exception: RiverscapesAPIException + Exception: _description_ Returns: - dict: parsed JSON response from the API + _type_: _description_ """ headers = {"authorization": "Bearer " + self.access_token} if self.access_token else {} - request = requests.post(self.uri, json={"query": query, "variables": variables}, headers=headers, timeout=30) + request = requests.post(self.uri, json={ + 'query': query, + 'variables': variables + }, headers=headers, timeout=30) if request.status_code == 200: resp_json = request.json() - if "errors" in resp_json and len(resp_json["errors"]) > 0: + if 'errors' in resp_json and len(resp_json['errors']) > 0: # Authentication timeout: re-login and retry the query - if len(list(filter(lambda err: "You must be authenticated" in err["message"], resp_json["errors"]))) > 0: + if len(list(filter(lambda err: 'You must be authenticated' in err['message'], resp_json['errors']))) > 0: self.log.debug("Authentication timed out. Fetching new token...") self.refresh_token() self.log.debug(" done. Re-trying query...") @@ -565,7 +586,7 @@ def run_query(self, query: str, variables: dict) -> dict: raise RiverscapesAPIException(f"Query failed to run by returning code of {request.status_code}. {query} {json.dumps(variables)}") def download_files(self, project_id: str, download_dir: str, re_filter: List[str] = None, force=False): - """From a project id get all relevant files and download them + """ From a project id get all relevant files and download them Args: project_id (_type_): _description_ @@ -579,19 +600,12 @@ def download_files(self, project_id: str, download_dir: str, re_filter: List[str # Now filter the list of files to anything that remains after the regex filter filtered_files = [] for file in file_results: - if "localPath" not in file: - self.log.warning("File has no localPath. 
Skipping") + if not 'localPath' in file: + self.log.warning('File has no localPath. Skipping') continue # now filter the if re_filter is not None and len(re_filter) > 0: - if not any( - [ - re.compile(x, re.IGNORECASE).match( - file["localPath"], - ) - for x in re_filter - ] - ): + if not any([re.compile(x, re.IGNORECASE).match(file['localPath'], ) for x in re_filter]): continue filtered_files.append(file) @@ -600,11 +614,11 @@ def download_files(self, project_id: str, download_dir: str, re_filter: List[str return for file in filtered_files: - local_file_path = os.path.join(download_dir, file["localPath"]) + local_file_path = os.path.join(download_dir, file['localPath']) self.download_file(file, local_file_path, force) def download_file(self, api_file_obj: Dict[str, any], local_path: str, force=False): - """NOTE: The directory for this file will be created if it doesn't exist + """ NOTE: The directory for this file will be created if it doesn't exist Arguments: api_file_obj {[type]} -- The dictionary that the API returns. should include the name, md5, size etc @@ -614,7 +628,7 @@ def download_file(self, api_file_obj: Dict[str, any], local_path: str, force=Fal force {bool} -- if true we will download regardless """ file_is_there = os.path.exists(local_path) and os.path.isfile(local_path) - etag_match = file_is_there and calculate_etag(local_path) == api_file_obj["etag"] + etag_match = file_is_there and calculate_etag(local_path) == api_file_obj['etag'] file_directory = os.path.dirname(local_path) @@ -627,9 +641,9 @@ def download_file(self, api_file_obj: Dict[str, any], local_path: str, force=Fal if force is True or not file_is_there or not etag_match: if not etag_match and file_is_there: - self.log.info(f" File etag mismatch. Re-downloading: {local_path}") + self.log.info(f' File etag mismatch. 
Re-downloading: {local_path}') elif not file_is_there: - self.log.info(f" Downloading: {local_path}") + self.log.info(f' Downloading: {local_path}') max_retries = 3 # could parameterize for attempt in range(max_retries): @@ -637,11 +651,11 @@ def download_file(self, api_file_obj: Dict[str, any], local_path: str, force=Fal # errors occurred here once, so we try to catch it: # Exception has occurred: ConnectionError # ('Connection aborted.', ConnectionResetError(10054, 'An existing connection was forcibly closed by the remote host', None, 10054, None)) - r = requests.get(api_file_obj["downloadUrl"], allow_redirects=True, stream=True, timeout=30) - total_length = r.headers.get("content-length") + r = requests.get(api_file_obj['downloadUrl'], allow_redirects=True, stream=True, timeout=30) + total_length = r.headers.get('content-length') dl = 0 - with open(local_path, "wb") as f: + with open(local_path, 'wb') as f: if total_length is None: # no content length header f.write(r.content) else: @@ -653,25 +667,28 @@ def download_file(self, api_file_obj: Dict[str, any], local_path: str, force=Fal progbar.erase() return True except requests.ConnectionError as e: - self.log.warning(f"Connection error on attempt {attempt + 1}: {e}") + self.log.warning(f"Connection error on attempt {attempt+1}: {e}") if attempt < max_retries - 1: - time.sleep(2**attempt) # Exponential backoff + time.sleep(2 ** attempt) # Exponential backoff else: raise else: - self.log.debug(f" File already exists (skipping): {local_path}") + self.log.debug(f' File already exists (skipping): {local_path}') return False -if __name__ == "__main__": - log = Logger("API") - gql = RiverscapesAPI(os.environ.get("RS_API_URL")) +if __name__ == '__main__': + log = Logger('API') + gql = RiverscapesAPI(os.environ.get('RS_API_URL')) gql.refresh_token() log.debug(gql.access_token) gql.shutdown() # remember to shutdown so the threaded timer doesn't keep the process alive - gql2 = RiverscapesAPI(os.environ.get("RS_API_URL"), 
{"clientId": os.environ["RS_CLIENT_ID"], "secretId": os.environ["RS_CLIENT_SECRET"]}) + gql2 = RiverscapesAPI(os.environ.get('RS_API_URL'), { + 'clientId': os.environ['RS_CLIENT_ID'], + 'secretId': os.environ['RS_CLIENT_SECRET'] + }) gql2.refresh_token() log.debug(gql2.access_token) gql2.shutdown() # remember to shutdown so the threaded timer doesn't keep the process alive From c31640faacc01d71ceb3fdea3fde5d919d91663d Mon Sep 17 00:00:00 2001 From: Lorin Gaertner Date: Wed, 28 Jan 2026 15:49:41 -0800 Subject: [PATCH 14/14] improve typing and doc for run_query without changing rest of file --- pydex/classes/RiverscapesAPI.py | 12 ++++++------ pyproject.toml | 3 +++ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pydex/classes/RiverscapesAPI.py b/pydex/classes/RiverscapesAPI.py index c437f3b..32785ef 100644 --- a/pydex/classes/RiverscapesAPI.py +++ b/pydex/classes/RiverscapesAPI.py @@ -547,18 +547,18 @@ def search_count(self, search_params: RiverscapesSearchParams): stats = results['data']['searchProjects']['stats'] return (total, stats) - def run_query(self, query, variables): - """ A simple function to use requests.post to make the API call. Note the json= section. + def run_query(self, query: str, variables: dict) -> dict: + """A simple function to use requests.post to make the API call. Note the json= section. 
Args: - query (_type_): _description_ - variables (_type_): _description_ + query (str): GraphQL query string + variables (dict): mapping variable names to values Raises: - Exception: _description_ + Exception: RiverscapesAPIException Returns: - _type_: _description_ + dict: parsed JSON response from the API """ headers = {"authorization": "Bearer " + self.access_token} if self.access_token else {} request = requests.post(self.uri, json={ diff --git a/pyproject.toml b/pyproject.toml index 3700038..bd802a4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,9 @@ pydex = ["**/*.graphql", "**/*.json"] line-length = 240 target-version = "py312" +[tool.ruff.format] +quote-style = "preserve" + [tool.ruff.lint] # I = isort (import sorting fix) # E, F = Flake8 (standard errors)