Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 166 additions & 1 deletion commands/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def resources(
if identifier:
print(identifier)
else:
print(json.dumps(hit, indent=2))
print(json.dumps(hit))
count += 1

if limit and count >= limit:
Expand All @@ -233,3 +233,168 @@ def resources(
except Exception as e:
logger.error(f"Error fetching resources: {e}")
raise click.Abort()


@search.command(name="import-candidates")
@click.pass_obj
@click.option(
    "--page-size",
    type=int,
    default=100,
    help="Number of results per page (default: 100)",
)
@click.option(
    "--limit",
    type=int,
    help="Maximum number of total results to return (default: unlimited)",
)
@click.option(
    "--import-status",
    type=str,
    help="Filter by import status (e.g., NOT_IMPORTED, IMPORTED)",
)
@click.option(
    "--type",
    "publication_type",
    type=str,
    help="Filter by publication type (e.g., AcademicChapter, AcademicArticle)",
)
@click.option(
    "--publication-year",
    type=str,
    help="Filter by publication year, or year range as 'from,to' (e.g., 2025 or 2023,2025)",
)
@click.option(
    "--order-by",
    type=str,
    help="Order results by field (e.g., createdDate, modifiedDate)",
)
@click.option(
    "--sort-order",
    type=str,
    default="desc",
    help="Sort order: asc or desc (default: desc)",
)
@click.option(
    "--aggregation",
    type=str,
    default="none",
    help="Aggregation parameter (default: none)",
)
@click.option(
    "--category",
    type=str,
    help="Filter by category",
)
@click.option(
    "--contributor",
    type=str,
    help="Filter by contributor ID",
)
@click.option(
    "--publisher",
    type=str,
    help="Filter by publisher ID",
)
@click.option(
    "--title",
    type=str,
    help="Filter by title",
)
@click.option(
    "--doi",
    type=str,
    help="Filter by DOI",
)
@click.option(
    "--top-level-organization",
    type=str,
    help="Filter by top-level organization ID",
)
@click.option(
    "--id-only",
    is_flag=True,
    help="Output only identifiers (one per line)",
)
@click.option(
    "--query",
    type=str,
    multiple=True,
    help="Additional query parameters as key=value (can be used multiple times)",
)
def import_candidates(
    ctx: AppContext,
    page_size: int,
    limit: int | None,
    import_status: str | None,
    publication_type: str | None,
    publication_year: str | None,
    order_by: str | None,
    sort_order: str,
    aggregation: str,
    category: str | None,
    contributor: str | None,
    publisher: str | None,
    title: str | None,
    doi: str | None,
    top_level_organization: str | None,
    id_only: bool,
    query: Tuple[str, ...],
) -> None:
    """Search import candidates for the authenticated customer.

    Examples:
        # Search for not-imported academic chapters from 2025
        uv run cli.py search import-candidates --import-status NOT_IMPORTED --type AcademicChapter --publication-year 2025

        # Search with all aggregations, sorted by created date
        uv run cli.py search import-candidates --aggregation all --order-by createdDate --sort-order desc

        # Output only identifiers
        uv run cli.py search import-candidates --import-status NOT_IMPORTED --id-only
    """
    # Map CLI option values onto the search API's query-parameter names.
    # (Plain local, so snake_case rather than constant-cased.)
    param_map = {
        "importStatus": import_status,
        "type": publication_type,
        "publicationYear": publication_year,
        "orderBy": order_by,
        "sortOrder": sort_order,
        "aggregation": aggregation,
        "category": category,
        "contributor": contributor,
        "publisher": publisher,
        "title": title,
        "doi": doi,
        "topLevelOrganization": top_level_organization,
    }
    # Drop unset (None) and empty-string options so they are not sent at all.
    query_params = {key: value for key, value in param_map.items() if value}

    # Fold in raw key=value pairs from --query; these can override the named
    # options above. Reject pairs without "=" or with an empty key, which
    # would otherwise silently send a nameless parameter.
    for raw in query:
        key, separator, value = raw.partition("=")
        if separator and key:
            query_params[key] = value
        else:
            logger.warning("Ignoring invalid query parameter: %s", raw)

    search_service = SearchApiService(profile=ctx.profile)

    try:
        count = 0
        for hit in search_service.import_candidates_search(query_params, page_size):
            if id_only:
                identifier = hit.get("identifier")
                if identifier:
                    print(identifier)
            else:
                print(json.dumps(hit))
            # Counts processed hits; in --id-only mode a hit lacking an
            # identifier still counts toward --limit even though nothing
            # was printed.
            count += 1

            if limit and count >= limit:
                break

        if count > 0:
            logger.info("Total results: %d", count)

    except Exception as e:
        logger.error("Error fetching import candidates: %s", e)
        raise click.Abort()
91 changes: 91 additions & 0 deletions commands/services/search_api.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import boto3
import json
import logging
import requests
from datetime import datetime, timedelta
from requests.exceptions import JSONDecodeError
from tenacity import (
retry,
Expand All @@ -17,12 +19,45 @@ class SearchApiService:
def __init__(self, profile: Optional[str]) -> None:
    """Set up AWS clients and lazily-populated authentication state.

    Args:
        profile: Optional AWS profile name used for the boto3 session.
    """
    self.session = boto3.Session(profile_name=profile)
    self.ssm = self.session.client("ssm")
    self.secretsmanager = self.session.client("secretsmanager")
    # Public API hostname, resolved once from SSM at construction time.
    self.api_domain = self._get_system_parameter("/NVA/ApiDomain")
    # Cognito URI, client credentials, and token are fetched lazily on the
    # first _get_token() call, not here.
    self._cognito_uri: Optional[str] = None
    self._client_credentials: Optional[Dict[str, str]] = None
    self._token: Optional[str] = None
    # Expiry initialized to "now" so the first token check always refreshes.
    self._token_expiry_time: datetime = datetime.now()

def _get_system_parameter(self, name: str) -> str:
    """Look up a single SSM parameter and return its string value."""
    return self.ssm.get_parameter(Name=name)["Parameter"]["Value"]

def _get_secret(self, name: str) -> Dict[str, str]:
    """Fetch a Secrets Manager secret and decode its JSON payload."""
    raw_secret = self.secretsmanager.get_secret_value(SecretId=name)
    return json.loads(raw_secret["SecretString"])

def _get_cognito_token(self) -> str:
    """Request a fresh OAuth2 access token via the client-credentials flow.

    Side effect: records the token's expiry time so _is_token_expired()
    can decide when a refresh is needed.

    Returns:
        The access token string issued by Cognito.

    Raises:
        requests.HTTPError: If Cognito returns a non-2xx response.
    """
    url = f"{self._cognito_uri}/oauth2/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {
        "grant_type": "client_credentials",
        "client_id": self._client_credentials["backendClientId"],
        "client_secret": self._client_credentials["backendClientSecret"],
    }
    # timeout prevents an unresponsive endpoint from hanging the CLI forever;
    # raise_for_status surfaces auth failures as a clear HTTPError instead of
    # a confusing KeyError on the missing "expires_in"/"access_token" fields.
    response = requests.post(url, headers=headers, data=data, timeout=30)
    response.raise_for_status()
    response_json = response.json()
    self._token_expiry_time = datetime.now() + timedelta(seconds=response_json["expires_in"])
    return response_json["access_token"]

def _is_token_expired(self) -> bool:
    """Report whether the cached token is expired or about to expire.

    A 30-second safety margin avoids sending a token that could lapse
    mid-request.
    """
    refresh_deadline = self._token_expiry_time - timedelta(seconds=30)
    return datetime.now() > refresh_deadline

def _get_token(self) -> str:
    """Return a valid bearer token, bootstrapping Cognito config lazily.

    The Cognito URI and backend client credentials are resolved on first
    use; the cached token is re-fetched when absent or near expiry.
    """
    if not self._cognito_uri:
        self._cognito_uri = self._get_system_parameter("/NVA/CognitoUri")
        self._client_credentials = self._get_secret("BackendCognitoClientCredentials")

    needs_refresh = not self._token or self._is_token_expired()
    if needs_refresh:
        self._token = self._get_cognito_token()
    return self._token

def get_uri(self, type: str) -> str:
    """Build the full search endpoint URL for the given resource type."""
    return "https://{}/search/{}".format(self.api_domain, type)

Expand Down Expand Up @@ -120,3 +155,59 @@ def resource_search(

if offset >= total_hits:
break

def import_candidates_search(
    self,
    query_parameters: Dict[str, Any],
    page_size: int = 100,
) -> Generator[Dict[str, Any], None, None]:
    """Yield import-candidate search hits page by page until exhausted.

    Args:
        query_parameters: Query-string filters passed through to the API.
        page_size: Number of hits requested per page.

    Yields:
        Individual hit dicts from each search response page.

    Notes:
        Errors terminate the generator early (after logging) rather than
        raising, so callers may receive a partial result set.
    """
    url = self.get_uri("customer/import-candidates")
    offset = 0

    while True:
        # "from"/"size" implement offset-based pagination.
        params = {
            **query_parameters,
            "from": offset,
            "size": page_size,
        }
        headers = {
            "Accept": "application/json",
            "Authorization": f"Bearer {self._get_token()}",
        }

        try:
            response = self._make_search_request(url, headers, params)
        except requests.exceptions.HTTPError as e:
            logger.error(
                f"Failed to search after retries. Status: {e.response.status_code}",
            )
            if e.response.status_code >= 400:
                try:
                    logger.error(f"Error detail: {e.response.json()}")
                except (ValueError, JSONDecodeError):
                    logger.error(f"Error detail: {e.response.text}")
            break
        except requests.exceptions.RequestException as e:
            logger.error(f"Network error after retries: {e}")
            break

        if response.status_code != 200:
            # Guard the JSON decode: an HTML/plain-text error body must not
            # raise here and mask the real failure (mirrors the HTTPError
            # branch above).
            try:
                detail = response.json()
            except (ValueError, JSONDecodeError):
                detail = response.text
            logger.error(f"Failed to search. {response.status_code}: {detail}")
            break

        response_data = response.json()
        hits = response_data.get("hits", [])

        # An empty page means the server has no more results.
        if not hits:
            break

        for hit in hits:
            yield hit

        total_hits = response_data.get("totalHits", 0)
        offset += len(hits)

        # Stop once we have consumed every reported hit.
        if offset >= total_hits:
            break
Loading