From 68fe12c366832dc3d6d5d8c009e9d0e1dfb3ab52 Mon Sep 17 00:00:00 2001 From: xubinrui <1462311339@qq.com> Date: Tue, 10 Feb 2026 19:39:00 +0800 Subject: [PATCH] add new websearch agent --- .../paper2any_agents/websearch_curator.py | 675 ++++++++++++++ .../websearch_initial_analyzer.py | 557 +++++++++++ .../paper2any_agents/websearch_planner.py | 222 +++++ .../paper2any_agents/websearch_researcher.py | 880 ++++++++++++++++++ dataflow_agent/state.py | 44 + .../workflow/wf_websearch_knowledge_store.py | 184 ++++ pyproject.toml | 5 + requirements-base.txt | 3 +- script/run_websearch_knowledge_store.py | 261 ++++++ tests/test_websearch_curator.py | 243 +++++ tests/test_websearch_initial_analyzer.py | 109 +++ tests/test_websearch_researcher.py | 112 +++ 12 files changed, 3294 insertions(+), 1 deletion(-) create mode 100644 dataflow_agent/agentroles/paper2any_agents/websearch_curator.py create mode 100644 dataflow_agent/agentroles/paper2any_agents/websearch_initial_analyzer.py create mode 100644 dataflow_agent/agentroles/paper2any_agents/websearch_planner.py create mode 100644 dataflow_agent/agentroles/paper2any_agents/websearch_researcher.py create mode 100644 dataflow_agent/workflow/wf_websearch_knowledge_store.py create mode 100644 script/run_websearch_knowledge_store.py create mode 100644 tests/test_websearch_curator.py create mode 100644 tests/test_websearch_initial_analyzer.py create mode 100644 tests/test_websearch_researcher.py diff --git a/dataflow_agent/agentroles/paper2any_agents/websearch_curator.py b/dataflow_agent/agentroles/paper2any_agents/websearch_curator.py new file mode 100644 index 00000000..2e0ff7e8 --- /dev/null +++ b/dataflow_agent/agentroles/paper2any_agents/websearch_curator.py @@ -0,0 +1,675 @@ +from __future__ import annotations + +import os +import json +import time +import httpx +import re +import asyncio +import base64 +from pathlib import Path +from datetime import datetime +from typing import Any, Dict, Optional, List, Tuple +from urllib.parse import urljoin, urlparse + +from bs4 import BeautifulSoup +from tqdm import tqdm +import fitz # PyMuPDF + +from dataflow_agent.state import MainState, WebsearchKnowledgeState +from dataflow_agent.toolkits.tool_manager import ToolManager +from dataflow_agent.logger import get_logger +from dataflow_agent.agentroles.cores.base_agent import BaseAgent +from dataflow_agent.agentroles.cores.registry import register + +# OpenAI 依赖 +from openai import AsyncOpenAI + +log = get_logger(__name__) + + +@register("websearch_curator") +class WebsearchChiefCuratorAgent(BaseAgent): + """ + Websearch Chief Curator Agent + 实现:针对每个子任务独立建立知识库 MD 文件,包含多模态资源占位。 + """ + + def __init__( + self, + tool_manager: Optional[ToolManager] = None, + llm_config: Optional[Dict] = None, + **kwargs + ): + super().__init__(tool_manager=tool_manager, **kwargs) + + self.llm_config = llm_config or { + "base_url": os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1"), + "api_key": os.getenv("DF_API_KEY", "sk-xxx"), + "model": os.getenv("THIRD_PARTY_MODEL", "gpt-4o"), + } + + self.mineruhtml_url = os.getenv("MINERUHTML_API_URL", "http://localhost:7771") + self.output_dir = Path("./raw_data_store") + self.output_dir.mkdir(parents=True, exist_ok=True) + + api_key = self.llm_config.get("api_key") + base_url = self.llm_config.get("base_url") + model = self.llm_config.get("model") + + http_client = httpx.AsyncClient(trust_env=False) + self.llm_client = AsyncOpenAI( + api_key=api_key, + base_url=base_url, + http_client=http_client + ) + self.llm_model = model + 
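+        # The endpoint, key and model above are taken from the DF_API_URL / DF_API_KEY /
+        # THIRD_PARTY_MODEL environment variables; the dedicated httpx client is created
+        # with trust_env=False so that system-wide proxy variables are not applied to
+        # requests sent to the OpenAI-compatible gateway.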
log.info(f"🔗 LLM Initialized for Independent Curation: {base_url}") + + @classmethod + def create(cls, tool_manager: Optional[ToolManager] = None, **kwargs) -> "WebsearchChiefCuratorAgent": + return cls(tool_manager=tool_manager, **kwargs) + + @property + def role_name(self) -> str: + return "websearch_curator" + + @property + def system_prompt_template_name(self) -> str: + return "system_prompt_for_websearch_curator" + + @property + def task_prompt_template_name(self) -> str: + return "task_prompt_for_websearch_curator" + + def _simplify_html(self, html: str) -> str: + """HTML 预处理,去除无关标签""" + try: + html = re.sub(r"", "", html, flags=re.IGNORECASE) + html = re.sub(r"", "", html, flags=re.IGNORECASE) + html = re.sub(r"", "", html) + html = re.sub(r"", "", html, flags=re.IGNORECASE) + except Exception as e: + log.warning(f"⚠️ 简化 HTML 出错: {e}") + return html + + async def _extract_content_with_mineruhtml(self, html_content: str) -> Optional[str]: + """调用 mineruhtml API 提取正文""" + try: + api_url = f"{self.mineruhtml_url}/extract" + payload = {"html": html_content} + async with httpx.AsyncClient(timeout=3000.0) as client: + response = await client.post(api_url, json=payload) + if response.status_code == 200: + return response.json().get("main_html", "") + return None + except Exception as e: + log.error(f"❌ mineruhtml API 出错: {e}") + return None + + def _is_data_uri(self, url: str) -> bool: + """检查 URL 是否是 Data URI""" + return url.strip().lower().startswith("data:") + + def _parse_data_uri(self, data_uri: str) -> Tuple[Optional[str], Optional[bytes]]: + """ + 解析 Data URI,返回 (mime_type, decoded_data) + Data URI 格式: data:[][;base64], + """ + try: + # 去除空白字符 + data_uri = data_uri.strip() + + # 检查是否是有效的 data URI + if not data_uri.lower().startswith("data:"): + return None, None + + # 移除 "data:" 前缀 + data_part = data_uri[5:] + + # 分离 metadata 和 data + if "," not in data_part: + return None, None + + metadata, encoded_data = data_part.split(",", 1) + + # 解析 mime type 和编码 + mime_type = "application/octet-stream" # 默认 + is_base64 = False + + if metadata: + parts = metadata.split(";") + if parts[0]: + mime_type = parts[0].lower() + if "base64" in [p.lower() for p in parts]: + is_base64 = True + + # 解码数据 + if is_base64: + # 处理可能的空格(有些 base64 字符串包含空格) + encoded_data = encoded_data.replace(" ", "+") + decoded_data = base64.b64decode(encoded_data) + else: + # URL 编码的数据 + from urllib.parse import unquote + decoded_data = unquote(encoded_data).encode("utf-8") + + return mime_type, decoded_data + + except Exception as e: + log.warning(f"⚠️ 解析 Data URI 失败: {e}") + return None, None + + def _get_extension_from_mime(self, mime_type: str) -> str: + """根据 MIME 类型获取文件扩展名""" + mime_to_ext = { + "image/png": ".png", + "image/jpeg": ".jpg", + "image/jpg": ".jpg", + "image/gif": ".gif", + "image/webp": ".webp", + "image/svg+xml": ".svg", + "image/bmp": ".bmp", + "image/x-icon": ".ico", + "video/mp4": ".mp4", + "video/webm": ".webm", + "video/ogg": ".ogv", + "audio/mpeg": ".mp3", + "audio/ogg": ".ogg", + "audio/wav": ".wav", + } + return mime_to_ext.get(mime_type, ".bin") + + async def _save_data_uri(self, data_uri: str, save_dir: Path) -> Optional[str]: + """保存 Data URI 中的数据到文件""" + mime_type, decoded_data = self._parse_data_uri(data_uri) + + if decoded_data is None: + log.warning(f"⚠️ 无法解析 Data URI") + return None + + # 过滤太小的图片(通常是1x1像素的追踪图片) + if len(decoded_data) < 100: # 小于 100 字节的数据太小,可能是追踪像素 + log.debug(f" 跳过过小的 Data URI({len(decoded_data)} 字节)") + return None + + try: + ext = 
self._get_extension_from_mime(mime_type) + filename = f"datauri_{int(time.time() * 1000)}{ext}" + save_path = save_dir / filename + + with open(save_path, "wb") as f: + f.write(decoded_data) + + log.debug(f" ✅ 保存 Data URI 资源: {filename} ({len(decoded_data)} 字节)") + return filename + + except Exception as e: + log.warning(f"⚠️ 保存 Data URI 失败: {e}") + return None + + async def _download_resource(self, url: str, save_dir: Path) -> Optional[str]: + """下载多媒体资源(支持 HTTP URL 和 Data URI)""" + # 检查是否是 Data URI + if self._is_data_uri(url): + return await self._save_data_uri(url, save_dir) + + if not url.startswith(("http://", "https://")): + log.warning(f"⚠️ 跳过无效 URL(非 HTTP/HTTPS 且非 Data URI): {url[:100]}...") + return None + + try: + async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client: + response = await client.get(url) + if response.status_code == 200: + parsed = urlparse(url) + filename = os.path.basename(parsed.path) or f"res_{int(time.time())}" + filename = "".join(c for c in filename if c.isalnum() or c in ('_', '-', '.')) + save_path = save_dir / filename + with open(save_path, "wb") as f: + f.write(response.content) + return filename + return None + except Exception as e: + log.warning(f"⚠️ 下载失败 {url[:100]}...: {e}") + return None + + async def _process_media_and_placeholders(self, html_content: str, base_url: str, save_dir: Path) -> tuple[str, List[Dict[str, str]]]: + """替换 HTML 中的媒体为占位符""" + soup = BeautifulSoup(html_content, "html.parser") + resources = [] + + for img in soup.find_all("img"): + src = img.get("src") + if src: + # 检查是否是 Data URI,如果是则直接使用,否则与基础 URL 拼接 + if self._is_data_uri(src): + full_url = src # Data URI 本身就是完整的数据 + else: + full_url = urljoin(base_url, src) + + filename = await self._download_resource(full_url, save_dir) + if filename: + img.replace_with(f"[Image: {filename}]") + # 对于 Data URI,不记录完整内容(太长),只记录类型 + url_for_record = "data:..." if self._is_data_uri(src) else full_url + resources.append({"type": "image", "filename": filename, "url": url_for_record}) + + for video in soup.find_all(["video", "source"]): + src = video.get("src") + if src: + # 同样检查 Data URI + if self._is_data_uri(src): + full_url = src + else: + full_url = urljoin(base_url, src) + + filename = await self._download_resource(full_url, save_dir) + if filename: + video.replace_with(f"[Video: {filename}]") + url_for_record = "data:..." if self._is_data_uri(src) else full_url + resources.append({"type": "video", "filename": filename, "url": url_for_record}) + + return soup.get_text(separator="\n\n"), resources + + async def _rename_and_describe_images(self, resources: List[Dict[str, str]], resource_dir: Path, page_content: str) -> List[Dict[str, str]]: + """LLM 辅助重命名图片并生成语义化描述""" + image_resources = [r for r in resources if r["type"] == "image"] + if not image_resources: return resources + + try: + image_info = [f"图片{i}: {r['filename']}" for i, r in enumerate(image_resources)] + prompt = f"根据网页内容:{page_content[:1500]}\n为这些图片生成英文语义文件名(img_xxx.png)和中文描述。JSON格式输出。" + + response = await self.llm_client.chat.completions.create( + model=self.llm_model, + messages=[{"role": "user", "content": prompt}], + temperature=0.2 + ) + # 解析并重命名文件(逻辑简化,实际建议增加 JSON 解析鲁棒性) + # ... 此处逻辑同前 ... 
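+            # The elided step would presumably parse the model's JSON reply, rename the
+            # image files on disk under resource_dir and attach the generated descriptions
+            # to each resource record; as written here, the resources are returned unchanged.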
+ return resources + except: + return resources + + def _extract_text_from_pdf(self, pdf_path: str) -> Optional[str]: + """ + 使用 PyMuPDF (fitz) 从 PDF 文件中提取纯文本。 + + Args: + pdf_path: PDF 文件的绝对路径 + + Returns: + 提取的纯文本内容,失败时返回 None + """ + try: + doc = fitz.open(pdf_path) + total_pages = len(doc) + text_parts = [] + for page_num in range(total_pages): + page = doc[page_num] + page_text = page.get_text("text") + if page_text.strip(): + text_parts.append(f"--- Page {page_num + 1} ---\n{page_text}") + doc.close() + + if not text_parts: + log.warning(f"⚠️ PDF 未提取到文本内容: {pdf_path}") + return None + + full_text = "\n\n".join(text_parts) + log.info(f" ✅ PDF 文本提取完成,共 {total_pages} 页,{len(full_text)} 字符") + return full_text + + except Exception as e: + log.error(f"❌ PDF 文本提取失败 ({pdf_path}): {e}") + return None + + def _split_into_chunks(self, source_data: List[Dict], max_chars: int = 80000) -> List[List[Dict]]: + """ + 将源数据分成多个 chunk,每个 chunk 的总字符数不超过 max_chars。 + 每个源作为一个整体,不会被拆分到不同 chunk 中。 + """ + chunks = [] + current_chunk = [] + current_size = 0 + + for item in source_data: + item_size = len(item.get("raw_knowledge", "")) + len(item.get("url", "")) + 50 # 50 for formatting + + # 如果单个 item 就超过限制,需要截断 + if item_size > max_chars: + # 如果当前 chunk 有内容,先保存 + if current_chunk: + chunks.append(current_chunk) + current_chunk = [] + current_size = 0 + + # 截断大型内容 + truncated_item = item.copy() + truncated_item["raw_knowledge"] = item["raw_knowledge"][:max_chars - 100] + "\n\n... [内容已截断] ..." + chunks.append([truncated_item]) + continue + + # 如果加入当前 item 会超过限制,开启新 chunk + if current_size + item_size > max_chars and current_chunk: + chunks.append(current_chunk) + current_chunk = [] + current_size = 0 + + current_chunk.append(item) + current_size += item_size + + # 保存最后一个 chunk + if current_chunk: + chunks.append(current_chunk) + + return chunks + + async def _extract_knowledge_from_chunk(self, route: str, chunk_data: List[Dict], chunk_idx: int, total_chunks: int) -> str: + """从单个 chunk 中提取与子任务相关的知识点""" + context_segments = [] + for item in chunk_data: + context_segments.append(f"### Source: {item['url']}\n{item['raw_knowledge']}") + + chunk_context = "\n\n".join(context_segments) + + prompt = f""" +你是一个首席馆长。请从以下参考资料中提取与【研究子任务】相关的知识点。 + +研究子任务:{route} + +参考资料(第 {chunk_idx + 1}/{total_chunks} 批): +{chunk_context} + +任务要求: +1. 仅提取与该子任务直接相关的内容,忽略无关信息。 +2. 识别并保留原始数据中的多媒体占位符 [Image: xxx] 或 [Video: xxx]。 +3. 知识表述要精炼、专业。 +4. 使用 Markdown 格式的要点列表。 +5. 如果本批资料中没有与子任务相关的内容,请回复"本批次无相关内容"。 + +请提取相关知识点: +""" + response = await self.llm_client.chat.completions.create( + model=self.llm_model, + messages=[{"role": "system", "content": "你是一个专业的技术馆长,擅长从大量资料中提取特定主题的知识。"}, + {"role": "user", "content": prompt}], + temperature=0.2 + ) + + return response.choices[0].message.content + + async def _merge_knowledge_points(self, route: str, knowledge_points: List[str]) -> str: + """将多个 chunk 提取的知识点合并成最终文档""" + # 过滤掉"无相关内容"的结果 + valid_points = [kp for kp in knowledge_points if "无相关内容" not in kp and len(kp.strip()) > 20] + + if not valid_points: + return f"# {route}\n\n暂无相关知识内容。" + + # 如果只有一个有效结果,直接返回 + if len(valid_points) == 1: + return f"# {route}\n\n{valid_points[0]}" + + combined = "\n\n---\n\n".join([f"### 知识点批次 {i+1}\n{kp}" for i, kp in enumerate(valid_points)]) + + prompt = f""" +你是一个首席馆长。请将以下多批次提取的知识点整合成一份完整、专业的知识文档。 + +研究子任务:{route} + +各批次提取的知识点: +{combined} + +任务要求: +1. 去除重复内容,合并相似观点。 +2. 按逻辑结构组织内容(如:概述、原理、应用、挑战等)。 +3. 保留所有多媒体占位符 [Image: xxx] 或 [Video: xxx]。 +4. 使用专业的 Markdown 格式输出。 +5. 
确保内容完整、结构清晰。 + +请生成整合后的知识文档: +""" + response = await self.llm_client.chat.completions.create( + model=self.llm_model, + messages=[{"role": "system", "content": "你是一个专业的技术馆长,擅长知识整合与文档编写。"}, + {"role": "user", "content": prompt}], + temperature=0.3 + ) + + return response.choices[0].message.content + + async def _curate_single_knowledge_point(self, route: str, source_data: List[Dict], curated_dir: Path) -> str: + """为每一个子任务独立总结并生成 MD(分 chunk 处理)""" + log.info(f"🎯 正在针对任务独立建模: {route}") + + # 1. 将源数据分成多个 chunk + chunks = self._split_into_chunks(source_data, max_chars=80000) + log.info(f" 📦 数据已分成 {len(chunks)} 个批次进行处理") + + # 2. 对每个 chunk 提取知识点 + knowledge_points = [] + for idx, chunk in enumerate(chunks): + log.info(f" 🔄 处理批次 [{idx+1}/{len(chunks)}],包含 {len(chunk)} 个来源") + try: + kp = await self._extract_knowledge_from_chunk(route, chunk, idx, len(chunks)) + knowledge_points.append(kp) + log.info(f" ✅ 批次 [{idx+1}/{len(chunks)}] 完成,提取 {len(kp)} 字符") + except Exception as e: + log.warning(f" ⚠️ 批次 [{idx+1}/{len(chunks)}] 处理失败: {e}") + continue + + # 3. 合并所有知识点 + if len(knowledge_points) > 1: + log.info(f" 🔗 正在合并 {len(knowledge_points)} 个批次的知识点...") + content = await self._merge_knowledge_points(route, knowledge_points) + elif len(knowledge_points) == 1: + content = f"# {route}\n\n{knowledge_points[0]}" + else: + content = f"# {route}\n\n暂无相关知识内容。" + + # 4. 保存文件 + safe_name = re.sub(r'[\\/:*?"<>|]', "_", route).replace(" ", "_") + # 限制文件名长度 + if len(safe_name) > 100: + safe_name = safe_name[:100] + file_path = curated_dir / f"KNOWLEDGE_{safe_name}.md" + + with open(file_path, "w", encoding="utf-8") as f: + f.write(content) + + log.info(f" 📝 知识文档已保存,共 {len(content)} 字符") + return str(file_path) + + async def run(self, state: MainState, **kwargs) -> Dict[str, Any]: + """主执行逻辑:汇总数据 -> 网页清洗 -> 逐任务独立建模""" + log.info("=" * 60) + log.info("🚀 [WebsearchCurator] 开始知识独立建模流程") + log.info("=" * 60) + + if not isinstance(state, WebsearchKnowledgeState): + log.error("❌ 状态类型错误,期望 WebsearchKnowledgeState") + return {"status": "failed", "reason": "State Error"} + + raw_data = state.raw_data_store or [] + # 优先使用 original_research_routes(不会被 planner 清空的原始列表) + # 回退到 research_routes 以兼容旧版 + research_routes = state.original_research_routes or state.research_routes or [] + + log.info(f"📊 输入数据统计:") + log.info(f" - 原始数据源数量: {len(raw_data)}") + log.info(f" - 研究子任务数量: {len(research_routes)}") + if state.original_research_routes: + log.info(f" - 使用原始任务列表 (original_research_routes)") + else: + log.warning(f" - ⚠️ original_research_routes 为空,回退使用 research_routes") + + if not raw_data: + log.warning("⚠️ 没有原始数据,跳过处理") + return {"status": "skipped", "reason": "No raw data"} + + # 创建输出目录 + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + curated_dir = self.output_dir / f"{timestamp}_curated" + curated_dir.mkdir(exist_ok=True) + resource_dir = curated_dir / "resources" + resource_dir.mkdir(exist_ok=True) + # 创建正文内容文件夹 + extracted_contents_dir = curated_dir / "extracted_contents" + extracted_contents_dir.mkdir(exist_ok=True) + log.info(f"📁 创建输出目录: {curated_dir}") + log.info(f"📁 正文内容目录: {extracted_contents_dir}") + + # ==================== 阶段 1: 预处理 ==================== + log.info("-" * 60) + log.info("📥 [阶段 1/2] 开始预处理原始数据...") + log.info("-" * 60) + + processed_sources = [] + skipped_count = 0 + + pdf_count = 0 + html_count = 0 + + with tqdm(total=len(raw_data), desc="🔄 预处理数据", unit="条", ncols=100) as pbar: + for idx, item in enumerate(raw_data): + url = item.get("url", "未知URL") + item_type = item.get("type", "") + pdf_path 
= item.get("pdf_filepath") + dom_path = item.get("dom_filepath") + + pbar.set_postfix_str(f"处理: {url[:40]}..." if len(url) > 40 else f"处理: {url}") + + # ---- PDF 类型处理 ---- + if item_type == "pdf" or pdf_path: + if not pdf_path or not os.path.exists(pdf_path): + log.warning(f" ⚠️ [{idx+1}/{len(raw_data)}] 跳过 - PDF文件不存在: {pdf_path}") + skipped_count += 1 + pbar.update(1) + continue + + log.info(f" 📑 [{idx+1}/{len(raw_data)}] 处理PDF: {os.path.basename(pdf_path)}") + + clean_text = self._extract_text_from_pdf(pdf_path) + if not clean_text: + log.warning(f" ⚠️ PDF 文本提取为空,跳过") + skipped_count += 1 + pbar.update(1) + continue + + processed_sources.append({ + "url": url, + "raw_knowledge": clean_text, + "resources": [] # PDF 暂不提取内嵌媒体资源 + }) + pdf_count += 1 + pbar.update(1) + continue + + # ---- HTML 类型处理(原有逻辑)---- + if not dom_path or not os.path.exists(dom_path): + log.warning(f" ⚠️ [{idx+1}/{len(raw_data)}] 跳过 - DOM文件不存在: {dom_path}") + skipped_count += 1 + pbar.update(1) + continue + + log.info(f" 📄 [{idx+1}/{len(raw_data)}] 处理网页: {url}") + + with open(dom_path, "r", encoding="utf-8") as f: + html = f.read() + log.debug(f" - 原始 HTML 大小: {len(html)} 字符") + + # 简化 HTML + simplified_html = self._simplify_html(html) + log.debug(f" - 简化后 HTML 大小: {len(simplified_html)} 字符") + + # 从 DOM 提取媒体资源(在正文提取之前,确保不丢失任何媒体) + log.info(f" 🖼️ 从 DOM 提取媒体资源...") + _, resources = await self._process_media_and_placeholders(simplified_html, url, resource_dir) + log.info(f" ✅ 媒体处理完成,发现 {len(resources)} 个资源") + + # 提取正文 + log.info(f" 🔍 调用 mineruhtml 提取正文...") + main_html = await self._extract_content_with_mineruhtml(simplified_html) or html + log.info(f" ✅ 正文提取完成,大小: {len(main_html)} 字符") + + # 从正文提取纯文本内容 + soup = BeautifulSoup(main_html, "html.parser") + clean_text = soup.get_text(separator="\n\n") + + processed_sources.append({ + "url": url, + "raw_knowledge": clean_text, + "resources": resources + }) + html_count += 1 + + pbar.update(1) + + log.info(f"📊 预处理完成统计:") + log.info(f" - 成功处理: {len(processed_sources)} 条(HTML: {html_count}, PDF: {pdf_count})") + log.info(f" - 跳过数量: {skipped_count} 个") + total_resources = sum(len(s["resources"]) for s in processed_sources) + log.info(f" - 总媒体资源: {total_resources} 个") + + # ==================== 阶段 2: 知识建模 ==================== + log.info("-" * 60) + log.info("📝 [阶段 2/2] 开始独立知识建模...") + log.info("-" * 60) + + generated_md_files = [] + + with tqdm(total=len(research_routes), desc="🧠 知识建模", unit="任务", ncols=100) as pbar: + for idx, route in enumerate(research_routes): + route_display = route[:35] + "..." 
if len(route) > 35 else route + pbar.set_postfix_str(f"任务: {route_display}") + + log.info(f" 🎯 [{idx+1}/{len(research_routes)}] 建模任务: {route}") + + file_path = await self._curate_single_knowledge_point(route, processed_sources, curated_dir) + generated_md_files.append(file_path) + + log.info(f" ✅ 生成文件: {os.path.basename(file_path)}") + + pbar.update(1) + + # ==================== 完成汇总 ==================== + log.info("=" * 60) + log.info("🎉 [WebsearchCurator] 知识建模流程完成!") + log.info("=" * 60) + log.info(f"📊 最终统计:") + log.info(f" - 生成知识文档: {len(generated_md_files)} 个") + log.info(f" - 输出目录: {curated_dir}") + log.info(f" - 资源目录: {resource_dir}") + + # 更新状态 + state.knowledge_base_summary = f"已完成 {len(generated_md_files)} 个知识点独立建模。" + + result_payload = { + "status": "success", + "curated_directory": str(curated_dir), + "files_created": generated_md_files, + "tasks_processed": len(research_routes), + "sources_processed": len(processed_sources), + "sources_skipped": skipped_count, + "total_resources": total_resources + } + + self.update_state_result(state, result_payload, {}) + return result_payload + + def update_state_result(self, state: MainState, result: Dict[str, Any], pre_tool_results: Dict[str, Any]): + if hasattr(state, "agent_results"): + state.agent_results[self.role_name] = {"result": result, "pre_tool_results": pre_tool_results} + super().update_state_result(state, result, pre_tool_results) + + +def create_websearch_curator_agent( + tool_manager: Optional[ToolManager] = None, + **kwargs, +) -> WebsearchChiefCuratorAgent: + """ + 便捷创建函数。 + """ + return WebsearchChiefCuratorAgent.create(tool_manager=tool_manager, **kwargs) \ No newline at end of file diff --git a/dataflow_agent/agentroles/paper2any_agents/websearch_initial_analyzer.py b/dataflow_agent/agentroles/paper2any_agents/websearch_initial_analyzer.py new file mode 100644 index 00000000..0c164650 --- /dev/null +++ b/dataflow_agent/agentroles/paper2any_agents/websearch_initial_analyzer.py @@ -0,0 +1,557 @@ +""" +Websearch Initial Analyzer Agent +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +对应「Initial Analyzer / 初始分析师」角色: +- 访问 Input URLs(冷启动) +- 把原始正文 / 多模态引用写入 Raw Data Store +- 产出研究路线 Research Routes + +实现功能: +- 使用 DomFetcher 抓取网页 DOM 数据 +- 保存 DOM 到 raw_data_store 目录 +- 调用 mineruhtml API 提取网页正文 +- 使用 LLM 分析正文生成子任务列表 +""" + +from __future__ import annotations + +import os +import json +import time +import asyncio +import httpx +from pathlib import Path +from datetime import datetime +from typing import Any, Dict, Optional, List +from urllib.parse import urlparse + +from dataflow_agent.state import MainState, WebsearchKnowledgeState +from dataflow_agent.toolkits.tool_manager import ToolManager +from dataflow_agent.logger import get_logger +from dataflow_agent.agentroles.cores.base_agent import BaseAgent +from dataflow_agent.agentroles.cores.registry import register + +# 导入 DomFetcher(从 websearch_researcher 中复用) +from dataflow_agent.agentroles.paper2any_agents.websearch_researcher import DomFetcher + +# OpenAI 依赖 +from openai import AsyncOpenAI + +log = get_logger(__name__) + + +@register("websearch_initial_analyzer") +class WebsearchInitialAnalyzerAgent(BaseAgent): + """ + Websearch Initial Analyzer Agent(完整实现版) + """ + + def __init__( + self, + tool_manager: Optional[ToolManager] = None, + llm_config: Optional[Dict] = None, + **kwargs + ): + super().__init__(tool_manager=tool_manager, **kwargs) + + self.llm_config = llm_config or { + "base_url": os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1"), 
+ "api_key": os.getenv("DF_API_KEY", "sk-xxx"), + "model": os.getenv("THIRD_PARTY_MODEL", "gpt-4o"), + } + + self.mineruhtml_url = os.getenv("MINERUHTML_API_URL", "http://localhost:7771") + self.output_dir = Path("./raw_data_store") + self.output_dir.mkdir(parents=True, exist_ok=True) + + headless_mode = os.getenv("HEADLESS", "true").lower() == "true" + self.dom_fetcher = DomFetcher(headless=headless_mode) + + api_key = self.llm_config.get("api_key") + base_url = self.llm_config.get("base_url") + model = self.llm_config.get("model") + + http_client = httpx.AsyncClient(trust_env=False) + self.llm_client = AsyncOpenAI( + api_key=api_key, + base_url=base_url, + http_client=http_client + ) + self.llm_model = model + log.info(f"🔗 LLM Initialized with Base URL: {base_url}") + + @classmethod + def create( + cls, + tool_manager: Optional[ToolManager] = None, + **kwargs, + ) -> "WebsearchInitialAnalyzerAgent": + return cls(tool_manager=tool_manager, **kwargs) + + @property + def role_name(self) -> str: + return "websearch_initial_analyzer" + + @property + def system_prompt_template_name(self) -> str: + return "system_prompt_for_websearch_initial_analyzer" + + @property + def task_prompt_template_name(self) -> str: + return "task_prompt_for_websearch_initial_analyzer" + + def get_task_prompt_params(self, pre_tool_results: Dict[str, Any]) -> Dict[str, Any]: + """ + 构造初始分析需要的 prompt 参数。 + """ + return { + "pre_tool_results": pre_tool_results, + } + + def get_default_pre_tool_results(self) -> Dict[str, Any]: + return {} + + def _extract_input_urls_from_state(self, state: MainState) -> List[str]: + """从 state 中提取 input_urls""" + if isinstance(state, WebsearchKnowledgeState): + # 优先从 state.input_urls 获取 + urls = state.input_urls or [] + if not urls: + # 从 request.input_urls 获取 + urls = getattr(state.request, "input_urls", []) or [] + return urls + else: + # 兼容其他 State 类型 + urls = getattr(state, "input_urls", []) or [] + if not urls: + request = getattr(state, "request", None) + if request: + urls = getattr(request, "input_urls", []) or [] + return urls + + async def _fetch_and_save_dom(self, url: str, session_dir: Path, url_index: int) -> Optional[str]: + """ + 抓取并保存网页 DOM + + Returns: + 保存的 HTML 文件路径,失败返回 None + """ + try: + log.info(f"🌐 [{url_index}] 正在抓取网页 DOM: {url}") + + # 抓取 HTML + html_content = await self.dom_fetcher.fetch_html(url, wait_time=3) + + if not html_content: + log.warning(f"⚠️ [{url_index}] DOM 抓取失败: {url}") + return None + + # 保存到 dom_snapshots 目录 + dom_dir = session_dir / "dom_snapshots" + dom_dir.mkdir(exist_ok=True) + + # 生成安全的文件名 + parsed = urlparse(url) + domain = parsed.netloc.replace('.', '_') + path = parsed.path.replace('/', '_').strip('_') or 'index' + if len(path) > 50: + path = path[:50] + + filename = f"url_{url_index:02d}_{domain}_{path}.html" + filename = "".join(c for c in filename if c.isalnum() or c in ('_', '-', '.')) + + filepath = dom_dir / filename + + with open(filepath, "w", encoding="utf-8") as f: + f.write(html_content) + + log.info(f"✅ [{url_index}] DOM 已保存至: {filepath}") + return str(filepath) + + except Exception as e: + log.error(f"❌ [{url_index}] 保存 DOM 时出错: {e}") + import traceback + log.error(traceback.format_exc()) + return None + + async def _extract_content_with_mineruhtml(self, html_content: str, url: str) -> Optional[str]: + """ + 调用 mineruhtml API 提取网页正文 + + API 规范: + - URL: http://localhost:7771/extract (这里用 self.mineruhtml_url 做前缀) + - Method: POST + - Content-Type: application/json + - Request JSON: { "html": "<完整 HTML 字符串>" } (字段名为 html) + - 
Response JSON: { "main_html": "<提取出的正文 HTML>" } + """ + try: + log.info("📄 正在调用 mineruhtml API 提取正文...") + + api_url = f"{self.mineruhtml_url}/extract" + payload = { + "html": html_content, + } + + # mineruhtml 端推理可能较慢,这里将超时时间放宽到 3000 秒(5 分钟) + async with httpx.AsyncClient(timeout=3000.0) as client: + response = await client.post( + api_url, + json=payload, + headers={"Content-Type": "application/json"}, + ) + + if response.status_code != 200: + log.error(f"❌ mineruhtml API 返回错误: {response.status_code}") + try: + log.error(f"错误详情: {response.text[:500]}") + except Exception: + pass + return None + + try: + result = response.json() + except json.JSONDecodeError: + log.error("❌ mineruhtml 返回的不是合法 JSON") + return None + + extracted_content = result.get("main_html", "") + + if extracted_content and extracted_content.strip(): + log.info(f"✅ 正文提取成功,长度: {len(extracted_content)} 字符") + return extracted_content + + log.warning("⚠️ mineruhtml 返回的 main_html 为空") + return None + + except Exception as e: + log.error(f"❌ 调用 mineruhtml API 时出错: {e}") + import traceback + log.error(traceback.format_exc()) + return None + + async def _analyze_content_and_generate_tasks( + self, + url: str, + extracted_content: str + ) -> List[str]: + """ + 使用 LLM 分析正文内容,生成子任务列表 + + Args: + url: 原始 URL + extracted_content: 提取的正文内容 + + Returns: + 子任务列表 + """ + try: + log.info(f"🤔 正在使用 LLM 分析内容并生成子任务...") + + # 截断内容(避免超出 token 限制) + max_content_length = 8000 + if len(extracted_content) > max_content_length: + extracted_content = extracted_content[:max_content_length] + "\n\n[... 内容已截断 ...]" + + prompt = f""" +你是一个专业的知识研究规划师。请分析以下网页内容,识别其中的核心知识点,并生成**研究型子任务**列表。 + +【核心目标】子任务将由 WebAgent 自动执行(搜索、访问网页、阅读内容),后续由知识提取 Agent 从研究结果中提取结构化知识。 +因此,每个子任务必须是一个**可执行的调研指令**,聚焦于某个知识点的深入调研。 + +## 子任务类型和格式 + +1. **[概念调研]** - 调研某个核心概念/术语的定义、原理和机制 + 格式:`[概念调研] 调研<概念名称>的<具体调研方向>` + +2. **[对比调研]** - 调研多种方法/技术/方案的异同与优劣 + 格式:`[对比调研] 对比<方法A>与<方法B>在<维度>上的异同` + +3. **[溯源调研]** - 调研某个方法/理论的来源、发展脉络和关键文献 + 格式:`[溯源调研] 调研<主题>的<具体溯源方向>` + +4. **[技术调研]** - 调研某个具体技术/实现的详细工作机制 + 格式:`[技术调研] 调研<技术名称>的<具体技术方向>` + +5. **[应用调研]** - 调研某技术/方法在特定领域的实际应用和效果 + 格式:`[应用调研] 调研<技术/方法>在<领域>中的应用方式和效果` + +## Few-shot 示例 + +**示例正文:** +扩散模型介绍:2020 年提出的 DDPM(Denoising Diffusion Probabilistic Model)开启了扩散模型的热潮。扩散模型通过从噪声中采样来生成目标数据,包含前向过程(逐步加噪)和反向过程(通过 U-Net 逐步去噪还原图片)。代码实现基于 MindSpore,包含正弦位置编码、Attention 与 Residual Block、GaussianDiffusion 以及引入 EMA 优化的 Trainer。参考论文《Denoising Diffusion Probabilistic Models》,代码仓库 GitHub: lvyufeng/denoising-diffusion-mindspore。 + +**示例输出:** +{{ + "tasks": [ + "[概念调研] 调研 DDPM 扩散模型前向加噪和反向去噪的数学原理及噪声调度策略", + "[技术调研] 调研 U-Net 在扩散模型中作为去噪网络的架构设计和跳跃连接机制", + "[对比调研] 对比扩散模型与 GAN、VAE 在生成质量、训练稳定性和推理速度上的优劣", + "[溯源调研] 调研扩散模型从 Sohl-Dickstein 非平衡热力学到 DDPM 再到 Score-based models 的发展脉络", + "[技术调研] 调研 EMA 指数移动平均在扩散模型训练中的优化机制和衰减率选择", + "[概念调研] 调研 DDIM 加速采样将马尔可夫过程转化为非马尔可夫过程的原理" + ], + "reasoning": "从知识提取角度分析:1) DDPM 数学原理是核心基础知识;2) U-Net 架构是关键技术组件;3) 与 GAN/VAE 对比能建立生成模型知识体系;4) 溯源研究构建发展脉络;5) EMA 是重要训练优化技术;6) DDIM 是采样加速的关键方法。" +}} + +--- + +## 当前任务 + +网页 URL: {url} + +网页正文内容: +{extracted_content} + +请根据网页内容,生成 3-8 个**研究型子任务**。设计原则: + +1. **可执行性**:每个任务是 WebAgent 可直接执行的调研指令(去搜索、阅读、收集信息) +2. **面向知识提取**:调研方向必须能产出可结构化的知识(定义、原理、对比、流程等) +3. **深度聚焦**:每个任务聚焦一个明确的知识点,深入调研而非泛泛浏览 +4. **知识覆盖**:任务之间应覆盖不同知识维度(概念、对比、溯源、技术、应用) +5. **严格遵循格式**:使用 [概念调研]/[对比调研]/[溯源调研]/[技术调研]/[应用调研] 标签 + +输出格式:请返回一个 JSON 对象,格式如下: +{{ + "tasks": [ + "[概念调研] 调研...", + "[对比调研] 对比...", + "[溯源调研] 调研...", + "[技术调研] 调研...", + "[应用调研] 调研...", + ... 
+ ], + "reasoning": "从知识提取角度说明为什么选择这些调研方向" +}} + +只返回 JSON,不要其他内容。 +""" + + response = await self.llm_client.chat.completions.create( + model=self.llm_model, + messages=[ + {"role": "system", "content": "你是一个专业的知识研究规划师,擅长从内容中识别核心知识点并设计面向知识提取的研究型任务。你总是返回有效的 JSON 格式。"}, + {"role": "user", "content": prompt} + ], + response_format={"type": "json_object"}, + temperature=0.3 + ) + + result_text = response.choices[0].message.content + result_json = json.loads(result_text) + + tasks = result_json.get("tasks", []) + reasoning = result_json.get("reasoning", "") + + log.info(f"✅ 生成了 {len(tasks)} 个子任务") + if reasoning: + log.info(f"📝 生成理由: {reasoning}") + + return tasks + + except Exception as e: + log.error(f"❌ LLM 分析时出错: {e}") + import traceback + log.error(traceback.format_exc()) + # 返回默认任务 + return [f"深入研究 {url} 的相关内容"] + + async def run(self, state: MainState, **kwargs) -> Dict[str, Any]: + """ + 执行初始分析任务 + + 流程: + 1. 从 state 获取 input_urls + 2. 对每个 URL: + a. 抓取 DOM 并保存 + b. 调用 mineruhtml API 提取正文 + c. 保存正文内容 + 3. 使用 LLM 分析所有正文,生成子任务列表 + 4. 更新 state 的 research_routes 和 raw_data_store + """ + log.info(f"[WebsearchInitialAnalyzer] 开始执行初始分析任务") + + # 1. 提取 input_urls + input_urls = self._extract_input_urls_from_state(state) + + if not input_urls: + log.warning("[WebsearchInitialAnalyzer] 未找到 input_urls,跳过处理") + result_payload = { + "status": "skipped", + "reason": "No input URLs found", + "research_routes": [], + "raw_data_store": [] + } + self.update_state_result(state, result_payload, self.get_default_pre_tool_results()) + return result_payload + + log.info(f"[WebsearchInitialAnalyzer] 找到 {len(input_urls)} 个 URL: {input_urls}") + + # 2. 准备存储目录 + timestamp = int(time.time()) + session_dir = self.output_dir / f"{timestamp}_initial_analysis" + session_dir.mkdir(exist_ok=True) + + # 3. 处理每个 URL + all_extracted_contents = [] + raw_data_records = [] + + for idx, url in enumerate(input_urls, 1): + log.info(f"\n{'='*60}") + log.info(f"处理 URL {idx}/{len(input_urls)}: {url}") + log.info(f"{'='*60}") + + # 3.1 抓取并保存 DOM + dom_filepath = await self._fetch_and_save_dom(url, session_dir, idx) + + # 3.2 读取 HTML 内容(如果保存成功) + html_content = None + if dom_filepath: + try: + with open(dom_filepath, "r", encoding="utf-8") as f: + html_content = f.read() + except Exception as e: + log.warning(f"⚠️ 读取 DOM 文件失败: {e}") + + # 如果 DOM 抓取失败,尝试直接抓取 + if not html_content: + log.info(f"🔄 重新抓取 HTML 内容...") + html_content = await self.dom_fetcher.fetch_html(url, wait_time=3) + + if not html_content: + log.error(f"❌ 无法获取 HTML 内容,跳过该 URL") + continue + + # 3.3 调用 mineruhtml API 提取正文 + extracted_content = await self._extract_content_with_mineruhtml(html_content, url) + + if not extracted_content: + log.warning(f"⚠️ 正文提取失败,使用原始 HTML 的前 5000 字符作为正文") + extracted_content = html_content[:5000] + + # 3.4 保存提取的正文 + content_filepath = session_dir / f"extracted_content_url_{idx:02d}.md" + with open(content_filepath, "w", encoding="utf-8") as f: + f.write(f"# URL: {url}\n\n") + f.write(f"提取时间: {datetime.now().isoformat()}\n\n") + f.write("---\n\n") + f.write(extracted_content) + + log.info(f"✅ 正文已保存至: {content_filepath}") + + # 3.5 记录到 raw_data_store + record = { + "url": url, + "timestamp": datetime.now().isoformat(), + "dom_filepath": dom_filepath, + "content_filepath": str(content_filepath), + "extracted_content_length": len(extracted_content), + "extracted_content_preview": extracted_content[:200] + "..." 
if len(extracted_content) > 200 else extracted_content + } + raw_data_records.append(record) + all_extracted_contents.append({ + "url": url, + "content": extracted_content + }) + + # 4. 使用 LLM 分析所有内容,生成子任务列表 + research_routes = [] + + if all_extracted_contents: + log.info(f"\n{'='*60}") + log.info(f"开始分析内容并生成研究子任务...") + log.info(f"{'='*60}") + + # 合并所有内容 + combined_content = "\n\n".join([ + f"## URL {i+1}: {item['url']}\n\n{item['content'][:2000]}" # 每个 URL 最多 2000 字符 + for i, item in enumerate(all_extracted_contents) + ]) + + # 生成子任务 + research_routes = await self._analyze_content_and_generate_tasks( + url=", ".join([item["url"] for item in all_extracted_contents]), + extracted_content=combined_content + ) + + log.info(f"✅ 生成了 {len(research_routes)} 个研究子任务:") + for i, task in enumerate(research_routes, 1): + log.info(f" {i}. {task}") + + # 5. 保存汇总信息 + summary_filepath = session_dir / "analysis_summary.json" + summary = { + "timestamp": datetime.now().isoformat(), + "input_urls": input_urls, + "research_routes": research_routes, + "raw_data_records": raw_data_records, + "session_dir": str(session_dir) + } + with open(summary_filepath, "w", encoding="utf-8") as f: + json.dump(summary, f, indent=2, ensure_ascii=False) + + log.info(f"✅ 分析汇总已保存至: {summary_filepath}") + + # 6. 更新 state(如果是 WebsearchKnowledgeState) + if isinstance(state, WebsearchKnowledgeState): + # 更新 research_routes(直接赋值,因为这是初始分析) + state.research_routes = research_routes + + # 同时保存一份原始任务列表(不会被 planner 修改),供 chief_curator 使用 + state.original_research_routes = research_routes.copy() + + # 更新 raw_data_store(追加模式,保留已有数据) + if not hasattr(state, 'raw_data_store') or state.raw_data_store is None: + state.raw_data_store = [] + # 追加新记录(避免重复) + existing_urls = {r.get("url") for r in state.raw_data_store if isinstance(r, dict)} + for record in raw_data_records: + if record.get("url") not in existing_urls: + state.raw_data_store.append(record) + + log.info(f"✅ 已更新 state.research_routes ({len(research_routes)} 个任务)") + log.info(f"✅ 已保存 state.original_research_routes ({len(state.original_research_routes)} 个原始任务)") + log.info(f"✅ 已更新 state.raw_data_store (新增 {len(raw_data_records)} 条记录,总计 {len(state.raw_data_store)} 条)") + + # 7. 
返回结果 + result_payload = { + "status": "success", + "session_dir": str(session_dir), + "input_urls": input_urls, + "research_routes": research_routes, + "raw_data_store": raw_data_records, + "summary_filepath": str(summary_filepath) + } + + self.update_state_result(state, result_payload, self.get_default_pre_tool_results()) + return result_payload + + def update_state_result( + self, + state: MainState, + result: Dict[str, Any], + pre_tool_results: Dict[str, Any], + ): + """ + 把初始分析结果写回 state.agent_results。 + """ + if getattr(state, "agent_results", None) is not None: + state.agent_results[self.role_name] = { + "result": result, + "pre_tool_results": pre_tool_results, + } + log.debug("[WebsearchInitialAnalyzerAgent] result written to state.agent_results.") + super().update_state_result(state, result, pre_tool_results) + + +def create_websearch_initial_analyzer_agent( + tool_manager: Optional[ToolManager] = None, + **kwargs, +) -> WebsearchInitialAnalyzerAgent: + return WebsearchInitialAnalyzerAgent.create(tool_manager=tool_manager, **kwargs) + + + + + + diff --git a/dataflow_agent/agentroles/paper2any_agents/websearch_planner.py b/dataflow_agent/agentroles/paper2any_agents/websearch_planner.py new file mode 100644 index 00000000..7a858620 --- /dev/null +++ b/dataflow_agent/agentroles/paper2any_agents/websearch_planner.py @@ -0,0 +1,222 @@ +""" +Websearch Planner Agent +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +负责 Web 搜索知识入库流程中的「Planner / 全知指挥官」角色。 + +职责: +1. 监控研究进度:检查 Web Researcher 的产出,并将其汇总到 Raw Data Store。 +2. 维护任务队列:从 research_routes 中弹出已完成的任务,并设定下一个 current_task。 +3. 动态规划(可选):后续可扩展利用 LLM 分析当前进度,动态增删任务。 +""" + +from __future__ import annotations + +import os +import json +from pathlib import Path +from datetime import datetime +from typing import Any, Dict, Optional, List + +from dataflow_agent.state import MainState, WebsearchKnowledgeState +from dataflow_agent.toolkits.tool_manager import ToolManager +from dataflow_agent.logger import get_logger +from dataflow_agent.agentroles.cores.base_agent import BaseAgent +from dataflow_agent.agentroles.cores.registry import register + +# OpenAI 依赖 +from openai import AsyncOpenAI +import httpx + +log = get_logger(__name__) + + +@register("websearch_planner") +class WebsearchPlannerAgent(BaseAgent): + """ + Websearch Planner Agent(完整实现版) + """ + + def __init__( + self, + tool_manager: Optional[ToolManager] = None, + llm_config: Optional[Dict] = None, + **kwargs + ): + super().__init__(tool_manager=tool_manager, **kwargs) + + self.llm_config = llm_config or { + "base_url": os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1"), + "api_key": os.getenv("DF_API_KEY", "sk-xxx"), + "model": os.getenv("THIRD_PARTY_MODEL", "gpt-4o"), + } + + api_key = self.llm_config.get("api_key") + base_url = self.llm_config.get("base_url") + model = self.llm_config.get("model") + + http_client = httpx.AsyncClient(trust_env=False) + self.llm_client = AsyncOpenAI( + api_key=api_key, + base_url=base_url, + http_client=http_client + ) + self.llm_model = model + log.info(f"🔗 LLM Initialized with Base URL: {base_url}") + + @classmethod + def create( + cls, + tool_manager: Optional[ToolManager] = None, + **kwargs, + ) -> "WebsearchPlannerAgent": + return cls(tool_manager=tool_manager, **kwargs) + + @property + def role_name(self) -> str: + return "websearch_planner" + + @property + def system_prompt_template_name(self) -> str: + return "system_prompt_for_websearch_planner" + + @property + def task_prompt_template_name(self) -> str: + return 
"task_prompt_for_websearch_planner" + + def get_task_prompt_params(self, pre_tool_results: Dict[str, Any]) -> Dict[str, Any]: + """ + 构造 Planner 所需的 prompt 参数。 + """ + return { + "pre_tool_results": pre_tool_results, + } + + async def run(self, state: MainState, **kwargs) -> Dict[str, Any]: + """ + 执行规划任务 + + 1. 检查 researcher 结果,同步数据到 raw_data_store + 2. 维护 research_routes 队列 + 3. 设定 state.current_task 供 researcher 使用 + """ + log.info(f"[{self.role_name}] 开始执行规划逻辑...") + + if not isinstance(state, WebsearchKnowledgeState): + log.error("State 类型错误,必须为 WebsearchKnowledgeState") + return {"status": "failed", "reason": "Invalid state type"} + + # 1. 检查 Web Researcher 的执行产出 + researcher_results = state.agent_results.get("websearch_researcher", {}) + + # 如果有成功的 researcher 结果且未被 Planner 处理过 + if (researcher_results and + researcher_results.get("result", {}).get("status") == "success" and + not researcher_results.get("planner_processed", False)): + + log.info("发现未处理的 Web Researcher 产出,正在同步到 Raw Data Store...") + res_data = researcher_results["result"] + storage_path_str = res_data.get("storage_path") + + if storage_path_str: + storage_path = Path(storage_path_str) + dom_snapshots_dir = storage_path / "dom_snapshots" + + # --- 同步 DOM 快照(HTML)--- + if dom_snapshots_dir.exists(): + new_records_count = 0 + for dom_file in dom_snapshots_dir.glob("*.html"): + # 检查是否已存在(根据路径判断) + if any(r.get("dom_filepath") == str(dom_file) for r in state.raw_data_store): + continue + + record = { + "url": "extracted_from_research", # 理想情况下 researcher 应该返回具体 URL + "timestamp": datetime.now().isoformat(), + "dom_filepath": str(dom_file), + "source": "websearch_researcher", + "task": getattr(state, "current_task", "Unknown Task") + } + state.raw_data_store.append(record) + new_records_count += 1 + log.info(f"成功同步 {new_records_count} 条 DOM 快照记录到 Raw Data Store") + + # --- 同步 final_resources 中的 PDF 文件 --- + final_resources_dir = storage_path / "final_resources" + if final_resources_dir.exists(): + pdf_records_count = 0 + for pdf_file in final_resources_dir.glob("*.pdf"): + # 检查是否已存在(根据 pdf_filepath 判断) + if any(r.get("pdf_filepath") == str(pdf_file) for r in state.raw_data_store): + continue + + record = { + "url": f"pdf://{pdf_file.name}", + "timestamp": datetime.now().isoformat(), + "pdf_filepath": str(pdf_file), + "source": "websearch_researcher", + "task": getattr(state, "current_task", "Unknown Task"), + "type": "pdf" + } + state.raw_data_store.append(record) + pdf_records_count += 1 + if pdf_records_count > 0: + log.info(f"成功同步 {pdf_records_count} 条 PDF 记录到 Raw Data Store") + + # 任务队列维护:既然 Researcher 成功完成了当前任务,就将其从队列中弹出 + if state.research_routes: + done_task = state.research_routes.pop(0) + log.info(f"已完成并移除研究子任务: {done_task}") + + # 标记该 researcher 结果已处理 + researcher_results["planner_processed"] = True + + # 2. 设定下一个任务 + if state.research_routes: + # 设定下一个要执行的任务给 researcher 节点看 + state["current_task"] = state.research_routes[0] + log.info(f"设定下一个 current_task: {state['current_task']}") + else: + # 如果没有待执行任务,清理 current_task + if hasattr(state, "current_task"): + state["current_task"] = None + log.info("研究任务队列已空") + + # 3. 
构建返回结果 + result_payload = { + "status": "success", + "remaining_tasks_count": len(state.research_routes), + "raw_data_store_count": len(state.raw_data_store), + "next_task": state.get("current_task") + } + + self.update_state_result(state, result_payload, {}) + return result_payload + + def update_state_result( + self, + state: MainState, + result: Dict[str, Any], + pre_tool_results: Dict[str, Any], + ): + """ + 将结果写回到 state.agent_results。 + """ + if getattr(state, "agent_results", None) is not None: + state.agent_results[self.role_name] = { + "result": result, + "pre_tool_results": pre_tool_results, + } + log.debug(f"[{self.role_name}] result written to state.agent_results.") + super().update_state_result(state, result, pre_tool_results) + + +def create_websearch_planner_agent( + tool_manager: Optional[ToolManager] = None, + **kwargs, +) -> WebsearchPlannerAgent: + """ + 便捷创建函数。 + """ + return WebsearchPlannerAgent.create(tool_manager=tool_manager, **kwargs) + + diff --git a/dataflow_agent/agentroles/paper2any_agents/websearch_researcher.py b/dataflow_agent/agentroles/paper2any_agents/websearch_researcher.py new file mode 100644 index 00000000..5c17dba9 --- /dev/null +++ b/dataflow_agent/agentroles/paper2any_agents/websearch_researcher.py @@ -0,0 +1,880 @@ +from __future__ import annotations + +import os +import sys +import json +import time +import asyncio +import httpx +from pathlib import Path +from datetime import datetime +from typing import Any, Dict, Optional, List +from urllib.parse import urlparse, unquote + +# Dataflow Agent 依赖 +from dataflow_agent.state import MainState +from dataflow_agent.toolkits.tool_manager import ToolManager +from dataflow_agent.logger import get_logger +from dataflow_agent.agentroles.cores.base_agent import BaseAgent +from dataflow_agent.agentroles.cores.registry import register + +from playwright.async_api import async_playwright, Page, BrowserContext, Browser, Playwright + +from playwright_stealth import Stealth + + +from openai import AsyncOpenAI + +log = get_logger(__name__) + + +class DomFetcher: + """网页DOM数据抓取类""" + + def __init__(self, headless: bool = True): + self.headless = headless + self.proxy_config = self._get_proxy_config() + + def _get_proxy_config(self) -> Optional[Dict[str, str]]: + """从环境变量读取代理配置""" + http_proxy = os.getenv("HTTP_PROXY") or os.getenv("http_proxy") + https_proxy = os.getenv("HTTPS_PROXY") or os.getenv("https_proxy") + + if not http_proxy and not https_proxy: + all_proxy = os.getenv("ALL_PROXY") or os.getenv("all_proxy") + if all_proxy: + if all_proxy.startswith("socks5h://"): + http_proxy = all_proxy.replace("socks5h://", "http://") + else: + http_proxy = all_proxy + + proxy_url = http_proxy or https_proxy + + if proxy_url: + parsed = urlparse(proxy_url) + server = f"{parsed.scheme}://{parsed.netloc}" + no_proxy = os.getenv("NO_PROXY") or os.getenv("no_proxy", "localhost,127.0.0.1,::1") + + return { + "server": server, + "bypass": no_proxy + } + + return None + + async def fetch_html(self, url: str, wait_time: int = 3) -> Optional[str]: + log.info(f"🌐 正在访问页面: {url}") + try: + log.info("🕵️ 正在应用反爬虫绕过技术 (playwright-stealth)...") + async with Stealth().use_async(async_playwright()) as p: + return await self._process_page(p, url, wait_time) + except Exception as e: + log.error(f"❌ 抓取失败: {str(e)}") + import traceback + log.error(traceback.format_exc()) + return None + + async def _process_page(self, p, url: str, wait_time: int) -> str: + launch_args = { + "headless": self.headless, + "args": [ + "--no-sandbox", + 
"--disable-setuid-sandbox", + "--disable-blink-features=AutomationControlled", + ] + } + + if self.proxy_config: + launch_args["proxy"] = self.proxy_config + log.info(f"🕵️ DomFetcher 使用代理: {self.proxy_config['server']}") + + browser = await p.chromium.launch(**launch_args) + + context_options = { + "viewport": {"width": 1280, "height": 800}, + "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "locale": "zh-CN", + "timezone_id": "Asia/Shanghai", + } + + if self.proxy_config: + context_options["proxy"] = self.proxy_config + + context = await browser.new_context(**context_options) + page = await context.new_page() + + try: + log.info(f"📄 正在加载页面...") + await page.goto(url, timeout=60000, wait_until="domcontentloaded") + + try: + await page.wait_for_load_state("networkidle", timeout=15000) + log.info("✅ 网络请求已稳定") + except Exception: + log.warning("⚠️ 网络空闲等待超时,继续执行...") + + if wait_time > 0: + log.info(f"⏳ 额外等待 {wait_time} 秒...") + await asyncio.sleep(wait_time) + + html_content = await page.content() + log.info(f"✅ HTML获取成功,长度: {len(html_content)} 字符") + + return html_content + + finally: + await browser.close() + + + +class PlaywrightToolKit: + def __init__(self, headless: bool = True, use_proxy: Optional[bool] = None): + self.headless = headless + self.playwright: Optional[Playwright] = None + self.browser: Optional[Browser] = None + self.context: Optional[BrowserContext] = None + self.page: Optional[Page] = None + self.base_download_dir: Optional[str] = None + + if use_proxy is None: + has_env_proxy = bool( + os.getenv("HTTP_PROXY") or os.getenv("http_proxy") or + os.getenv("HTTPS_PROXY") or os.getenv("https_proxy") or + os.getenv("ALL_PROXY") or os.getenv("all_proxy") + ) + self.use_proxy = has_env_proxy + else: + self.use_proxy = use_proxy + + self.proxy_config = self._get_proxy_config() if self.use_proxy else None + + def _get_proxy_config(self) -> Optional[Dict[str, str]]: + http_proxy = os.getenv("HTTP_PROXY") or os.getenv("http_proxy") + https_proxy = os.getenv("HTTPS_PROXY") or os.getenv("https_proxy") + + if not http_proxy and not https_proxy: + all_proxy = os.getenv("ALL_PROXY") or os.getenv("all_proxy") + if all_proxy: + if all_proxy.startswith("socks5h://"): + http_proxy = all_proxy.replace("socks5h://", "http://") + else: + http_proxy = all_proxy + + proxy_url = http_proxy or https_proxy + + if proxy_url: + parsed = urlparse(proxy_url) + server = f"{parsed.scheme}://{parsed.netloc}" + no_proxy = os.getenv("NO_PROXY") or os.getenv("no_proxy", "localhost,127.0.0.1,::1") + return { + "server": server, + "bypass": no_proxy + } + + return { + "server": "http://127.0.0.1:7890", + "bypass": "localhost,127.0.0.1,0.0.0.0" + } + + async def start(self): + if not self.playwright: + self.playwright = await async_playwright().start() + if not self.browser: + launch_args = { + "headless": self.headless, + "args": ["--no-sandbox", "--disable-setuid-sandbox"] + } + if self.use_proxy: + launch_args["proxy"] = self.proxy_config + self.browser = await self.playwright.chromium.launch(**launch_args) + + context_options = { + "viewport": {"width": 1280, "height": 800}, + "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "ignore_https_errors": True + } + + if self.use_proxy and self.proxy_config: + context_options["proxy"] = self.proxy_config + log.info(f"✅ 已配置浏览器代理: {self.proxy_config['server']}") + else: + log.info("⚠️ 未使用代理") + + 
self.context = await self.browser.new_context(**context_options) + self.page = await self.context.new_page() + + async def _ensure_browser(self): + if not self.page: + await self.start() + + async def close(self): + if self.context: await self.context.close() + if self.browser: await self.browser.close() + if self.playwright: await self.playwright.stop() + self.page = None + + async def _wait_and_stabilize(self): + try: + if self.page: + await self.page.wait_for_load_state("domcontentloaded", timeout=5000) + await self.page.wait_for_load_state("networkidle", timeout=2000) + except Exception: + pass + await asyncio.sleep(1) + + async def navigate(self, url: str) -> str: + try: + await self._ensure_browser() + await self.page.goto(url, timeout=60000, wait_until="domcontentloaded") + await self._wait_and_stabilize() + title = await self.page.title() + return f"Success: Navigated to {url}. Page Title: {title}" + except Exception as e: + return f"Error navigating to {url}: {str(e)}" + + async def get_accessibility_tree(self, use_accessibility_tree: bool = True) -> str: + try: + await self._ensure_browser() + if use_accessibility_tree: + snapshot = await self.page.locator("body").aria_snapshot() + if not snapshot: + return "Empty accessibility tree (aria_snapshot)." + return snapshot + snapshot = await self.page.content() + if not snapshot: + return "Empty DOM (raw DOM)." + return snapshot + except Exception as e: + return f"Error getting snapshot: {str(e)}" + + async def click_element(self, element_text: str) -> str: + await self._ensure_browser() + log.info(f"🖱️ Attempting to click element with text: '{element_text}'") + + strategies = [ + { "name": "get_by_role('button')", "locator": lambda: self.page.get_by_role("button", name=element_text).first }, + { "name": "get_by_role('link')", "locator": lambda: self.page.get_by_role("link", name=element_text).first }, + { "name": "button:text=", "locator": lambda: self.page.locator(f"button:text={element_text}").first }, + { "name": "button:has-text()", "locator": lambda: self.page.locator(f"button:has-text('{element_text}')").first }, + { "name": "summary:text=", "locator": lambda: self.page.locator(f"summary:text={element_text}").first }, + { "name": "summary:has-text()", "locator": lambda: self.page.locator(f"summary:has-text('{element_text}')").first }, + { "name": "text=", "locator": lambda: self.page.locator(f"text={element_text}").first }, + { "name": ":has-text()", "locator": lambda: self.page.locator(f":has-text('{element_text}')").first }, + { "name": "CSS [aria-label]", "locator": lambda: self.page.locator(f"[aria-label='{element_text}']").first }, + { "name": "CSS [aria-label*='...']", "locator": lambda: self.page.locator(f"[aria-label*='{element_text[:20]}']").first if len(element_text) >= 5 else None, "skip_if": lambda: len(element_text) < 5 }, + { "name": "XPath contains", "locator": lambda: self.page.locator(f"xpath=//*[contains(text(), '{element_text}')]").first if "'" not in element_text else None, "skip_if": lambda: "'" in element_text }, + { "name": "XPath button contains", "locator": lambda: self.page.locator(f"xpath=//button[contains(text(), '{element_text}')]").first if "'" not in element_text else None, "skip_if": lambda: "'" in element_text }, + ] + + errors = [] + for strategy in strategies: + try: + if "skip_if" in strategy and strategy["skip_if"](): continue + locator = strategy["locator"]() + if locator is None: continue + + count = await locator.count() + if count > 0: + if await locator.is_visible(): + try: + await 
locator.wait_for(state="visible", timeout=3000) + await locator.click(timeout=5000) + await self._wait_and_stabilize() + return f"Success: Clicked element '{element_text}' using strategy '{strategy['name']}'" + except Exception as click_error: + errors.append(f"{strategy['name']}: 点击失败 - {str(click_error)}") + else: + errors.append(f"{strategy['name']}: 元素存在但不可见") + else: + errors.append(f"{strategy['name']}: 未找到元素") + except Exception as e: + errors.append(f"{strategy['name']}: 执行异常 - {str(e)}") + + error_summary = "\n".join([f" - {err}" for err in errors[:5]]) + return f"Error: Could not click element '{element_text}'.\n{error_summary}" + + async def input_text(self, element_label_or_placeholder: str, text: str) -> str: + await self._ensure_browser() + log.info(f"⌨️ Attempting to input '{text}' into field: '{element_label_or_placeholder}'") + + locate_strategies = [ + { "name": "get_by_placeholder", "locator": lambda: self.page.get_by_placeholder(element_label_or_placeholder).first }, + { "name": "get_by_label", "locator": lambda: self.page.get_by_label(element_label_or_placeholder).first }, + { "name": "get_by_role('textbox')", "locator": lambda: self.page.get_by_role("textbox", name=element_label_or_placeholder).first }, + { "name": "get_by_role('combobox')", "locator": lambda: self.page.get_by_role("combobox", name=element_label_or_placeholder).first }, + { "name": "input[placeholder*='...']", "locator": lambda: self.page.locator(f"input[placeholder*='{element_label_or_placeholder[:10]}']").first if len(element_label_or_placeholder) >= 5 else None, "skip_if": lambda: len(element_label_or_placeholder) < 5 }, + { "name": "容器查找 input", "locator": lambda: self.page.locator(f":text('{element_label_or_placeholder}') >> xpath=.. >> input").first if "'" not in element_label_or_placeholder else None, "skip_if": lambda: "'" in element_label_or_placeholder }, + ] + + target_locator = None + for strategy in locate_strategies: + try: + if "skip_if" in strategy and strategy["skip_if"](): continue + locator = strategy["locator"]() + if locator and await locator.count() > 0 and await locator.is_visible(): + target_locator = locator + log.info(f" ✅ Found input using: {strategy['name']}") + break + except: + continue + + if not target_locator: + return f"Error: Could not find input field matching '{element_label_or_placeholder}'." + + input_strategies = [ + { "name": "fill()", "func": lambda loc: self._input_using_fill(loc, text) }, + { "name": "type()", "func": lambda loc: self._input_using_type(loc, text) }, + { "name": "keyboard", "func": lambda loc: self._input_using_keyboard_char_by_char(loc, text) }, + ] + + for strategy in input_strategies: + try: + result = await strategy["func"](target_locator) + if result.startswith("Success"): + await self._wait_and_stabilize() + return f"{result} using {strategy['name']}" + except: + continue + + return f"Error: All input methods failed for '{element_label_or_placeholder}'." 
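The `input_text` helper above resolves the target field through a cascade of locator strategies and then falls back from `fill()` to `type()` to per-character keyboard input. A minimal usage sketch of the toolkit defined in this module follows; the target URL and the placeholder text passed to `input_text` are illustrative assumptions rather than values taken from this patch:

import asyncio

from dataflow_agent.agentroles.paper2any_agents.websearch_researcher import PlaywrightToolKit


async def _demo() -> None:
    kit = PlaywrightToolKit(headless=True)  # proxy settings are picked up from env vars, if present
    try:
        await kit.start()
        print(await kit.navigate("https://example.org"))              # hypothetical target page
        # Tries fill() -> type() -> keyboard typing until one strategy reports "Success".
        print(await kit.input_text("Search", "denoising diffusion"))  # hypothetical placeholder text
        snapshot = await kit.get_accessibility_tree()
        print(snapshot[:400])
    finally:
        await kit.close()


if __name__ == "__main__":
    asyncio.run(_demo())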
+ + async def _input_using_fill(self, locator, text: str) -> str: + await locator.fill("") + await locator.fill(text) + await locator.press("Enter") + return f"Success: Input '{text}' using fill()" + + async def _input_using_type(self, locator, text: str) -> str: + await locator.fill("") + await locator.type(text, delay=50) + await locator.press("Enter") + return f"Success: Input '{text}' using type()" + + async def _input_using_keyboard_char_by_char(self, locator, text: str) -> str: + await locator.wait_for(state="visible", timeout=3000) + await locator.click() + for char in text: + await self.page.keyboard.type(char, delay=30) + await self.page.keyboard.press("Enter") + return f"Success: Input '{text}' using keyboard" + + async def go_back(self) -> str: + try: + await self._ensure_browser() + await self.page.go_back() + await self._wait_and_stabilize() + title = await self.page.title() + return f"Success: Navigated back. Current page title: {title}" + except Exception as e: + return f"Error going back: {str(e)}" + + async def download_resource(self, url: str, download_dir: Optional[str] = None) -> str: + """下载资源文件(信任 LLM 传入的完整 URL)""" + try: + if not url or not url.strip(): + return "Error: URL is required." + + log.info(f"📥 开始下载资源: {url}") + await self._ensure_browser() + + if download_dir is None: + download_dir = self.base_download_dir if self.base_download_dir else os.path.join(os.getcwd(), "downloads") + + os.makedirs(download_dir, exist_ok=True) + + parsed_url = urlparse(url) + path = unquote(parsed_url.path) + suggested_filename = os.path.basename(path) if path else "download" + + # 尝试通过 HEAD 请求获取文件名,但不修改 URL + try: + response = await self.context.request.head(url, timeout=30000) + content_disposition = response.headers.get('content-disposition', '') + if content_disposition: + import re + filename_match = re.search(r'filename[^;=\n]*=(([\'"]).*?\2|[^;\n]*)', content_disposition) + if filename_match: + suggested_filename = filename_match.group(1).strip('"\'') + await response.dispose() + except: + pass + + if not suggested_filename or suggested_filename == 'download' or '.' 
not in suggested_filename: + # 如果没有后缀,自动补全pdf(仅作为文件名兜底,不改变请求URL) + suggested_filename = f"download_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf" + + save_path = os.path.join(download_dir, suggested_filename) + if os.path.exists(save_path): + name, ext = os.path.splitext(suggested_filename) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + save_path = os.path.join(download_dir, f"{name}_{timestamp}{ext}") + + log.info(f"⬇️ 正在下载文件...") + # 直接使用传入的 URL,不做任何硬修改 + response = await self.context.request.get(url, timeout=60000) + + if response.status >= 400: + error_text = await response.text() + await response.dispose() + return f"Error: HTTP {response.status} - {error_text[:200]}" + + content = await response.body() + await response.dispose() + + with open(save_path, 'wb') as f: + f.write(content) + + file_size_mb = os.path.getsize(save_path) / (1024 * 1024) + return f"Success: Downloaded resource to {save_path} (Size: {file_size_mb:.2f} MB)" + except Exception as e: + log.error(f"❌ 下载过程中发生错误: {str(e)}") + return f"Error downloading resource from {url}: {str(e)}" + + async def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str: + tool_map = { + "navigate": self.navigate, + "get_accessibility_tree": self.get_accessibility_tree, + "click_element": self.click_element, + "input_text": self.input_text, + "go_back": self.go_back, + "download_resource": self.download_resource + } + if tool_name not in tool_map: + return f"Error: Tool '{tool_name}' not found." + return await tool_map[tool_name](**arguments) + + def get_tools_schema(self) -> List[Dict[str, Any]]: + return [ + { + "name": "navigate", + "description": "Navigate to a specific URL", + "parameters": { + "type": "object", + "properties": { + "url": {"type": "string", "description": "The URL to navigate to"} + }, + "required": ["url"] + } + }, + { + "name": "get_accessibility_tree", + "description": "Get the current page structure to read text and identify elements", + "parameters": { + "type": "object", + "properties": { + "use_accessibility_tree": { + "type": "boolean", + "description": "True 使用 aria_snapshot 过滤树;False 使用原始 DOM 快照", + "default": True + } + }, + "required": [] + } + }, + { + "name": "click_element", + "description": "Click an element by its visible text content", + "parameters": { + "type": "object", + "properties": { + "element_text": {"type": "string", "description": "The visible text on the element to click"} + }, + "required": ["element_text"] + } + }, + { + "name": "input_text", + "description": "Input text into a field identified by its Placeholder text or Label name", + "parameters": { + "type": "object", + "properties": { + "element_label_or_placeholder": {"type": "string", "description": "The placeholder text or label name"}, + "text": {"type": "string", "description": "Text to input"} + }, + "required": ["element_label_or_placeholder", "text"] + } + }, + { + "name": "go_back", + "description": "Navigate back to the previous page", + "parameters": { + "type": "object", + "properties": {}, + "required": [] + } + }, + { + "name": "download_resource", + "description": "Download a resource file. **CRITICAL**: You must provide the *final direct download URL*. The tool will NOT fix URLs for you. If you are downloading a paper (e.g., arXiv), you MUST manually convert the abstract URL to a PDF URL (e.g. change 'abs' to 'pdf' and add '.pdf') BEFORE calling this tool.", + "parameters": { + "type": "object", + "properties": { + "url": {"type": "string", "description": "The complete direct download URL. 
Example: verify that 'https://arxiv.org/abs/2506.21506' is changed to 'https://arxiv.org/pdf/2506.21506.pdf' before inputting here."},
+                        "download_dir": {"type": "string", "description": "Optional directory"}
+                    },
+                    "required": ["url"]
+                }
+            },
+            {
+                "name": "terminate",
+                "description": "Finish the task",
+                "parameters": {
+                    "type": "object",
+                    "properties": {},
+                    "required": []
+                }
+            }
+        ]
+
+# ==========================================
+# 2. Agent Logic
+# ==========================================
+
+class WebAgent:
+    def __init__(self, toolkit: PlaywrightToolKit, llm_config: Optional[Dict] = None, dom_save_dir: Optional[Path] = None):
+        self.toolkit = toolkit
+        self.action_history: List[Dict[str, Any]] = []
+        self.accessibility_trees: List[Dict[str, Any]] = []
+        self.consecutive_failures = 0
+        self.dom_save_dir = dom_save_dir
+        self.visited_urls: set = set()
+        self.dom_fetcher = DomFetcher(headless=toolkit.headless) if hasattr(toolkit, 'headless') else DomFetcher(headless=True)
+
+        self.llm_config = llm_config or {
+            "base_url": os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1"),
+            "api_key": os.getenv("DF_API_KEY", "sk-xxx"),
+            "model": os.getenv("THIRD_PARTY_MODEL", "gpt-4o"),
+        }
+        # Resolve the effective LLM settings (falls back to the env-based defaults above).
+        api_key = self.llm_config.get("api_key")
+        base_url = self.llm_config.get("base_url")
+        model = self.llm_config.get("model")
+
+        http_client = httpx.AsyncClient(trust_env=False)
+
+        self.client = AsyncOpenAI(
+            api_key=api_key,
+            base_url=base_url,
+            http_client=http_client
+        )
+        self.model = model
+        log.info(f"🔗 LLM Initialized with Base URL: {base_url}")
+
+    def _construct_prompt(self, task: str, current_tree: str) -> str:
+        if self.action_history:
+            history_lines = []
+            for i, record in enumerate(self.action_history, 1):
+                step_num = record.get("step", i)
+                action = record.get("action", "")
+                result = record.get("result", "")
+                success = record.get("success", False)
+                status = "✅ SUCCESS" if success else "❌ FAILED"
+                history_lines.append(f"Step {step_num} [{status}]: {action}")
+                history_lines.append(f"    Result: {result}")
+            history_str = "\n".join(history_lines)
+        else:
+            history_str = "None (Start of task)"
+
+        tools_schema = self.toolkit.get_tools_schema()
+        tools_schema_str = json.dumps(tools_schema, indent=2, ensure_ascii=False)
+
+        failure_warning = ""
+        if self.consecutive_failures >= 2:
+            failure_warning = f"\n⚠️ WARNING: {self.consecutive_failures} consecutive failures detected. Consider using go_back()."
+
+        return f"""
+You are an autonomous web automation agent.
+
+Your Goal: {task}
+
+--- ACTION SPACE (Available Tools) ---
+
+{tools_schema_str}
+
+--- ACTION HISTORY ---
+
+{history_str}
+
+{failure_warning}
+
+--- CURRENT ACCESSIBILITY TREE ---
+
+{current_tree}
+
+--- FEW-SHOT EXAMPLES (URL CORRECTION) ---
+
+**IMPORTANT**: When using `download_resource`, you MUST manually correct URLs from abstract/view pages to direct file links. The tool does NOT do this for you.
+
+Example 1 (arXiv Paper):
+User Goal: "Download the paper at https://arxiv.org/abs/2506.21506"
+Observation: The URL is an abstract page (.../abs/...), not a PDF.
+Thought: "I need to download the PDF. The tool requires a direct link. I will convert 'abs' to 'pdf' and add the extension."
+Tool Call: +{{ + "thought": "I am converting the abstract URL to a direct PDF URL to ensure successful download.", + "tool": "download_resource", + "args": {{ + "url": "https://arxiv.org/pdf/2506.21506.pdf" + }} +}} + +Example 2 (General PDF): +Observation: Link text says "Download Manual", href is "http://example.com/manual" (redirects to PDF). +Thought: "It is safer to assume the file ends in .pdf for the tool." +Tool Call: +{{ + "thought": "Appending .pdf to ensure the tool handles it as a file.", + "tool": "download_resource", + "args": {{ + "url": "http://example.com/manual.pdf" + }} +}} + +--- INSTRUCTIONS --- + +1. Review ACTION HISTORY. If actions failed, try a different approach. +2. Choose a tool from ACTION SPACE. +3. **CRITICAL**: BEFORE calling `download_resource`, verify the URL in your thought process. + - If it is an arXiv link (`/abs/`), YOU MUST change it to `/pdf/` and ensure it ends in `.pdf`. + - Do not use "hard" methods in the tool; use your intelligence to format the URL correctly in the `args`. +4. AVOID CAPTCHA sites (Google Scholar, Stack Overflow). +5. Output JSON format ONLY with "thought", "tool", and "args". +""" + + async def _call_real_llm(self, prompt: str) -> str: + log.info("🤔 Agent is thinking...") + try: + response = await self.client.chat.completions.create( + model=self.model, + messages=[ + {"role": "system", "content": "You are a helpful web assistant. You always respond in valid JSON."}, + {"role": "user", "content": prompt} + ], + response_format={"type": "json_object"}, + temperature=0.1 + ) + return response.choices[0].message.content + except Exception as e: + log.error(f"❌ OpenAI API Error: {e}") + return "{}" + + async def _get_current_url(self) -> str: + try: + await self.toolkit._ensure_browser() + if self.toolkit.page: + return self.toolkit.page.url + except: + pass + return "Unknown" + + async def _save_dom_if_new_url(self, url: str, step: int) -> Optional[str]: + if not self.dom_save_dir: return None + normalized_url = url.split('#')[0] + if normalized_url in self.visited_urls: return None + self.visited_urls.add(normalized_url) + + dom_dir = self.dom_save_dir / "dom_snapshots" + dom_dir.mkdir(exist_ok=True) + + try: + html_content = await self.dom_fetcher.fetch_html(url, wait_time=3) + if html_content: + from urllib.parse import urlparse + parsed = urlparse(normalized_url) + domain = parsed.netloc.replace('.', '_') + path = parsed.path.replace('/', '_').strip('_') or 'index' + filename = f"step_{step:03d}_{domain}_{path[:50]}.html" + filename = "".join(c for c in filename if c.isalnum() or c in ('_', '-', '.')) + filepath = dom_dir / filename + with open(filepath, "w", encoding="utf-8") as f: + f.write(html_content) + return str(filepath) + return None + except Exception: + return None + + async def run(self, task: str, max_steps: int = 12) -> str: + log.info(f"🚀 Starting Real Task: {task}") + final_summary = "Task executed but no summary returned." 
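For orientation, each iteration of the loop below asks the LLM for exactly one JSON action and then dispatches it through the toolkit. A hypothetical parsed action (the field names mirror the parsing code that follows; the values are invented) looks like this:

# Hypothetical action object, as produced by json.loads() on the LLM reply.
action_data = {
    "thought": "The search results show an arXiv abstract page; I will open it.",
    "tool": "click_element",
    "args": {"element_text": "Denoising Diffusion Probabilistic Models"},
}
# run() executes it via: await self.toolkit.execute_tool(action_data["tool"], action_data["args"]),
# records the outcome in self.action_history, and stops early on a "terminate" action.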
+ + for i in range(max_steps): + log.info(f"\n--- Step {i+1} ---") + tree = await self.toolkit.get_accessibility_tree() + current_url = await self._get_current_url() + + if current_url != "Unknown": + await self._save_dom_if_new_url(current_url, i + 1) + + tree_record = { + "step": i + 1, + "timestamp": datetime.now().isoformat(), + "url": current_url, + "accessibility_tree_snippet": tree[:5000], + "task": task + } + self.accessibility_trees.append(tree_record) + + prompt = self._construct_prompt(task, tree) + response_str = await self._call_real_llm(prompt) + + try: + action_data = json.loads(response_str) + thought = action_data.get("thought", "No thought") + tool_name = action_data.get("tool") + args = action_data.get("args", {}) + + log.info(f"🧠 Thought: {thought}") + log.info(f"🛠️ Action: {tool_name} {args}") + + if tool_name == "terminate": + log.info("✅ Agent completed the task.") + final_summary = thought or "Task completed successfully." + break + + if not tool_name or not str(tool_name).strip(): + log.warning("⚠️ LLM 返回的 tool 为空(可能超时或 API 异常),本步跳过") + result = "Error: No tool returned (API timeout or empty response)." + self.consecutive_failures += 1 + self.action_history.append({ + "step": i + 1, + "action": "(empty)", + "result": result, + "success": False, + "thought": thought + }) + continue + + result = await self.toolkit.execute_tool(tool_name, args) + log.info(f"📝 Result: {result}") + + if tool_name == "navigate" and result.startswith("Success"): + await asyncio.sleep(1) + new_url = await self._get_current_url() + if new_url != "Unknown": + await self._save_dom_if_new_url(new_url, i + 1) + + is_success = result.startswith("Success") + if is_success: + self.consecutive_failures = 0 + else: + self.consecutive_failures += 1 + + self.action_history.append({ + "step": i + 1, + "action": f"{tool_name}({args})", + "result": result, + "success": is_success, + "thought": thought + }) + + except json.JSONDecodeError: + log.error(f"❌ JSON Error: {response_str}") + self.consecutive_failures += 1 + except Exception as e: + log.error(f"❌ Exec Error: {e}") + self.consecutive_failures += 1 + + return final_summary + + def save_accessibility_trees(self, filepath: Optional[str] = None) -> str: + if not filepath: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filepath = f"accessibility_trees_{timestamp}.json" + try: + full_record = { + "accessibility_trees": self.accessibility_trees, + "action_history": self.action_history, + "consecutive_failures": self.consecutive_failures + } + with open(filepath, 'w', encoding='utf-8') as f: + json.dump(full_record, f, indent=2, ensure_ascii=False) + return filepath + except: + return "" + +@register("websearch_researcher") +class WebsearchResearcherAgent(BaseAgent): + """Websearch Researcher Agent""" + + def __init__(self, tool_manager: Optional[ToolManager] = None, llm_config: Optional[Dict] = None, **kwargs): + super().__init__(tool_manager=tool_manager, **kwargs) + self.llm_config = llm_config or { + "base_url": os.getenv("DF_API_URL", "http://123.129.219.111:3000/v1"), + "api_key": os.getenv("DF_API_KEY", "sk-xxx"), + "model": os.getenv("THIRD_PARTY_MODEL", "gpt-4o"), + } + self.output_dir = Path("./raw_data_store") + self.output_dir.mkdir(parents=True, exist_ok=True) + + @classmethod + def create(cls, tool_manager: Optional[ToolManager] = None, **kwargs) -> "WebsearchResearcherAgent": + return cls(tool_manager=tool_manager, **kwargs) + + @property + def role_name(self) -> str: + return "websearch_researcher" + + @property + def 
system_prompt_template_name(self) -> str: + return "system_prompt_for_websearch_researcher" + + @property + def task_prompt_template_name(self) -> str: + return "task_prompt_for_websearch_researcher" + + def get_task_prompt_params(self, pre_tool_results: Dict[str, Any]) -> Dict[str, Any]: + return {"pre_tool_results": pre_tool_results} + + def get_default_pre_tool_results(self) -> Dict[str, Any]: + return {} + + async def run(self, state: MainState, **kwargs) -> Dict[str, Any]: + task_description = getattr(state, "current_task", "Find relevant financial data.") + log.info(f"[WebsearchResearcher] Task: {task_description}") + + timestamp = int(time.time()) + safe_task_name = "".join([c for c in task_description[:15] if c.isalnum() or c in (' ', '_')]).strip().replace(' ', '_') or "task" + session_dir = self.output_dir / f"{timestamp}_{safe_task_name}" + session_dir.mkdir(exist_ok=True) + + headless_mode = os.getenv("HEADLESS", "true").lower() == "true" + use_proxy_env = os.getenv("USE_PROXY", "").lower() + use_proxy = True if use_proxy_env == "true" else (False if use_proxy_env == "false" else None) + + toolkit = PlaywrightToolKit(headless=headless_mode, use_proxy=use_proxy) + final_resources_dir = session_dir / "final_resources" + final_resources_dir.mkdir(exist_ok=True) + toolkit.base_download_dir = str(final_resources_dir) + + final_summary = "Task executed but no summary returned." + + try: + await toolkit.start() + agent = WebAgent(toolkit=toolkit, llm_config=self.llm_config, dom_save_dir=session_dir) + final_summary = await agent.run(task_description, max_steps=12) + + try: + trees_file = agent.save_accessibility_trees(str(session_dir / "accessibility_trees.json")) + if trees_file: log.info(f"Accessibility trees saved to: {trees_file}") + except Exception as e: + log.warning(f"Failed to save accessibility trees: {e}") + + result_payload = { + "summary": final_summary, + "storage_path": str(session_dir), + "captured_files_count": 0, + "captured_files": [], + "status": "success" + } + except Exception as e: + log.error(f"[WebsearchResearcher] Execution failed: {e}", exc_info=True) + result_payload = {"error": str(e), "status": "failed"} + finally: + await toolkit.close() + + if hasattr(state, "agent_results") and state.agent_results is not None: + state.agent_results[self.role_name] = {"result": result_payload, "pre_tool_results": self.get_default_pre_tool_results()} + + return result_payload + +def create_websearch_researcher_agent(tool_manager: Optional[ToolManager] = None, **kwargs) -> WebsearchResearcherAgent: + return WebsearchResearcherAgent.create(tool_manager=tool_manager, **kwargs) \ No newline at end of file diff --git a/dataflow_agent/state.py b/dataflow_agent/state.py index 09baaaff..a86bcfdd 100644 --- a/dataflow_agent/state.py +++ b/dataflow_agent/state.py @@ -601,3 +601,47 @@ class Paper2DrawioState(MainState): output_xml_path: str = "" # XML 文件路径 output_png_path: str = "" # PNG 导出路径 output_svg_path: str = "" # SVG 导出路径 +# ==================== WebSearch Knowledge Store State ==================== +@dataclass +class WebsearchKnowledgeRequest(MainRequest): + """ + Web 搜索知识入库任务的 Request + - input_urls: 用户初始输入的 URL 列表 + """ + input_urls: List[str] = field(default_factory=list) + + +@dataclass +class WebsearchKnowledgeState(MainState): + """ + Web 搜索知识入库任务的 State,继承自 MainState + + 全局状态字段: + - input_urls: 用户初始输入的 URL 列表 + - research_routes: 由初始 URL 分析得出的不同领域调研路线队列(会被 planner 逐步弹出) + - original_research_routes: 原始的研究路线列表(不会被修改,供 curator 使用) + - raw_data_store: 追加型列表,存储所有阶段抓取到的原始内容 
+ - knowledge_base_summary: 最终清洗后的结构化数据的总结 + """ + # 重写 request 类型 + request: WebsearchKnowledgeRequest = field(default_factory=WebsearchKnowledgeRequest) + + # === 全局状态 === + # Input URLs: 用户初始输入的 URL 列表 + input_urls: List[str] = field(default_factory=list) + + # Research Routes (研究计划队列) - 会被 planner 逐步弹出 + research_routes: List[str] = field(default_factory=list) + + # Original Research Routes (原始研究路线) - 不会被修改,供 chief_curator 使用 + original_research_routes: List[str] = field(default_factory=list) + + # 当前由 Planner 分配给 Web Researcher 执行的任务 + # 注意:必须作为显式字段存在,避免在 LangGraph 状态合并时被丢弃 + current_task: str = "" + + # Raw Data Store: 存储原始内容(文本、多模态资源引用等) + raw_data_store: List[Dict[str, Any]] = field(default_factory=list) + + # Knowledge Base Summary: 最终结构化总结 + knowledge_base_summary: str = "" diff --git a/dataflow_agent/workflow/wf_websearch_knowledge_store.py b/dataflow_agent/workflow/wf_websearch_knowledge_store.py new file mode 100644 index 00000000..72b5dce7 --- /dev/null +++ b/dataflow_agent/workflow/wf_websearch_knowledge_store.py @@ -0,0 +1,184 @@ +""" +websearch_knowledge_store workflow +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +根据用户提供的 Agent 设计,基于 LangGraph / GenericGraphBuilder +搭建一个「只包含节点与流转逻辑」的基础图。 + +全局状态字段(见 `WebsearchKnowledgeState`): +- input_urls: 用户初始输入的 URL 列表 +- research_routes: 研究计划队列 +- raw_data_store: 原始数据仓库 +- knowledge_base_summary: 最终结构化知识总结 + +节点角色: +- planner : 全知指挥官 +- initial_analyzer : 初始分析师 +- web_researcher : 外勤研究员 +- chief_curator : 首席馆长 + + +""" + +from __future__ import annotations + +from typing import List + +from dataflow_agent.workflow.registry import register +from dataflow_agent.graphbuilder.graph_builder import GenericGraphBuilder +from dataflow_agent.logger import get_logger +from dataflow_agent.state import WebsearchKnowledgeState + +log = get_logger(__name__) + + +@register("websearch_knowledge_store") +def create_websearch_knowledge_store_graph() -> GenericGraphBuilder: # noqa: N802 + """ + Workflow factory: dfa run --wf websearch_knowledge_store + + 仅负责: + - 声明全局状态类型 `WebsearchKnowledgeState` + - 搭建节点与流转关系(基于 LangGraph 条件边) + + """ + builder = GenericGraphBuilder( + state_model=WebsearchKnowledgeState, + entry_point="planner", # 流程从 Planner 开始 + ) + + async def planner_node(state: WebsearchKnowledgeState) -> WebsearchKnowledgeState: + """ + Planner Node (全知指挥官) + + 职责: + - 管理任务队列 research_routes + - 处理 Researcher 的产出并入库 + - 设定下一个执行任务 current_task + """ + from dataflow_agent.agentroles.paper2any_agents.websearch_planner import create_websearch_planner_agent + planner = create_websearch_planner_agent() + await planner.run(state) + return state + + async def initial_analyzer_node(state: WebsearchKnowledgeState) -> WebsearchKnowledgeState: + """ + Initial Analyzer (初始分析师) + + 职责: + - 访问 Input URLs + - 提取正文并保存 DOM + - 产出初始研究路线 research_routes + """ + from dataflow_agent.agentroles.paper2any_agents.websearch_initial_analyzer import create_websearch_initial_analyzer_agent + analyzer = create_websearch_initial_analyzer_agent() + await analyzer.run(state) + return state + + async def web_researcher_node(state: WebsearchKnowledgeState) -> WebsearchKnowledgeState: + """ + Web Researcher (外勤研究员) + + 职责: + - 依据 current_task 进行联网深度搜索 + - 抓取新网页内容,保存 DOM + """ + from dataflow_agent.agentroles.paper2any_agents.websearch_researcher import create_websearch_researcher_agent + researcher = create_websearch_researcher_agent() + await researcher.run(state) + return state + + async def chief_curator_node(state: WebsearchKnowledgeState) -> 
WebsearchKnowledgeState: + """ + Chief Curator (首席馆长) + + 职责: + - 读取 Raw Data Store 全量数据 + - 生成最终结构化知识总结 knowledge_base_summary + """ + from dataflow_agent.agentroles.paper2any_agents.websearch_curator import create_websearch_curator_agent + curator = create_websearch_curator_agent() + await curator.run(state) + return state + + # 简单终止节点:直接回传状态 + def _end_node(state: WebsearchKnowledgeState) -> WebsearchKnowledgeState: + log.debug("[_end_] reached end of websearch_knowledge_store workflow.") + return state + + # ============================================================== + # 注册 nodes + # ============================================================== + nodes = { + "planner": planner_node, + "initial_analyzer": initial_analyzer_node, + "web_researcher": web_researcher_node, + "chief_curator": chief_curator_node, + "_end_": _end_node, + } + + builder.add_nodes(nodes) + + # ============================================================== + # EDGES: 非条件边(执行完之后统一回到 planner) + # ============================================================== + edges: List[tuple[str, str]] = [ + ("initial_analyzer", "planner"), + ("web_researcher", "planner"), + ("chief_curator", "planner"), + ] + builder.add_edges(edges) + + # ============================================================== + # CONDITIONAL EDGES: Planner 的流转逻辑 + # ============================================================== + def planner_condition(state: WebsearchKnowledgeState) -> str: + """ + Planner 条件路由逻辑,对应用户描述中的: + - 判断逻辑 A: Research Routes 为空?Input URLs 有内容吗? + - 判断逻辑 B: Research Routes 里有未执行的计划吗? + - 判断逻辑 C: 计划都执行完了吗?内容尚未清洗入库吗? + - 判断逻辑 D: 计划都执行完了吗?内容都清洗入库了吗? + + 约定(仅作图阶段的简化): + - `research_routes` 作为待执行计划队列: + - 非空 → 代表还有未执行计划 + - 由各节点自行维护入队 / 出队 + - `raw_data_store` 非空且 `knowledge_base_summary` 为空 → 代表需要 Chief Curator 清洗 + - `knowledge_base_summary` 非空 → 代表已完成清洗入库 + """ + # 从主 state 或 request 上获取用户输入 URL + input_urls = state.input_urls or getattr(state.request, "input_urls", []) + research_routes = state.research_routes + raw_data_store = state.raw_data_store + knowledge_base_summary = state.knowledge_base_summary + + # 判断逻辑 B: Research Routes 里有未执行的计划吗? + if research_routes: + # 有计划就优先执行计划(执行 Web Researcher) + return "web_researcher" + + # 判断逻辑 A: Research Routes 为空,Input URLs 有内容吗? 
+ if not research_routes and input_urls and not raw_data_store: + # 还没有做过初始分析,但有 URL,可以进入 Initial Analyzer + return "initial_analyzer" + + # 判断逻辑 C: 计划都执行完,但内容尚未清洗入库 + if (not research_routes) and raw_data_store and not knowledge_base_summary: + return "chief_curator" + + # 判断逻辑 D: 计划都执行完,内容都清洗入库 + if (not research_routes) and knowledge_base_summary: + return "_end_" + + # 兜底:如果没有 URL 且无计划也无数据,直接结束 + return "_end_" + + builder.add_conditional_edges({"planner": planner_condition}) + + return builder + + + + + + diff --git a/pyproject.toml b/pyproject.toml index d411119d..b55c251f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,6 +27,7 @@ dependencies = [ [project.optional-dependencies] dev = [ "pytest", + "pytest-asyncio", "pytest-cov", "black", "ruff", @@ -60,3 +61,7 @@ exclude = ["tests*"] [project.scripts] dfa = "dataflow_agent.cli:cli" + +[tool.pytest.ini_options] +asyncio_mode = "auto" + diff --git a/requirements-base.txt b/requirements-base.txt index 724962ba..1959692e 100644 --- a/requirements-base.txt +++ b/requirements-base.txt @@ -85,7 +85,8 @@ opencv-python==4.11.0.86 # rebuttal docling==2.71.0 - +# websearch knowledge store +playwright-stealth # speech librosa==0.11.0 soundfile==0.13.1 diff --git a/script/run_websearch_knowledge_store.py b/script/run_websearch_knowledge_store.py new file mode 100644 index 00000000..0f311fce --- /dev/null +++ b/script/run_websearch_knowledge_store.py @@ -0,0 +1,261 @@ +""" +运行 websearch_knowledge_store 工作流的脚本 +=========================================================================== + +■ 功能简介 +----------- +本脚本用于启动 "websearch_knowledge_store" 工作流。 +该工作流以一组种子 URL 为起点,通过 **多 Agent 协作** 完成: + 1. 种子页面的正文提取与初步分析 + 2. 自动规划「深度调研路线」 + 3. 联网搜索、逐条执行调研计划 + 4. 对全部原始数据进行清洗,产出结构化知识总结 + +最终产物存放于 raw_data_store/ 目录,包括 Markdown 知识文件与多媒体资源。 + + +■ 工作流节点 & 流程 +--------------------- +整体流程由 LangGraph 条件边驱动,以 planner 为"中枢": + + ┌─────────────────────────────────────────────────────┐ + │ START │ + └────────────────────────┬────────────────────────────┘ + ▼ + ┌─────────────┐ + ┌────▶│ planner │◀──────────────────┐ + │ │ │ │ + │ └──────┬──────┘ │ + │ │ 根据当前状态做条件路由 │ + │ ├───────────┐ │ + │ │ │ │ + │ ▼ ▼ │ + │ ┌──────────────┐ ┌──────────────┐ │ + │ │ initial_ │ │ web_ │ │ + │ │ analyzer │ │ researcher │ │ + │ │ (初始分析师) │ │ (webagent) │ │ + │ └──────┬───────┘ └──────┬───────┘ │ + │ │ │ │ + │ └───────┬────────┘ │ + │ ▼ │ + │ (回到 planner) │ + │ │ + │ 当调研计划全部完成且数据尚未清洗 ──────▶│ + │ │ + │ ┌──────────────┐ │ + └─────────│ chief_curator│───────────────┘ + │ (数据整理) │ + └──────┬───────┘ + │ + ▼ + ┌─────────┐ + │ _end_ │ + └─────────┘ + +节点说明: + 1. planner + - 管理任务队列 research_routes + - 处理 Researcher 产出并入库 raw_data_store + - 设定下一个执行任务 current_task + - 根据状态做如下条件路由: + A) research_routes 为空 & input_urls 有值 & 尚未分析 → initial_analyzer + B) research_routes 非空(有未执行计划) → web_researcher + C) 计划全部完成 & raw_data_store 有数据但未清洗 → chief_curator + D) 计划全部完成 & knowledge_base_summary 已生成 → _end_ + + 2. initial_analyzer (初始分析师) + - 访问种子 URL,提取正文内容并保存 DOM 快照 + - 分析页面内容,规划出一系列深度调研路线 (research_routes) + + 3. web_researcher (web_agent) + - 依据 current_task 进行联网搜索探索 + - 抓取新网页内容,保存 DOM 与提取文本 + + 4. 
chief_curator (数据整理) + - 读取 raw_data_store 全量数据 + - 对每条调研路线分别生成结构化 Markdown 知识文件 + - 汇总生成 knowledge_base_summary + + +■ 全局状态字段 (WebsearchKnowledgeState) +------------------------------------------ + - input_urls : 用户初始输入的种子 URL 列表 + - research_routes : 研究计划队列(会被 planner 逐步弹出) + - original_research_routes: 原始研究路线(不变,供 curator 参照) + - current_task : 当前由 planner 分配给 researcher 的任务 + - raw_data_store : 追加型列表,存储所有阶段抓取到的原始数据 + - knowledge_base_summary : 最终清洗后的结构化知识总结 + + +■ 可配置参数 (见下方 "可配置参数" 区域) +------------------------------------------ + INPUT_URLS : 种子 URL 列表,可以是一篇技术博客、论文页面、文档等 + LANGUAGE : 输出语言偏好,"zh" 为中文、"en" 为英文 + + +■ 环境变量 (在 MainRequest 基类中读取) +------------------------------------------ + DF_API_URL : LLM Chat API 的基础地址 (默认 "test") + DF_API_KEY : LLM API Key (默认 "test") + + 也可在下方 WebsearchKnowledgeRequest 实例中手动覆盖: + req.chat_api_url = "https://..." + req.api_key = "sk-..." + req.model = "gpt-4o" + + +■ 依赖安装 +----------- + 1. 安装项目基础依赖(在项目根目录下执行): + pip install -r requirements-base.txt + + 其中与本工作流直接相关的关键依赖包括: + - langgraph / langchain 系列 : 工作流引擎 & LLM 调用 + - playwright : 无头浏览器,用于抓取网页 DOM + - beautifulsoup4 : HTML 解析与正文清洗 + - httpx : 异步 HTTP 客户端(调用 MineruHTML API 等) + - trafilatura : 备选正文提取 + - openai : OpenAI 兼容 Chat API 调用 + + 2. 安装 Playwright 浏览器内核(首次需要): + playwright install chromium + + 如果系统缺少依赖库,可追加: + playwright install --with-deps chromium + + +■ MineruHTML 部署 +------------------ + 本工作流的 initial_analyzer 和 chief_curator 均依赖 MineruHTML + (一个基于 LLM 微调的 HTML 正文提取服务)来从网页 DOM 中提取有效正文。 + + 默认 API 地址: http://localhost:7771 (可通过环境变量 MINERUHTML_API_URL 覆盖) + API 端点: POST /extract + 请求体: { "html": "<完整 HTML 字符串>" } + 响应体: { "main_html": "<提取出的正文 HTML>" } + + 部署步骤: + 1) 克隆仓库: + git clone https://github.com/opendatalab/MinerU-HTML.git + cd MinerU-HTML + + 2) 安装依赖与模型: + pip install . + + 3) 启动服务(默认监听 7771 端口): + python -m dripper.server \ + --model_path /path/to/your/model \ + --port 7771 + + 4) 验证服务是否就绪: + curl -X POST http://localhost:7771/extract \ + -H "Content-Type: application/json" \ + -d '{"html": "
<p>hello</p>
"}' + + 如果 MineruHTML 部署在其他机器或端口,通过环境变量指定: + export MINERUHTML_API_URL="http://:" + + +■ 使用方法 +----------- + 1. 激活 conda 环境: + conda activate + + 2. 配置必需的环境变量(也可写入 .env 文件): + export DF_API_URL="https://api.openai.com/v1" # LLM Chat API 地址 + export DF_API_KEY="sk-..." # LLM API Key + # 可选: + export THIRD_PARTY_MODEL="gpt-4o" # 模型名称(默认 gpt-4o) + export MINERUHTML_API_URL="http://localhost:7771" # MineruHTML 服务地址 + export HEADLESS="true" # 浏览器是否无头模式(默认 true) + export HTTP_PROXY="http://127.0.0.1:7890" # 代理(可选,最好设置代理) + export HTTPS_PROXY="http://127.0.0.1:7890" + + 3. 在项目根目录下运行: + python script/run_websearch_knowledge_store.py + + 4. 运行结束后,在 raw_data_store/ 目录下查看产出的知识文件。 +""" + +from __future__ import annotations + +import asyncio + +from dataflow_agent.state import WebsearchKnowledgeRequest, WebsearchKnowledgeState +from dataflow_agent.workflow import run_workflow + + +# ================== 可配置参数 ================== +# 种子 URL 列表 —— 工作流将从这些页面出发进行深度调研 +# 可以替换为你感兴趣的任意网页 URL(支持多个) +INPUT_URLS: list[str] = [ + "https://zhuanlan.zhihu.com/p/624221952", +] + +# 输出语言偏好: "zh" (中文) | "en" (英文) | 其他 BCP-47 语言代码 +LANGUAGE: str = "zh" +# ================================================= + + +async def run_websearch_knowledge_store_pipeline(): + """ + 构造 WebsearchKnowledgeState,并运行 websearch_knowledge_store 工作流。 + + 流程: + 1) 根据上方可配置参数构造 Request 对象 + 2) 用 Request 初始化全局 State + 3) 调用 run_workflow 启动 LangGraph 工作流 + 4) 返回最终 State(包含 knowledge_base_summary 等产物) + """ + # 1) 构造 Request —— 携带用户输入与 LLM 配置 + req = WebsearchKnowledgeRequest( + language=LANGUAGE, + input_urls=INPUT_URLS, + ) + + # 2) 初始化 State —— 将 input_urls 同步写入顶层字段 + state = WebsearchKnowledgeState( + request=req, + input_urls=req.input_urls, + ) + + # 3) 运行工作流 —— 名称需与 @register("websearch_knowledge_store") 一致 + final_state = await run_workflow( + "websearch_knowledge_store", state + ) + return final_state + + +def main() -> None: + """ + 同步入口: 运行异步主流程并打印关键结果。 + """ + final_state = asyncio.run(run_websearch_knowledge_store_pipeline()) + + # ---------- 打印关键信息,便于快速查看结果 ---------- + print("\n=== WebsearchKnowledgeState ===") + + # 兼容处理: run_workflow 可能返回 dict 或 dataclass 对象 + if isinstance(final_state, dict): + input_urls = final_state.get("input_urls", []) + research_routes = final_state.get("research_routes", []) + raw_data_store = final_state.get("raw_data_store", []) + knowledge_base_summary = final_state.get("knowledge_base_summary", "") + else: + input_urls = getattr(final_state, "input_urls", []) + research_routes = getattr(final_state, "research_routes", []) + raw_data_store = getattr(final_state, "raw_data_store", []) + knowledge_base_summary = getattr(final_state, "knowledge_base_summary", "") + + print(f"input_urls : {input_urls}") + print(f"research_routes : {research_routes}") + print(f"raw_data_store size : {len(raw_data_store) if raw_data_store else 0}") + print("knowledge_base_summary (截取前 500 字):") + if knowledge_base_summary: + print(knowledge_base_summary[:500]) + else: + print("(empty)") + + +if __name__ == "__main__": + main() diff --git a/tests/test_websearch_curator.py b/tests/test_websearch_curator.py new file mode 100644 index 00000000..b2f9e54c --- /dev/null +++ b/tests/test_websearch_curator.py @@ -0,0 +1,243 @@ +""" +WebsearchChiefCuratorAgent 直接调用测试 +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +运行方式: + python tests/test_websearch_curator.py + +说明: +- 不使用 pytest,直接调用 chief_curator 节点(WebsearchChiefCuratorAgent) +- 使用 WebsearchKnowledgeState,模拟 workflow 中的 chief_curator_node 调用 +- 使用 
tests/raw_data_store 中的真实数据进行测试 +- 将返回的完整结果 JSON 保存到 tests/debug_websearch_curator_result.json +""" + +from __future__ import annotations + +import asyncio +import json +import os +from pathlib import Path +import sys +from datetime import datetime + +# 确保项目根目录在 sys.path 中 +CURRENT_FILE = Path(__file__).resolve() +PROJECT_ROOT = CURRENT_FILE.parent.parent +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from dataflow_agent.state import WebsearchKnowledgeState +from dataflow_agent.agentroles.paper2any_agents.websearch_curator import ( + WebsearchChiefCuratorAgent, +) + + +def discover_raw_data_store(raw_data_dir: Path) -> tuple[list[dict], list[str]]: + """ + 扫描 raw_data_store 目录,构建 raw_data_store 和 research_routes + + Returns: + tuple: (raw_data_store, research_routes) + """ + raw_data_store = [] + research_routes = [] + + if not raw_data_dir.exists(): + print(f"⚠️ 目录不存在: {raw_data_dir}") + return raw_data_store, research_routes + + # 遍历所有子目录 + for session_dir in sorted(raw_data_dir.iterdir()): + if not session_dir.is_dir(): + continue + + # 1. 检查 initial_analysis 目录,读取 research_routes + if "initial_analysis" in session_dir.name: + summary_file = session_dir / "analysis_summary.json" + if summary_file.exists(): + with open(summary_file, "r", encoding="utf-8") as f: + summary = json.load(f) + + # 提取 research_routes(只取一次,避免重复) + if not research_routes: + research_routes = summary.get("research_routes", []) + print(f"📋 从 {summary_file.name} 读取到 {len(research_routes)} 个研究子任务") + + # 从 raw_data_records 构建数据 + for record in summary.get("raw_data_records", []): + dom_path = record.get("dom_filepath", "") + # 修正相对路径 - 需要基于项目根目录 + if dom_path and not os.path.isabs(dom_path): + full_dom_path = PROJECT_ROOT / dom_path + else: + full_dom_path = Path(dom_path) + + if full_dom_path.exists(): + raw_data_store.append({ + "url": record.get("url", ""), + "dom_filepath": str(full_dom_path), + "timestamp": record.get("timestamp", datetime.now().isoformat()), + "source": "initial_analysis" + }) + + # 2. 检查子任务目录的 dom_snapshots + dom_snapshots_dir = session_dir / "dom_snapshots" + if dom_snapshots_dir.exists() and dom_snapshots_dir.is_dir(): + # 读取 accessibility_trees.json 获取 URL 信息 + acc_trees_file = session_dir / "accessibility_trees.json" + url_map = {} + if acc_trees_file.exists(): + with open(acc_trees_file, "r", encoding="utf-8") as f: + acc_data = json.load(f) + for tree in acc_data.get("accessibility_trees", []): + step = tree.get("step") + url = tree.get("url", "") + if step and url: + url_map[step] = url + + # 遍历 dom 快照 + for html_file in sorted(dom_snapshots_dir.glob("*.html")): + # 跳过 blank 页面 + if "_blank" in html_file.name: + continue + + # 从文件名提取 step 编号 + # 格式: step_001_arxiv_org_index.html + parts = html_file.stem.split("_") + if len(parts) >= 2 and parts[0] == "step": + try: + step_num = int(parts[1]) + url = url_map.get(step_num, f"https://{parts[2]}" if len(parts) > 2 else "unknown") + except (ValueError, IndexError): + url = "unknown" + else: + url = "unknown" + + raw_data_store.append({ + "url": url, + "dom_filepath": str(html_file), + "timestamp": datetime.now().isoformat(), + "source": session_dir.name + }) + + print(f"📦 共发现 {len(raw_data_store)} 个原始数据源") + return raw_data_store, research_routes + + +async def run_curator_node() -> dict: + """ + 直接调用 chief_curator 节点,使用真实数据 + """ + # 1. 
设置真实数据目录 + raw_data_dir = PROJECT_ROOT / "tests" / "raw_data_store" + + print("=" * 80) + print("🔍 正在扫描真实数据目录...") + print(f"📁 数据目录: {raw_data_dir}") + print("=" * 80) + + # 2. 发现数据 + raw_data_store, research_routes = discover_raw_data_store(raw_data_dir) + + if not raw_data_store: + print("❌ 未发现任何有效数据") + return {"status": "failed", "reason": "No raw data found"} + + if not research_routes: + print("⚠️ 未找到 research_routes,使用默认子任务") + research_routes = ["分析网页内容并提取关键信息"] + + # 打印发现的数据摘要 + print("\n📊 数据摘要:") + print(f" - 原始数据源数量: {len(raw_data_store)}") + print(f" - 研究子任务数量: {len(research_routes)}") + + print("\n📋 研究子任务列表:") + for i, route in enumerate(research_routes[:5], 1): # 只打印前5个 + print(f" {i}. {route[:60]}..." if len(route) > 60 else f" {i}. {route}") + if len(research_routes) > 5: + print(f" ... 还有 {len(research_routes) - 5} 个子任务") + + print("\n📄 部分数据源预览:") + for item in raw_data_store[:3]: # 只打印前3个 + print(f" - URL: {item['url'][:50]}...") + print(f" DOM: {Path(item['dom_filepath']).name}") + if len(raw_data_store) > 3: + print(f" ... 还有 {len(raw_data_store) - 3} 个数据源") + + # 3. 构造 WebsearchKnowledgeState + state = WebsearchKnowledgeState() + state.raw_data_store = raw_data_store + state.research_routes = research_routes + + # 4. 初始化 WebsearchChiefCuratorAgent + agent = WebsearchChiefCuratorAgent.create(tool_manager=None) + + print("\n" + "=" * 80) + print("🚀 开始执行 chief_curator 节点...") + print("=" * 80) + + # 5. 执行 Agent + result = await agent.run(state) + + print("\n" + "=" * 80) + print("✅ chief_curator 节点执行完成") + print("=" * 80) + + print(f"\n📊 执行结果:") + print(f" - 状态: {result.get('status', 'unknown')}") + + if result.get("status") == "success": + print(f" - 输出目录: {result.get('curated_directory', 'N/A')}") + print(f" - 处理任务数: {result.get('tasks_processed', 0)}") + print(f" - 生成文件数: {len(result.get('files_created', []))}") + + # 打印生成的文件列表 + files = result.get('files_created', []) + if files: + print("\n📝 生成的知识文件:") + for f in files: + print(f" - {Path(f).name}") + + # 打印生成的总结预览 + summary = getattr(state, "knowledge_base_summary", "") + if summary: + print(f"\n📖 知识库状态: {summary}") + else: + print(f" - 原因: {result.get('reason', 'N/A')}") + + # 6. 
将结果落盘 + debug_dir = Path("tests") + debug_dir.mkdir(parents=True, exist_ok=True) + debug_path = debug_dir / "debug_websearch_curator_result.json" + + # 添加更多调试信息 + result["_debug"] = { + "raw_data_count": len(raw_data_store), + "research_routes_count": len(research_routes), + "research_routes": research_routes, + "raw_data_sources": [{"url": d["url"], "source": d.get("source")} for d in raw_data_store[:10]] + } + + with debug_path.open("w", encoding="utf-8") as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + print(f"\n💾 详细结果已保存至: {debug_path}") + + return result + + +async def main(): + """主函数""" + try: + result = await run_curator_node() + return result + except Exception as e: + print(f"\n❌ 执行失败: {e}") + import traceback + traceback.print_exc() + return {"status": "failed", "error": str(e)} + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/test_websearch_initial_analyzer.py b/tests/test_websearch_initial_analyzer.py new file mode 100644 index 00000000..b9e829e5 --- /dev/null +++ b/tests/test_websearch_initial_analyzer.py @@ -0,0 +1,109 @@ +""" +WebsearchInitialAnalyzerAgent 直接调用测试 +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +运行方式: + python tests/test_websearch_initial_analyzer.py + +说明: +- 不使用 pytest,直接调用 initial_analyzer 节点(WebsearchInitialAnalyzerAgent) +- 使用 WebsearchKnowledgeState,模拟 workflow 中的 initial_analyzer_node 调用 +- 将返回的完整结果 JSON 保存到 tests/debug_websearch_initial_analyzer_result.json +""" + +from __future__ import annotations + +import asyncio +import json +from pathlib import Path +import sys + +# 确保项目根目录在 sys.path 中,避免从 tests 目录运行时 import 失败 +CURRENT_FILE = Path(__file__).resolve() +PROJECT_ROOT = CURRENT_FILE.parent.parent +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from dataflow_agent.state import WebsearchKnowledgeState +from dataflow_agent.agentroles.paper2any_agents.websearch_initial_analyzer import ( + WebsearchInitialAnalyzerAgent, +) + + +async def run_initial_analyzer_node() -> dict: + """ + 直接调用 initial_analyzer 节点(模拟 workflow 中的 initial_analyzer_node) + + 返回: + dict: 包含执行结果的字典 + """ + # 使用指定的测试 URL + test_url = "https://ai-bot.cn/webagent/" + + # 构造 WebsearchKnowledgeState(模拟 workflow 中的状态) + state = WebsearchKnowledgeState() + + # 设置 input_urls(同时写到 state 和 request 中,和 workflow 逻辑保持一致) + state.input_urls = [test_url] + state.request.input_urls = [test_url] + + # 初始化 WebsearchInitialAnalyzerAgent(不注入 ToolManager,让 Agent 内部按默认逻辑执行) + agent = WebsearchInitialAnalyzerAgent.create(tool_manager=None) + + print("🚀 开始执行 initial_analyzer 节点...") + print(f"🔗 测试 URL: {test_url}") + print("-" * 80) + + result = await agent.run(state) + + print("-" * 80) + print("✅ initial_analyzer 节点执行完成") + print(f"📊 执行状态: {result.get('status', 'unknown')}") + + if result.get("status") == "success": + print(f"📁 存储路径: {result.get('session_dir', 'N/A')}") + print(f"📌 研究子任务数量: {len(result.get('research_routes', []))}") + print(f"📦 原始记录条数: {len(result.get('raw_data_store', []))}") + + # 将结果落盘,便于后续人工检查与回归 + debug_dir = Path("tests") + debug_dir.mkdir(parents=True, exist_ok=True) + debug_path = debug_dir / "debug_websearch_initial_analyzer_result.json" + + with debug_path.open("w", encoding="utf-8") as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + print(f"💾 结果已保存至: {debug_path}") + + # 基础结构校验 + if not isinstance(result, dict): + print("⚠️ 警告: 返回结果不是字典类型") + elif "status" not in result: + print("⚠️ 警告: 结果中缺少 status 字段") + elif result.get("status") == "success": + required_fields = 
["session_dir", "research_routes", "raw_data_store"] + missing_fields = [f for f in required_fields if f not in result] + if missing_fields: + print(f"⚠️ 警告: 成功时缺少以下字段: {missing_fields}") + else: + print("✅ 结果结构校验通过") + + return result + + +async def main(): + """主函数:直接运行 initial_analyzer 节点""" + try: + result = await run_initial_analyzer_node() + return result + except Exception as e: + print(f"❌ 执行失败: {e}") + import traceback + + traceback.print_exc() + return {"status": "failed", "error": str(e)} + + +if __name__ == "__main__": + asyncio.run(main()) + + diff --git a/tests/test_websearch_researcher.py b/tests/test_websearch_researcher.py new file mode 100644 index 00000000..50a8082b --- /dev/null +++ b/tests/test_websearch_researcher.py @@ -0,0 +1,112 @@ +""" +WebsearchResearcherAgent 直接调用测试 +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +运行方式: + python tests/test_websearch_researcher.py + +说明: +- 不使用 pytest,直接调用 research 节点(WebsearchResearcherAgent) +- 真实调用 LLM 与浏览器(依赖环境变量 DF_API_URL、DF_API_KEY 等) +- 使用 WebsearchKnowledgeState,模拟 workflow 中的 web_researcher_node 调用 +- 将返回的完整结果 JSON 保存到 tests/debug_websearch_researcher_result.json +""" + +from __future__ import annotations + +import asyncio +import json +from pathlib import Path +import sys + +# 确保项目根目录在 sys.path 中,避免从 tests 目录运行时 import 失败 +CURRENT_FILE = Path(__file__).resolve() +PROJECT_ROOT = CURRENT_FILE.parent.parent +if str(PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(PROJECT_ROOT)) + +from dataflow_agent.state import WebsearchKnowledgeState +from dataflow_agent.agentroles.paper2any_agents.websearch_researcher import ( + WebsearchResearcherAgent, +) + + +async def run_web_researcher_node() -> dict: + """ + 直接调用 research 节点(模拟 workflow 中的 web_researcher_node) + + 返回: + dict: 包含执行结果的字典 + """ + # 使用真实的 user_query(可以根据需要修改) + user_query = "请帮我在网页上查找一篇关于 diffusion model 的公开论文(例如 arXiv),并下载可用的 PDF 或相关附件。" + + # 构造 WebsearchKnowledgeState(模拟 workflow 中的状态) + state = WebsearchKnowledgeState() + + # 设置当前任务(WebsearchResearcherAgent 会从 state.current_task 读取) + setattr(state, "current_task", user_query) + + # 可以设置 research_routes(如果有的话) + # state.research_routes = ["研究路线1", "研究路线2"] + + # 初始化 WebsearchResearcherAgent(不注入 ToolManager,让 Agent 内部按默认逻辑执行) + agent = WebsearchResearcherAgent.create(tool_manager=None) + + # 直接调用 agent.run 方法(内部会自己构造 LLM 与 Browser) + print(f"🚀 开始执行 research 节点...") + print(f"📝 任务描述: {user_query}") + print("-" * 80) + + result = await agent.run(state) + + print("-" * 80) + print(f"✅ Research 节点执行完成") + print(f"📊 执行状态: {result.get('status', 'unknown')}") + + if result.get("status") == "success": + print(f"📄 摘要: {result.get('summary', 'N/A')[:200]}...") + print(f"📁 存储路径: {result.get('storage_path', 'N/A')}") + print(f"📎 捕获文件数: {result.get('captured_files_count', 0)}") + + # 将结果落盘,便于后续人工检查与回归 + debug_dir = Path("tests") + debug_dir.mkdir(parents=True, exist_ok=True) + debug_path = debug_dir / "debug_websearch_researcher_result.json" + + with debug_path.open("w", encoding="utf-8") as f: + json.dump(result, f, ensure_ascii=False, indent=2) + + print(f"💾 结果已保存至: {debug_path}") + + # 基础结构校验 + if not isinstance(result, dict): + print("⚠️ 警告: 返回结果不是字典类型") + elif "status" not in result: + print("⚠️ 警告: 结果中缺少 status 字段") + elif result.get("status") == "success": + required_fields = ["summary", "storage_path", "captured_files"] + missing_fields = [f for f in required_fields if f not in result] + if missing_fields: + print(f"⚠️ 警告: 成功时缺少以下字段: {missing_fields}") + else: + print("✅ 结果结构校验通过") + 
+ return result + + +async def main(): + """主函数:直接运行 research 节点""" + try: + result = await run_web_researcher_node() + return result + except Exception as e: + print(f"❌ 执行失败: {e}") + import traceback + traceback.print_exc() + return {"status": "failed", "error": str(e)} + + +if __name__ == "__main__": + asyncio.run(main()) + +
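Because the patch also switches pytest-asyncio to asyncio_mode = "auto" in pyproject.toml, the async entry points above can equally be collected by pytest instead of being run as standalone scripts. A minimal sketch, using only the state classes added by this patch (the test name and assertions are illustrative):

# tests/test_websearch_state.py -- illustrative sketch, not part of this patch
from dataflow_agent.state import WebsearchKnowledgeRequest, WebsearchKnowledgeState


async def test_websearch_state_defaults():
    # With asyncio_mode = "auto", pytest-asyncio runs this coroutine directly.
    req = WebsearchKnowledgeRequest(input_urls=["https://example.com"])
    state = WebsearchKnowledgeState(request=req, input_urls=req.input_urls)

    assert state.research_routes == []          # planner queue starts empty
    assert state.raw_data_store == []           # nothing has been scraped yet
    assert state.knowledge_base_summary == ""   # chief_curator has not run
    assert state.current_task == ""             # planner has not assigned a task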