diff --git a/article/controller.py b/article/controller.py index dce6e385..ff5e8f6e 100644 --- a/article/controller.py +++ b/article/controller.py @@ -445,6 +445,9 @@ def __init__( limit=None, timeout=None, opac_url=None, + verify=True, + issn=None, + stop=None, force_update=None, ): self.user = user @@ -460,6 +463,9 @@ def __init__( self.limit = limit self.timeout = timeout self.opac_url = opac_url + self.verify = verify + self.issn = issn + self.stop = stop self.force_update = force_update self._iter_from_harvest_count = 0 @@ -492,7 +498,8 @@ def _iter_from_pid_provider(self): issn_list = [i for i in journal_issns if i] if journal_issns else None if journal_issns and not issn_list: continue - qs = PidProviderXML.get_queryset( + + kwargs = dict( issn_list=issn_list, from_pub_year=self.from_pub_year, until_pub_year=self.until_pub_year, @@ -500,10 +507,14 @@ def _iter_from_pid_provider(self): until_updated_date=self.until_date, proc_status_list=self.proc_status_list or [PPXML_STATUS_TODO, PPXML_STATUS_INVALID], ) - self._iter_from_pid_provider_count += qs.count() + qs = PidProviderXML.get_queryset(**kwargs) + total = qs.count() + + logging.info(f"PidProviderXML queryset total: {total}, kwargs: {kwargs}") + + self._iter_from_pid_provider_count += total for item in qs.iterator(): yield {"pp_xml_id": item.id} - logging.info(f"_iter_from_pid_provider: yielded {self._iter_from_pid_provider_count} items") def _iter_from_article(self): """ @@ -550,7 +561,7 @@ def _iter_from_harvest(self): """Itera documentos coletados via OPAC ou ArticleMeta.""" if Collection.objects.count() == 0: - Collection.load(self.user) + Collection.load(self.user, verify=self.verify) count = 0 for collection_acron in self.collection_acron_list or list(Collection.get_acronyms()): @@ -594,6 +605,9 @@ def _build_harvester(self, collection_acron): until_date=self.until_date, limit=self.limit, timeout=self.timeout, + verify=self.verify, + issn=self.issn, + stop=self.stop, ) if collection_acron == "scl": return OPACHarvester(self.opac_url or "www.scielo.br", collection_acron, **kwargs) diff --git a/article/sources/preprint.py b/article/sources/preprint.py index 318e3c2a..89ae48a8 100644 --- a/article/sources/preprint.py +++ b/article/sources/preprint.py @@ -19,10 +19,16 @@ class PreprintArticleSaveError(Exception): ... -def harvest_preprints(URL, user): +def harvest_preprints(URL, user, verify=True, timeout=30, stop=None): sickle = Sickle(URL) recs = sickle.ListRecords(metadataPrefix="oai_dc") + + count = 0 for rec in recs: + if stop and count >= stop: + logging.info(f"Reached stop limit of {stop} preprints") + break + article_info = get_info_article(rec) identifier = get_doi(article_info["identifier"]) doi = get_or_create_doi(doi=identifier, user=user) @@ -73,6 +79,8 @@ def harvest_preprints(URL, user): # acessíveis na área administrativa # para que o usuário fique sabendo quais itens falharam raise PreprintArticleSaveError(e) + + count += 1 def get_info_article(rec): diff --git a/article/sources/xmlsps.py b/article/sources/xmlsps.py index 2bcd41fd..6dd7b93b 100755 --- a/article/sources/xmlsps.py +++ b/article/sources/xmlsps.py @@ -232,7 +232,7 @@ def load_article(user, xml=None, file_path=None, v3=None, pp_xml=None): article.languages.add(main_lang) article.sections.set( - get_or_create_toc_sections(xmltree=xmltree, user=user, errors=errors, issue=article.issue) + get_or_create_toc_sections(xmltree=xmltree, user=user, errors=errors, issue=article.issue, verify=True) ) article.titles.set( create_or_update_titles( @@ -522,7 +522,7 @@ def get_or_create_fundings(xmltree, user, item, errors): return data -def get_or_create_toc_sections(xmltree, user, errors, issue): +def get_or_create_toc_sections(xmltree, user, errors, issue, verify=True): """ Extrai e cria seções do sumário (TOC) a partir do XML. @@ -538,7 +538,7 @@ def get_or_create_toc_sections(xmltree, user, errors, issue): try: if not issue.table_of_contents.exists(): for am_issue in AMIssue.objects.filter(new_record=issue): - load_issue_sections(user, issue, am_issue=am_issue) + load_issue_sections(user, issue, am_issue=am_issue, verify=verify) toc_sections = ArticleTocSections(xmltree=xmltree).sections for item in toc_sections: section_title = item.get("section") diff --git a/article/tasks.py b/article/tasks.py index d6aac89e..937d5169 100644 --- a/article/tasks.py +++ b/article/tasks.py @@ -51,7 +51,7 @@ def load_funding_data(user, file_path): @celery_app.task(bind=True, name=_('load_preprints')) -def load_preprint(self, user_id, oai_pmh_preprint_uri): +def load_preprint(self, user_id, oai_pmh_preprint_uri, verify=True, timeout=30, stop=None): """ Coleta e carrega preprints de um endpoint OAI-PMH específico. @@ -62,6 +62,9 @@ def load_preprint(self, user_id, oai_pmh_preprint_uri): self: Instância da tarefa Celery user_id (int): ID do usuário executando a tarefa (obrigatório) oai_pmh_preprint_uri (str): URI do endpoint OAI-PMH para coleta (obrigatório) + verify (bool): Verificação SSL para requisições HTTP + timeout (int): Timeout para requisições HTTP + stop (int, optional): Número máximo de preprints a processar Returns: None @@ -89,7 +92,7 @@ def load_preprint(self, user_id, oai_pmh_preprint_uri): """ user = User.objects.get(pk=user_id) ## fazer filtro para não coletar tudo sempre - harvest_preprints(oai_pmh_preprint_uri, user) + harvest_preprints(oai_pmh_preprint_uri, user, verify=verify, timeout=timeout, stop=stop) @celery_app.task(bind=True) @@ -643,6 +646,7 @@ def task_check_article_availability( article_id=None, collection_acron_list=None, timeout=None, + verify=True, is_activate=None, force_update=False, ): @@ -659,6 +663,7 @@ def task_check_article_availability( article_id (int, optional): ID do artigo a verificar (obrigatório) collection_acron_list (list, optional): Lista de acrônimos de coleções para filtro timeout (int, optional): Timeout em segundos para verificações HTTP + verify (bool): Verificação SSL para requisições HTTP is_activate (bool, optional): Se deve ativar artigo após verificação force_update (bool): Força nova verificação mesmo se recente @@ -719,6 +724,9 @@ def task_dispatch_articles( limit=None, timeout=None, opac_url=None, + verify=True, + issn=None, + stop=None, # --- ativa article_source --- article_source_status_list=None, ): @@ -747,6 +755,9 @@ def task_dispatch_articles( limit (int, optional): Limite máximo de artigos a processar timeout (int, optional): Timeout para operações HTTP opac_url (str, optional): URL base do OPAC para harvest + verify (bool): Verificação SSL para requisições HTTP + issn (str, optional): ISSN para filtrar por journal específico + stop (int, optional): Número máximo de itens a processar article_source_status_list (list, optional): Status do article_source para filtro Returns: @@ -795,12 +806,14 @@ def task_dispatch_articles( limit=limit, timeout=timeout, opac_url=opac_url, + verify=verify, + issn=issn, + stop=stop, force_update=force_update, ): if item_kwargs is None: skipped += 1 continue - logging.info(f"Dispatching article with kwargs: {item_kwargs}") task_process_article_pipeline.delay(**item_kwargs, **common_kwargs) dispatched += 1 @@ -937,7 +950,7 @@ def task_process_article_pipeline( am_article=am_article, auto_solve_pid_conflict=auto_solve_pid_conflict, ) - pp_xml_id = article_source.pid_provider_xml.id + pp_xml_id = article_source.pid_provider_xml.id if article_source.pid_provider_xml else None if article_source_id: article_source = ArticleSource.objects.get(id=article_source_id) @@ -946,7 +959,7 @@ def task_process_article_pipeline( force_update=force_update, auto_solve_pid_conflict=auto_solve_pid_conflict, ) - pp_xml_id = article_source.pid_provider_xml.id + pp_xml_id = article_source.pid_provider_xml.id if article_source.pid_provider_xml else None if not pp_xml_id: raise ValueError( diff --git a/bigbang/tasks.py b/bigbang/tasks.py index 410596a4..a4210c08 100644 --- a/bigbang/tasks.py +++ b/bigbang/tasks.py @@ -38,11 +38,12 @@ def task_start( self, user_id=None, username=None, + verify=True, ): try: user = _get_user(user_id, username) Language.load(user) - Collection.load(user) + Collection.load(user, verify=verify) Vocabulary.load(user) Standard.load(user) Subject.load(user) diff --git a/bigbang/tasks_scheduler.py b/bigbang/tasks_scheduler.py index 6022380f..2cb85de5 100644 --- a/bigbang/tasks_scheduler.py +++ b/bigbang/tasks_scheduler.py @@ -141,7 +141,10 @@ def schedule_task_dispatch_articles(username, enabled=False): limit=None, timeout=None, opac_url=None, + verify=True, + issn=None, article_source_status_list=None, + stop=None, ), description=_("Dispatch articles to processing pipeline"), priority=TASK_PRIORITY, @@ -271,7 +274,7 @@ def schedule_load_journal_from_article_meta(username, enabled=False): """ Agenda a tarefa de carga de dados de journals obtidos do AM e Core. - Configura verify=True para verificação SSL nas requisições HTTP. + Configura verify=True para verificação SSL e limit=100 para limitar coleta. """ schedule_task( task="journal.tasks.load_journal_from_article_meta", @@ -280,6 +283,9 @@ def schedule_load_journal_from_article_meta(username, enabled=False): load_data=False, collection_acron="scl", verify=True, + limit=1000, + issn=None, + stop=None, ), description=_("Carga de dados de journals obtidos do AM e Core"), priority=1, @@ -295,7 +301,7 @@ def schedule_collect_journals_from_am(username, enabled=False): """ Agenda a tarefa de coleta de journals da fonte AM. - Configura verify=True para verificação SSL nas requisições HTTP. + Configura verify=True para verificação SSL e limit=100 para limitar coleta. """ schedule_task( task="journal.tasks.load_journal_from_article_meta", @@ -304,6 +310,9 @@ def schedule_collect_journals_from_am(username, enabled=False): load_data=True, collection_acron="scl", verify=True, + limit=1000, + issn=None, + stop=None, ), description=_("Coleta de journals da fonte AM"), priority=1, @@ -416,7 +425,9 @@ def schedule_export_journal_to_articlemeta(username, enabled=False): def schedule_load_issue_from_articlemeta(username, enabled=False): """ - Agenda a tarefa de carregar issues do ArticleMeta + Agenda a tarefa de carregar issues do ArticleMeta. + + Configura verify=True para verificação SSL e limit=100 para limitar coleta. """ schedule_task( task="issue.tasks.load_issue_from_articlemeta", @@ -429,6 +440,10 @@ def schedule_load_issue_from_articlemeta(username, enabled=False): until_date=None, force_update=None, timeout=30, + verify=True, + limit=1000, + issn=None, + stop=None, ), description=_("Load issues from ArticleMeta"), priority=TASK_PRIORITY, diff --git a/collection/models.py b/collection/models.py index 73d8e5f9..2341dfff 100755 --- a/collection/models.py +++ b/collection/models.py @@ -223,12 +223,12 @@ def __str__(self): base_form_class = CoreAdminModelForm @classmethod - def load(cls, user, collections_data=None): + def load(cls, user, collections_data=None, verify=True): if not collections_data: collections_data = fetch_data( "https://articlemeta.scielo.org/api/v1/collection/identifiers/", json=True, - verify=False, + verify=verify, ) for collection_data in collections_data: diff --git a/collection/tasks.py b/collection/tasks.py index 264e9494..5456e0ab 100644 --- a/collection/tasks.py +++ b/collection/tasks.py @@ -8,9 +8,9 @@ @celery_app.task(bind=True) -def task_load_collections(self, user_id=None, username=None): +def task_load_collections(self, user_id=None, username=None, verify=True): if user_id: user = User.objects.get(pk=user_id) if username: user = User.objects.get(username=username) - Collection.load(user) + Collection.load(user, verify=verify) diff --git a/core/utils/harvesters.py b/core/utils/harvesters.py index cf2026f6..da213a0e 100644 --- a/core/utils/harvesters.py +++ b/core/utils/harvesters.py @@ -19,6 +19,9 @@ def __init__( until_date: Optional[str] = None, limit: Optional[int] = None, timeout: int = 30, + verify: bool = True, + issn: Optional[str] = None, + stop: Optional[int] = None, ): """ Inicializa o harvester do ArticleMeta. @@ -29,6 +32,9 @@ def __init__( until_date: Data final no formato YYYY-MM-DD limit: Número de documentos por página timeout: Timeout em segundos para requisições + verify: Verificação SSL para requisições HTTPS + issn: ISSN para filtrar documents de um journal específico + stop: Número máximo de documentos a retornar (opcional) """ self.record_type = record_type self.base_url = f"https://articlemeta.scielo.org/api/v1/{self.record_type}/identifiers" @@ -37,6 +43,9 @@ def __init__( self.until_date = until_date or datetime.utcnow().isoformat()[:10] self.limit = limit or 1000 self.timeout = timeout + self.verify = verify + self.issn = issn + self.stop = stop def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: """ @@ -55,6 +64,7 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: - metadata: Metadados adicionais do documento """ offset = 0 + count = 0 while True: try: @@ -66,6 +76,10 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: "from": self.from_date, "until": self.until_date, } + + # Adiciona ISSN se fornecido + if self.record_type in ("issue", "article") and self.issn: + params["issn"] = self.issn # Constrói URL url = f"{self.base_url}?{urlencode(params)}" @@ -73,7 +87,7 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: logging.info(f"Fetching AM documents from: {url}") # Faz requisição - response = fetch_data(url, json=True, timeout=self.timeout, verify=False) + response = fetch_data(url, json=True, timeout=self.timeout, verify=self.verify) # Processa objetos retornados objects = response.get("objects", []) @@ -85,6 +99,10 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: break for item in objects: + if self.stop and count >= self.stop: + logging.info(f"Reached stop limit of {self.stop} documents") + return + # Extrai dados básicos pid_v2 = item.get("code") if not pid_v2: @@ -120,6 +138,7 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: } yield document + count += 1 offset += self.limit @@ -147,6 +166,9 @@ def __init__( until_date: Optional[str] = None, limit: int = 100, timeout: int = 5, + verify: bool = True, + stop: Optional[int] = None, + issn: Optional[str] = None, ): """ Inicializa o harvester do OPAC. @@ -158,13 +180,20 @@ def __init__( until_date: Data final no formato YYYY-MM-DD limit: Número de documentos por página timeout: Timeout em segundos para requisições + verify: Verificação SSL para requisições HTTPS + issn: ISSN do periódico (opcional) """ + if not domain.startswith("http"): + domain = f"https://{domain}" self.domain = domain self.collection_acron = collection_acron self.from_date = from_date or "2000-01-01" self.until_date = until_date or datetime.utcnow().isoformat()[:10] self.limit = limit or 100 - self.timeout = timeout or 5 + self.timeout = timeout + self.verify = verify + self.stop = stop + self.issn = issn def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: """ @@ -185,6 +214,7 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: """ page = 1 total_pages = None + count = 0 while True: try: @@ -194,12 +224,13 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: f"end_date={self.until_date}&begin_date={self.from_date}" f"&limit={self.limit}&page={page}" ) + if self.issn: + url += f"&journal_id={self.issn}" logging.info(f"Fetching OPAC documents from: {url}") # Faz requisição - # verify=False é necessário para evitar erros de SSL em ambientes onde o certificado do OPAC não é reconhecido - response = fetch_data(url, json=True, timeout=self.timeout, verify=False) + response = fetch_data(url, json=True, timeout=self.timeout, verify=self.verify) # Define total de páginas na primeira iteração if total_pages is None: @@ -213,6 +244,10 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: break for pid_v3, item in documents.items(): + if self.stop and count >= self.stop: + logging.info(f"Reached stop limit of {self.stop} documents") + return + # Valida dados mínimos if not pid_v3 or not item.get("journal_acronym"): logging.warning(f"Invalid document data: {item}") @@ -254,6 +289,7 @@ def harvest_documents(self) -> Generator[Dict[str, Any], None, None]: } yield document + count += 1 # Verifica se deve continuar page += 1 diff --git a/issue/articlemeta/loader.py b/issue/articlemeta/loader.py index a8242e57..e377e48b 100644 --- a/issue/articlemeta/loader.py +++ b/issue/articlemeta/loader.py @@ -14,7 +14,7 @@ def harvest_issue_identifiers( - collection_acron, from_date, until_date, force_update, timeout=30 + collection_acron, from_date, until_date, force_update, timeout=30, verify=True, limit=None, issn=None, stop=None ): try: harvester = AMHarvester( @@ -22,8 +22,19 @@ def harvest_issue_identifiers( collection_acron=collection_acron, from_date=from_date, until_date=until_date, + limit=limit, + timeout=timeout, + verify=verify, + issn=issn, + stop=stop, ) - yield from harvester.harvest_documents() + + count = 0 + for document in harvester.harvest_documents(): + if stop and count >= stop: + break + yield document + count += 1 except Exception as e: exc_type, exc_value, exc_traceback = sys.exc_info() @@ -40,7 +51,7 @@ def harvest_issue_identifiers( ) -def harvest_and_load_issue(user, url, code, collection_acron, processing_date, force_update, timeout=30): +def harvest_and_load_issue(user, url, code, collection_acron, processing_date, force_update, timeout=30, verify=True, limit=None, stop=None): if not url: raise ValueError("URL is required to harvest and load issue") @@ -50,7 +61,7 @@ def harvest_and_load_issue(user, url, code, collection_acron, processing_date, f if not collection_acron: raise ValueError("Collection acronym is required to harvest and load issue") - harvested_data = harvest_issue_data(url, timeout=timeout) + harvested_data = harvest_issue_data(url, timeout=timeout, verify=verify) am_issue = load_am_issue( user, Collection.objects.get(acron3=collection_acron), @@ -60,16 +71,17 @@ def harvest_and_load_issue(user, url, code, collection_acron, processing_date, f harvested_data, force_update=force_update, timeout=timeout, + verify=verify, ) if not am_issue: raise ValueError(f"Unable to create am_issue for {url}") - return create_issue_from_am_issue(user, am_issue) + return create_issue_from_am_issue(user, am_issue, verify) -def harvest_issue_data(url, timeout=30): +def harvest_issue_data(url, timeout=30, verify=True): try: item = {} - item["data"] = utils.fetch_data(url, json=True, timeout=timeout, verify=False) + item["data"] = utils.fetch_data(url, json=True, timeout=timeout, verify=verify) item["status"] = "pending" return item except Exception as e: @@ -96,6 +108,7 @@ def load_am_issue( force_update, do_harvesting=False, timeout=30, + verify=True, ): try: if not url: @@ -103,7 +116,7 @@ def load_am_issue( # Corrigido: não redefine harvested_data se já existe if do_harvesting or not harvested_data: - harvested_data = harvest_issue_data(url, timeout=timeout) + harvested_data = harvest_issue_data(url, timeout=timeout, verify=verify) return AMIssue.create_or_update( pid=pid, @@ -132,7 +145,7 @@ def load_am_issue( return None -def complete_am_issue(user, am_issue): +def complete_am_issue(user, am_issue, verify=True): try: detail = {} @@ -144,7 +157,7 @@ def complete_am_issue(user, am_issue): if not am_issue.url: raise ValueError("am_issue.url is required") - harvested_data = harvest_issue_data(am_issue.url) + harvested_data = harvest_issue_data(am_issue.url, verify=verify) detail["harvested_data"] = str(harvested_data) am_issue.status = harvested_data.get("status") am_issue.data = harvested_data.get("data") @@ -160,13 +173,14 @@ def complete_am_issue(user, am_issue): ) -def get_issue_data_from_am_issue(am_issue, user=None): +def get_issue_data_from_am_issue(am_issue, user=None, verify=True): """ Extrai e ajusta dados do AMIssue para criação de Issue. Args: am_issue: Instância de AMIssue user: Usuário para completar dados se necessário + verify: Verificar certificados SSL (default: True) Returns: Dict com dados ajustados para Issue ou None se falhar @@ -183,7 +197,7 @@ def get_issue_data_from_am_issue(am_issue, user=None): am_data = am_issue.data if not am_data: if user: - complete_am_issue(user, am_issue) + complete_am_issue(user, am_issue, verify=verify) am_data = am_issue.data if not am_data: @@ -218,7 +232,7 @@ def get_issue_data_from_am_issue(am_issue, user=None): return None -def create_issue_from_am_issue(user, am_issue): +def create_issue_from_am_issue(user, am_issue, verify=True): """ Cria Issue a partir de AMIssue. @@ -232,7 +246,7 @@ def create_issue_from_am_issue(user, am_issue): issue = None try: - issue_data = get_issue_data_from_am_issue(am_issue, user) + issue_data = get_issue_data_from_am_issue(am_issue, user, verify) if not issue_data: raise ValueError(f"Unable to extract issue data from {am_issue}") @@ -252,9 +266,9 @@ def create_issue_from_am_issue(user, am_issue): if issue: issue.add_am_issue(user, am_issue) - load_issue_sections(user, issue, am_issue, issue_data) - load_issue_titles(user, issue, am_issue, issue_data) - load_bibliographic_strips(user, issue, am_issue, issue_data) + load_issue_sections(user, issue, am_issue, issue_data, verify=verify) + load_issue_titles(user, issue, am_issue, issue_data, verify=verify) + load_bibliographic_strips(user, issue, am_issue, issue_data, verify=verify) return issue @@ -269,7 +283,7 @@ def create_issue_from_am_issue(user, am_issue): return issue -def _extract_field(field_name, field_value, issue_data, am_issue, user): +def _extract_field(field_name, field_value, issue_data, am_issue, user, verify=True): """ Extrai um campo específico dos dados do issue. @@ -299,7 +313,7 @@ def _extract_field(field_name, field_value, issue_data, am_issue, user): pass if am_issue: - data = get_issue_data_from_am_issue(am_issue, user) + data = get_issue_data_from_am_issue(am_issue, user, verify) if not data: raise ValueError(f"Unable to extract issue data from {am_issue}") return data.get(field_name) @@ -307,7 +321,7 @@ def _extract_field(field_name, field_value, issue_data, am_issue, user): raise ValueError(f"am_issue, issue_data or {field_name} is required") -def load_issue_sections(user, issue, am_issue=None, issue_data=None, sections_data=None, collection=None): +def load_issue_sections(user, issue, am_issue=None, issue_data=None, sections_data=None, collection=None, verify=True): """ Carrega sections para um Issue existente. @@ -326,7 +340,7 @@ def load_issue_sections(user, issue, am_issue=None, issue_data=None, sections_da if not issue: raise ValueError("Issue is required") - sections = _extract_field("sections_data", sections_data, issue_data, am_issue, user) + sections = _extract_field("sections_data", sections_data, issue_data, am_issue, user, verify) if not sections: if '"v49"' in str(issue_data): raise ValueError("No sections found, but issue_data contains 'v49'") @@ -355,7 +369,7 @@ def load_issue_sections(user, issue, am_issue=None, issue_data=None, sections_da return False -def load_issue_titles(user, issue, am_issue=None, issue_data=None, issue_titles=None): +def load_issue_titles(user, issue, am_issue=None, issue_data=None, issue_titles=None, verify=True): """ Carrega títulos para um Issue existente. @@ -373,7 +387,7 @@ def load_issue_titles(user, issue, am_issue=None, issue_data=None, issue_titles= if not issue: raise ValueError("Issue is required") - titles = _extract_field("issue_titles", issue_titles, issue_data, am_issue, user) + titles = _extract_field("issue_titles", issue_titles, issue_data, am_issue, user, verify) if not titles: logging.info("No issue titles found") return True @@ -395,7 +409,7 @@ def load_issue_titles(user, issue, am_issue=None, issue_data=None, issue_titles= return False -def load_bibliographic_strips(user, issue, am_issue=None, issue_data=None, bibliographic_strips=None): +def load_bibliographic_strips(user, issue, am_issue=None, issue_data=None, bibliographic_strips=None, verify=True): """ Carrega bibliographic strips para um Issue existente. @@ -413,7 +427,7 @@ def load_bibliographic_strips(user, issue, am_issue=None, issue_data=None, bibli if not issue: raise ValueError("Issue is required") - strips = _extract_field("bibliographic_strip_list", bibliographic_strips, issue_data, am_issue, user) + strips = _extract_field("bibliographic_strip_list", bibliographic_strips, issue_data, am_issue, user, verify) if not strips: logging.info("No bibliographic strips found") return True diff --git a/issue/tasks.py b/issue/tasks.py index bd1f2391..e2b9925a 100644 --- a/issue/tasks.py +++ b/issue/tasks.py @@ -28,6 +28,10 @@ def load_issue_from_articlemeta( until_date=None, force_update=None, timeout=30, + verify=True, + limit=None, + issn=None, + stop=None, ): """ Carrega issues do ArticleMeta para collections específicas. @@ -40,6 +44,10 @@ def load_issue_from_articlemeta( until_date: Data final (YYYY-MM-DD) force_update: Forçar atualização de registros existentes timeout: Timeout para requisições HTTP + verify: Verificação SSL para requisições HTTP + limit: Limite de itens a coletar + issn: ISSN para filtrar para um journal específico + stop: Número máximo de itens a processar (opcional) """ try: user = _get_user(request=self.request, user_id=user_id, username=username) @@ -53,7 +61,15 @@ def load_issue_from_articlemeta( # Coletar identificadores de issues for issue_identifier in harvest_issue_identifiers( - acron3, from_date, until_date, force_update, timeout + collection_acron=acron3, + from_date=from_date, + until_date=until_date, + force_update=force_update, + timeout=timeout, + verify=verify, + limit=limit, + issn=issn, + stop=stop, ): try: logger.info(f"Scheduling load for issue {issue_identifier.get('code')} in collection {acron3}") @@ -65,6 +81,9 @@ def load_issue_from_articlemeta( issue_identifier=issue_identifier, force_update=force_update, timeout=timeout, + verify=verify, + limit=limit, + stop=stop, ) except Exception as e: exc_type, exc_value, exc_traceback = sys.exc_info() @@ -116,6 +135,9 @@ def task_harvest_and_load_issue( issue_identifier=None, force_update=None, timeout=30, + verify=True, + limit=None, + stop=None, ): """ Carrega um issue específico do ArticleMeta. @@ -127,6 +149,9 @@ def task_harvest_and_load_issue( issue_identifier: Dados do identificador do issue force_update: Forçar atualização de registros existentes timeout: Timeout para requisições HTTP + verify: Verificação SSL para requisições HTTP + limit: Limite de itens (mantido para consistência) + stop: Número máximo de itens a processar (opcional) """ try: user = _get_user(request=self.request, user_id=user_id, username=username) @@ -158,6 +183,9 @@ def task_harvest_and_load_issue( processing_date=processing_date, force_update=force_update, timeout=timeout, + verify=verify, + limit=limit, + stop=stop, ) if issue: @@ -372,6 +400,7 @@ def task_update_issues_from_amissue( supplement=None, force_update=False, only_without_new_record=False, + verify=True, ): """ Atualiza Issues a partir de registros AMIssue com filtros específicos. @@ -390,6 +419,7 @@ def task_update_issues_from_amissue( supplement: Suplemento para filtrar Issues force_update: Forçar atualização de Issues existentes only_without_new_record: Processar apenas AMIssue sem new_record associado + verify: Verificação SSL para requisições HTTP (default: True) Returns: dict: Resultado da operação com estatísticas @@ -465,7 +495,7 @@ def task_update_issues_from_amissue( stats["processed"] += 1 # Extrair dados do AMIssue para aplicar filtros adicionais - issue_data = get_issue_data_from_am_issue(am_issue, user) + issue_data = get_issue_data_from_am_issue(am_issue, user, verify) if not issue_data: stats["errors"] += 1 stats["error_details"].append({ @@ -502,9 +532,9 @@ def task_update_issues_from_amissue( logger.info(f"Updating existing Issue {existing_issue.id} from AMIssue {am_issue.id}") # Recarregar dados do AMIssue no Issue - load_issue_sections(user, existing_issue, am_issue, issue_data, collection=am_issue.collection) - load_issue_titles(user, existing_issue, am_issue, issue_data) - load_bibliographic_strips(user, existing_issue, am_issue, issue_data) + load_issue_sections(user, existing_issue, am_issue, issue_data, collection=am_issue.collection, verify=verify) + load_issue_titles(user, existing_issue, am_issue, issue_data, verify=verify) + load_bibliographic_strips(user, existing_issue, am_issue, issue_data, verify=verify) stats["updated"] += 1 @@ -512,7 +542,7 @@ def task_update_issues_from_amissue( # Criar novo Issue logger.info(f"Creating new Issue from AMIssue {am_issue.id}") - issue = create_issue_from_am_issue(user, am_issue) + issue = create_issue_from_am_issue(user, am_issue, verify) if issue: stats["created"] += 1 else: diff --git a/journal/sources/article_meta.py b/journal/sources/article_meta.py index 443ff145..aa5829ca 100644 --- a/journal/sources/article_meta.py +++ b/journal/sources/article_meta.py @@ -3,6 +3,7 @@ from collection.models import Collection from core.utils.rename_dictionary_keys import rename_dictionary_keys +from core.utils.harvesters import AMHarvester from core.utils.utils import fetch_data from journal.models import AMJournal from journal.sources import am_to_core @@ -15,19 +16,62 @@ def __init__(self, message): super().__init__(f"Failed to save SciELO Journal from article meta: {message}") -def _get_collection_journals(offset=None, limit=None, collection=None, verify=True): - limit = limit or 10 - offset = f"&offset={offset}" if offset else "" +def _get_collection_journals(offset=None, limit=None, collection=None, verify=True, issn=None, stop=None): + """Usa AMHarvester para coletar journals de forma consistente.""" if not collection: raise ValueError( "journal.sources.article_meta._get_collection_journals requires collection" ) - url = ( - f"https://articlemeta.scielo.org/api/v1/journal/identifiers/?collection={collection}&limit={limit}" - + offset + + # Usar AMHarvester para coleta consistente + harvester = AMHarvester( + record_type="journal", + collection_acron=collection, + limit=limit or 1000, + verify=verify, + issn=issn, + stop=stop, ) - data = fetch_data(url, json=True, timeout=30, verify=verify) - return data + + # Simular estrutura de resposta original para compatibilidade + all_objects = [] + total_collected = 0 + target_limit = limit or 1000 + + # Se offset for especificado, precisamos pular registros + records_to_skip = offset or 0 + current_skip = 0 + + for journal in harvester.harvest_documents(): + # Pular registros até chegar no offset + if current_skip < records_to_skip: + current_skip += 1 + continue + + # Adicionar registro convertido para formato esperado + journal_obj = { + "code": journal["code"], + "title": journal.get("title", ""), + "issn": journal.get("pid_v2", ""), # ISSN está no código + "collection": journal["collection_acron"], + "processing_date": journal.get("processing_date"), + } + all_objects.append(journal_obj) + total_collected += 1 + + # Parar quando atingir o limite + if total_collected >= target_limit: + break + + # Retornar estrutura compatível com código existente + return { + "objects": all_objects, + "meta": { + "total": len(all_objects) + records_to_skip, # Aproximação + "limit": target_limit, + "offset": records_to_skip, + } + } def _fetch_and_store_journal(collection, issn, obj_collection, user, verify=True): @@ -41,23 +85,33 @@ def _fetch_and_store_journal(collection, issn, obj_collection, user, verify=True ) -def process_journal_article_meta(collection, limit, user, journal_issn_list=None, verify=True): +def process_journal_article_meta(collection, limit, user, journal_issn_list=None, verify=True, issn=None, stop=None): obj_collection = Collection.objects.get(acron3=collection) if journal_issn_list: + processed_count = 0 for issn in journal_issn_list: + if stop and processed_count >= stop: + logging.info(f"Reached stop limit of {stop} journals") + break _fetch_and_store_journal(collection, issn, obj_collection, user, verify=verify) + processed_count += 1 return offset = 0 - data = _get_collection_journals(collection=collection, limit=limit, verify=verify) + processed_count = 0 + data = _get_collection_journals(collection=collection, limit=limit, verify=verify, issn=issn, stop=stop) total_limit = data["meta"]["total"] - while offset < total_limit: + while offset < total_limit and (not stop or processed_count < stop): for journal in data["objects"]: + if stop and processed_count >= stop: + logging.info(f"Reached stop limit of {stop} journals") + break _fetch_and_store_journal(collection, journal["code"], obj_collection, user, verify=verify) + processed_count += 1 - offset += limit or 10 + offset += limit or 1000 data = _get_collection_journals( - collection=collection, limit=limit, offset=offset, verify=verify + collection=collection, limit=limit, offset=offset, verify=verify, issn=issn, stop=stop ) @@ -195,7 +249,7 @@ def _register_journal_data(user, collection_acron3, journal_issn_list=None): detail={ "function": "journal.sources.article_meta._register_journal_data", "collection": collection_acron3, - "issn": journal_am.scielo_issn, # Mudança aqui: scielo_issn -> pid + "issn": journal_am.pid, "data_journal": journal_am.data, }, ) @@ -233,7 +287,7 @@ def _register_journal_data(user, collection_acron3, journal_issn_list=None): detail={ "function": "journal.sources.article_meta._register_journal_data", "collection": collection_acron3, - "issn": journal_am.scielo_issn, # Mudança aqui: scielo_issn -> pid + "issn": journal_am.pid, "data_journal": journal_am.data, }, ) diff --git a/journal/tasks.py b/journal/tasks.py index 70d38f0f..5c53aedd 100644 --- a/journal/tasks.py +++ b/journal/tasks.py @@ -50,6 +50,8 @@ def load_journal_from_article_meta( load_data=None, journal_issn_list=None, verify=True, + issn=None, + stop=None, ): try: if journal_issn_list and not collection_acron: @@ -73,6 +75,8 @@ def load_journal_from_article_meta( load_data=load_data, journal_issn_list=journal_issn_list, verify=verify, + issn=issn, + stop=stop, ) ) except Exception as e: @@ -96,6 +100,8 @@ def load_journal_from_article_meta_for_one_collection( load_data=None, journal_issn_list=None, verify=True, + issn=None, + stop=None, ): user = _get_user(self.request, username=username, user_id=user_id) try: @@ -119,6 +125,8 @@ def load_journal_from_article_meta_for_one_collection( user=user, journal_issn_list=journal_issn_list, verify=verify, + issn=issn, + stop=stop, ) _register_journal_data( user=user,