diff --git a/Dockerfile b/Dockerfile index e69de29..37c7584 100644 --- a/Dockerfile +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.12-slim as builder + +WORKDIR /app +COPY pyproject.toml . +RUN pip install -e . + +FROM python:3.12-slim as runtime + +WORKDIR /app +COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin +COPY src/ ./src/ + +EXPOSE 8000 +CMD ["uvicorn", "src.api.app:create_app", "--host", "0.0.0.0", "--port", "8000", "--factory"] \ No newline at end of file diff --git a/agent_system/Dockerfile b/agent_system/Dockerfile deleted file mode 100644 index 37c7584..0000000 --- a/agent_system/Dockerfile +++ /dev/null @@ -1,15 +0,0 @@ -FROM python:3.12-slim as builder - -WORKDIR /app -COPY pyproject.toml . -RUN pip install -e . - -FROM python:3.12-slim as runtime - -WORKDIR /app -COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages -COPY --from=builder /usr/local/bin /usr/local/bin -COPY src/ ./src/ - -EXPOSE 8000 -CMD ["uvicorn", "src.api.app:create_app", "--host", "0.0.0.0", "--port", "8000", "--factory"] \ No newline at end of file diff --git a/agent_system/README.md b/agent_system/README.md index c435737..a6679bd 100644 --- a/agent_system/README.md +++ b/agent_system/README.md @@ -59,19 +59,20 @@ A self-optimizing blog generation agent that creates SEO-optimized content using ```bash git clone - cd gemini-with-search + cd agent_system ``` 2. **Create virtual environment** ```bash - python -m venv venv - source venv/bin/activate # On Windows: venv\Scripts\activate + python -m venv .venv + source .venv/bin/activate # On Windows: venv\Scripts\activate ``` 3. **Install dependencies** ```bash + pip install -r requirements.txt pip install -e . ``` @@ -93,7 +94,7 @@ Create a `.env` file with the following variables: ```env GOOGLE_API_KEY=your_google_api_key_here -GEMINI_MODEL=gemini-1.5-pro-latest +GEMINI_MODEL=gemini-2.5-flash LANGSMITH_API_KEY=your_langsmith_api_key_here LANGSMITH_PROJECT=gemini-search-blog-agent MAX_CONCURRENT_REQUESTS=10 diff --git a/agent_system/docker-compose.yml b/agent_system/docker-compose.yml deleted file mode 100644 index b12ce31..0000000 --- a/agent_system/docker-compose.yml +++ /dev/null @@ -1,45 +0,0 @@ -version: "3.8" - -services: - gemini-blog-agent: - build: - context: . 
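Since docker-compose.yml is deleted and not replaced here, the root Dockerfile's CMD becomes the only documented entry point. As a rough local-run sketch (not introduced by this diff), the same factory-style startup can be reproduced with uvicorn; the `src.api.app:create_app` import string and port 8000 are assumed from the Dockerfile CMD:

```python
# Minimal local equivalent of the container CMD; assumes src.api.app exposes
# a create_app() factory as referenced in the Dockerfile.
import uvicorn

if __name__ == "__main__":
    uvicorn.run(
        "src.api.app:create_app",  # import string taken from the Dockerfile CMD
        factory=True,              # create_app() is called to build the app
        host="0.0.0.0",
        port=8000,
    )
```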
- dockerfile: Dockerfile - ports: - - "8000:8000" - environment: - - GOOGLE_API_KEY=${GOOGLE_API_KEY} - - GEMINI_MODEL=${GEMINI_MODEL:-gemini-1.5-pro-latest} - - LANGSMITH_API_KEY=${LANGSMITH_API_KEY} - - LANGSMITH_PROJECT=${LANGSMITH_PROJECT:-gemini-search-blog-agent} - - MAX_CONCURRENT_REQUESTS=${MAX_CONCURRENT_REQUESTS:-10} - - MAX_SCRAPE_TIMEOUT=${MAX_SCRAPE_TIMEOUT:-10} - - MAX_ATTEMPTS=${MAX_ATTEMPTS:-3} - - SEO_THRESHOLD=${SEO_THRESHOLD:-75} - - ENVIRONMENT=production - volumes: - - ./logs:/app/logs - restart: unless-stopped - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/health"] - interval: 30s - timeout: 10s - retries: 3 - start_period: 40s - - # Optional: Redis for production memory storage - redis: - image: redis:7-alpine - ports: - - "6379:6379" - volumes: - - redis_data:/data - restart: unless-stopped - healthcheck: - test: ["CMD", "redis-cli", "ping"] - interval: 30s - timeout: 3s - retries: 3 - -volumes: - redis_data: diff --git a/agent_system/requirements.txt b/agent_system/requirements.txt index a6b02b1..f8cd6cb 100644 --- a/agent_system/requirements.txt +++ b/agent_system/requirements.txt @@ -6,6 +6,7 @@ anyio==4.9.0 attrs==25.3.0 babel==2.17.0 beautifulsoup4==4.13.4 +black==25.1.0 cachetools==5.5.2 certifi==2025.7.14 charset-normalizer==3.4.2 @@ -21,6 +22,8 @@ google-api-core==2.25.1 google-api-python-client==2.176.0 google-auth==2.40.3 google-auth-httplib2==0.2.0 +google-custom-search==3.0.0 +google-genai==1.26.0 google-generativeai==0.8.5 googleapis-common-protos==1.70.0 greenlet==3.2.3 @@ -55,9 +58,13 @@ multidict==6.6.3 mypy_extensions==1.1.0 numpy==2.3.1 openai==1.97.0 +-e git+ssh://git@github.com/solve-ease/OptiBlogAi.git@98832b544e26ab25cf2140ab04fcd78e8fdbd151#egg=optiblogai&subdirectory=agent_system orjson==3.11.0 ormsgpack==1.10.0 packaging==25.0 +pathspec==0.12.1 +platformdirs==4.3.8 +playwright==1.53.0 propcache==0.3.2 proto-plus==1.26.1 protobuf==5.29.5 @@ -66,6 +73,7 @@ pyasn1_modules==0.4.2 pydantic==2.11.7 pydantic-settings==2.10.1 pydantic_core==2.33.2 +pyee==13.0.0 pyparsing==3.2.3 python-dateutil==2.9.0.post0 python-dotenv==1.1.1 @@ -81,7 +89,7 @@ soupsieve==2.7 SQLAlchemy==2.0.41 starlette==0.47.1 structlog==25.4.0 -tenacity==9.1.2 +tenacity==8.5.0 tiktoken==0.9.0 tld==0.13.1 tqdm==4.67.1 diff --git a/agent_system/smoke_test.py b/agent_system/smoke_test.py new file mode 100644 index 0000000..d14e4bf --- /dev/null +++ b/agent_system/smoke_test.py @@ -0,0 +1,9 @@ +import asyncio +from src.agents.graph import get_blog_generation_graph + +async def smoke(): + g = await get_blog_generation_graph() + result = await g.run_blog_generation("AI trends 2025") + print("✅ SUCCESS" if result["success"] else "❌ FAILED", result["final_score"]) + +asyncio.run(smoke()) \ No newline at end of file diff --git a/agent_system/src/__init__.py b/agent_system/src/__init__.py index 6f9da51..f79c37d 100644 --- a/agent_system/src/__init__.py +++ b/agent_system/src/__init__.py @@ -1 +1 @@ -"""Gemini-with-Search Blog Agent.""" \ No newline at end of file +"""Gemini-with-Search Blog Agent.""" diff --git a/agent_system/src/agents/__init__.py b/agent_system/src/agents/__init__.py index c3c37e5..72a27db 100644 --- a/agent_system/src/agents/__init__.py +++ b/agent_system/src/agents/__init__.py @@ -1 +1 @@ -"""LangGraph agent modules.""" \ No newline at end of file +"""LangGraph agent modules.""" diff --git a/agent_system/src/agents/graph.py b/agent_system/src/agents/graph.py index a217b70..817878b 100644 --- a/agent_system/src/agents/graph.py +++ 
b/agent_system/src/agents/graph.py @@ -13,6 +13,7 @@ evaluate_seo, react_agent, ) +from langgraph.checkpoint.memory import MemorySaver from src.agents.nodes.react_agent import decide_next_action from src.memory.checkpointer import get_memory_saver from src.utils.logger import get_logger @@ -20,126 +21,162 @@ logger = get_logger(__name__) +def check_search_results(state: GraphState) -> str: + """Check if search found results or failed.""" + if getattr(state, "search_failed", False) or not getattr(state, "top_posts", []): + logger.info("Search failed - ending workflow early") + return "search_failed" + return "continue" + + class BlogGenerationGraph: """Blog generation workflow using LangGraph.""" - + def __init__(self): """Initialize the blog generation graph.""" self.workflow = None self.app = None - + async def create_workflow(self) -> StateGraph: """Create and configure the LangGraph workflow.""" logger.info("Creating blog generation workflow") - + # Create the state graph workflow = StateGraph(GraphState) - + # Add nodes workflow.add_node("search", search_top_posts) workflow.add_node("scrape", scrape_posts) workflow.add_node("clean", clean_validate) workflow.add_node("generate", generate_blog) workflow.add_node("evaluate", evaluate_seo) - + # Set entry point workflow.set_entry_point("search") - - # Add linear edges - workflow.add_edge("search", "scrape") + + # Optional explicit finish point (not required if using END) + workflow.set_finish_point("evaluate") + + # Add conditional edge after search to check if results were found + workflow.add_conditional_edges( + "search", + check_search_results, + { + "continue": "scrape", # Continue workflow if search succeeded + "search_failed": END, # End workflow if search failed + }, + ) + + # Add linear edges for successful path workflow.add_edge("scrape", "clean") workflow.add_edge("clean", "generate") workflow.add_edge("generate", "evaluate") - + # Add conditional edge for the react agent logic workflow.add_conditional_edges( "evaluate", decide_next_action, { - "generate": "generate", # Retry generation - "__end__": END, # Fix: Use "__end__" as key - } + "generate": "generate", # match whatever your react_agent returns + END: END, + }, ) - + self.workflow = workflow logger.info("Blog generation workflow created successfully") - + return workflow - + async def compile_app(self): """Compile the workflow into a runnable application.""" if not self.workflow: await self.create_workflow() - + + ### Removign the memroy saver for nwo (** TBD by ADMIN) # Get memory saver memory_saver = await get_memory_saver() - + # Compile the workflow - self.app = self.workflow.compile(checkpointer=memory_saver) - + self.app = self.workflow.compile(checkpointer=MemorySaver()) + + # # NEW – compile **without** any check-pointer + # self.app = self.workflow.compile() + logger.info("Blog generation app compiled successfully") return self.app - + async def run_blog_generation( self, keyword: str, max_attempts: int = 3, seo_threshold: float = 75.0, - thread_id: str = "default" + thread_id: str = "default", ) -> Dict[str, Any]: """Run the complete blog generation workflow.""" if not self.app: await self.compile_app() - + # Create initial state initial_state = GraphState( keyword=keyword, max_attempts=min(max_attempts, 5), - seo_threshold=seo_threshold + seo_threshold=seo_threshold, ) - - # Configuration for LangGraph execution + + # Configuration for LangGraph execution (Errror Part with Enahce memory) config = { - "configurable": { - "thread_id": thread_id - }, + 
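For readers unfamiliar with the LangGraph routing introduced above, here is a condensed, hypothetical sketch of the same early-exit pattern; the node names and `MemorySaver` checkpointer mirror the diff, while the stand-in callables are placeholders:

```python
# Sketch of the conditional-edge wiring used in create_workflow(); assumes the
# same LangGraph 0.5.x API as the diff.
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph


def build_search_graph(state_cls, search_node, scrape_node, router):
    """Compile a two-node graph with the same search-failure early exit."""
    graph = StateGraph(state_cls)
    graph.add_node("search", search_node)
    graph.add_node("scrape", scrape_node)
    graph.set_entry_point("search")
    graph.add_conditional_edges(
        "search",
        router,  # expected to return "continue" or "search_failed"
        {"continue": "scrape", "search_failed": END},
    )
    graph.add_edge("scrape", END)
    return graph.compile(checkpointer=MemorySaver())
```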
"configurable": {"thread_id": thread_id}, "recursion_limit": 15, - "max_concurrency": 4 + "max_concurrency": 4, } - + + ### skipping the meory saver for now ( To be changes very important) + + # # NEW – only keep execution limits + # config = { + # "recursion_limit": 15, + # "max_concurrency": 4, + # } + logger.info( "Starting blog generation workflow", keyword=keyword, max_attempts=initial_state.max_attempts, seo_threshold=seo_threshold, thread_id=thread_id, - recursion_limit=config["recursion_limit"] + recursion_limit=config["recursion_limit"], ) - + try: # Execute the workflow with proper configuration final_state = None - + # Use ainvoke as primary method try: final_state = await self.app.ainvoke(initial_state, config=config) - logger.info("Workflow completed via ainvoke", keyword=keyword, thread_id=thread_id) - + logger.info( + "Workflow completed via ainvoke", + keyword=keyword, + thread_id=thread_id, + ) + except Exception as invoke_error: - logger.warning("ainvoke failed, falling back to astream", error=str(invoke_error)) - + logger.warning( + "ainvoke failed, falling back to astream", error=str(invoke_error) + ) + # Fallback to astream if ainvoke fails step_count = 0 max_steps = 20 - + async for state in self.app.astream(initial_state, config=config): final_state = state step_count += 1 - + if step_count > max_steps: logger.error("Emergency stop: Too many workflow steps") break - + # Log intermediate progress if isinstance(state, dict) and len(state) == 1: node_name = list(state.keys())[0] @@ -148,48 +185,75 @@ async def run_blog_generation( node=node_name, keyword=keyword, thread_id=thread_id, - step=step_count + step=step_count, ) - + if final_state is None: raise Exception("Workflow execution failed - no final state") - + # Handle the final state properly - if isinstance(final_state, GraphState): - final_graph_state = final_state - elif isinstance(final_state, dict): - # Extract state from dict if needed - if len(final_state) == 1 and list(final_state.keys())[0] != "__end__": - final_graph_state = list(final_state.values())[0] - else: - # This might be the final state itself - final_graph_state = GraphState(**final_state) if "__end__" not in final_state else None - else: + # if isinstance(final_state, GraphState): + # final_graph_state = final_state + # elif isinstance(final_state, dict): + # # Extract state from dict if needed + # if len(final_state) == 1 and list(final_state.keys())[0] != "__end__": + # final_graph_state = list(final_state.values())[0] + # else: + # # This might be the final state itself + # final_graph_state = GraphState(**final_state) if "__end__" not in final_state else None + + if isinstance(final_state, dict) and "__end__" in final_state: + final_graph_state = final_state["__end__"] + elif isinstance(final_state, GraphState): final_graph_state = final_state - + # else: + # final_graph_state = final_state + + # Check if workflow ended due to search failure + # if hasattr(final_graph_state, 'search_failed') and final_graph_state.search_failed: + # logger.warning("Workflow ended due to search failure", keyword=keyword) + # return { + # "success": False, + # "final_blog": "", + # "seo_scores": {}, + # "final_score": 0.0, + # "attempts": 0, + # "keyword": keyword, + # "thread_id": thread_id, + # "error": final_graph_state.error_message, + # "reason": "search_failed" + # } + + final_graph_state = GraphState(**final_state) + # If we couldn't extract proper state, create fallback if not isinstance(final_graph_state, GraphState): - logger.warning("Could not 
extract proper final state, creating fallback") + logger.warning( + "Could not extract proper final state, creating fallback" + ) final_graph_state = GraphState( keyword=keyword, final_blog="", seo_scores={"final_score": 50.0}, final_score=50.0, - attempts=1 + attempts=1, ) - + # Determine success and content - has_content = bool(final_graph_state.final_blog.strip() or final_graph_state.draft_blog.strip()) + has_content = bool( + final_graph_state.final_blog.strip() + or final_graph_state.draft_blog.strip() + ) success = ( final_graph_state.final_score >= seo_threshold and has_content - ) or ( - has_content and final_graph_state.attempts >= max_attempts - ) - + ) or (has_content and final_graph_state.attempts >= max_attempts) + # Use final_blog if available, otherwise use draft_blog final_content = final_graph_state.final_blog or final_graph_state.draft_blog - if not final_content: - # Generate minimal fallback content + if not final_content and not getattr( + final_graph_state, "search_failed", False + ): + # Generate minimal fallback content only if search didn't fail final_content = f""" {keyword.title()} - Complete Guide @@ -212,7 +276,7 @@ async def run_blog_generation(

This guide provides an overview of {keyword}. Continue exploring to learn more!

""" success = True - + logger.info( "Blog generation workflow completed", keyword=keyword, @@ -220,9 +284,9 @@ async def run_blog_generation( final_score=final_graph_state.final_score, attempts=final_graph_state.attempts, content_length=len(final_content), - thread_id=thread_id + thread_id=thread_id, ) - + return { "success": success, "final_blog": final_content, @@ -230,36 +294,28 @@ async def run_blog_generation( "final_score": final_graph_state.final_score, "attempts": final_graph_state.attempts, "keyword": keyword, - "thread_id": thread_id + "thread_id": thread_id, } - + except Exception as e: logger.error( "Blog generation workflow failed", keyword=keyword, thread_id=thread_id, error=str(e), - error_type=type(e).__name__ + error_type=type(e).__name__, ) - - # Return fallback response - fallback_content = f""" - {keyword.title()} - Guide - - -

{keyword.title()}

-

This is a basic guide about {keyword}.

- """ - + return { "success": False, - "final_blog": fallback_content, - "seo_scores": {"final_score": 50.0}, - "final_score": 50.0, + "final_blog": "", + "seo_scores": {"final_score": 0.0}, + "final_score": 0.0, "attempts": 1, "keyword": keyword, "thread_id": thread_id, - "error": str(e) + "error": str(e), + "reason": "workflow_error", } @@ -270,9 +326,9 @@ async def run_blog_generation( async def get_blog_generation_graph() -> BlogGenerationGraph: """Get singleton blog generation graph instance.""" global _blog_graph - + if _blog_graph is None: _blog_graph = BlogGenerationGraph() await _blog_graph.compile_app() - - return _blog_graph \ No newline at end of file + + return _blog_graph diff --git a/agent_system/src/agents/nodes/__init__.py b/agent_system/src/agents/nodes/__init__.py index 5d5fc50..f2d5e86 100644 --- a/agent_system/src/agents/nodes/__init__.py +++ b/agent_system/src/agents/nodes/__init__.py @@ -9,9 +9,9 @@ __all__ = [ "search_top_posts", - "scrape_posts", + "scrape_posts", "clean_validate", "generate_blog", "evaluate_seo", "react_agent", -] \ No newline at end of file +] diff --git a/agent_system/src/agents/nodes/clean_validate.py b/agent_system/src/agents/nodes/clean_validate.py index 450ac48..490c1f4 100644 --- a/agent_system/src/agents/nodes/clean_validate.py +++ b/agent_system/src/agents/nodes/clean_validate.py @@ -3,7 +3,7 @@ from typing import Dict, Any, List from pydantic import BaseModel, ValidationError from src.schemas.state import GraphState -from src.tools.scraper import create_web_scraper +from src.tools.scraper import create_scraper from src.utils.logger import get_logger logger = get_logger(__name__) @@ -11,6 +11,7 @@ class CleanedPostSchema(BaseModel): """Schema for cleaned post content.""" + url: str title: str meta_description: str @@ -21,84 +22,84 @@ class CleanedPostSchema(BaseModel): async def clean_validate(state: GraphState) -> Dict[str, Any]: """Clean and validate scraped HTML content. 
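A hedged usage sketch of the new return contract from `run_blog_generation()`; the field names (`success`, `final_score`, `final_blog`, `reason`, `error`) come from the diff, and the keyword value is illustrative:

```python
# Hypothetical caller consuming the result dict returned by the graph.
import asyncio

from src.agents.graph import get_blog_generation_graph


async def main() -> None:
    graph = await get_blog_generation_graph()
    result = await graph.run_blog_generation("ai agents 2025", max_attempts=3)

    if result["success"]:
        print(result["final_score"], result["attempts"])
        print(result["final_blog"][:300])
    else:
        # "reason" is "workflow_error" when the graph raised; "error" has details
        print("failed:", result.get("reason"), result.get("error"))


asyncio.run(main())
```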
- + Args: state: Current graph state containing raw_html_content - + Returns: Updated state with cleaned_posts """ - raw_html_content = getattr(state, 'raw_html_content', {}) - + raw_html_content = getattr(state, "raw_html_content", {}) + if not raw_html_content: logger.warning("No raw HTML content to clean") return {"cleaned_posts": []} - - logger.info("Starting content cleaning and validation", content_count=len(raw_html_content)) - + + logger.info( + "Starting content cleaning and validation", content_count=len(raw_html_content) + ) + cleaned_posts = [] - scraper = create_web_scraper() - + scraper = create_scraper() + for url, html in raw_html_content.items(): if not html: logger.debug("Skipping empty HTML content", url=url) continue - + try: # Clean HTML content using scraper utility cleaned_content = scraper.clean_html_content(html, url) - + if not cleaned_content: logger.warning("Failed to clean content", url=url) continue - + # Validate against schema validated_post = CleanedPostSchema(**cleaned_content) cleaned_posts.append(validated_post.model_dump()) - + logger.debug( "Successfully cleaned and validated post", url=url, word_count=validated_post.word_count, headings_count=len(validated_post.headings), - paragraphs_count=len(validated_post.paragraphs) + paragraphs_count=len(validated_post.paragraphs), ) - + except ValidationError as e: logger.warning( - "Content validation failed", - url=url, - validation_errors=str(e) + "Content validation failed", url=url, validation_errors=str(e) ) continue except Exception as e: logger.error( - "Unexpected error during content cleaning", - url=url, - error=str(e) + "Unexpected error during content cleaning", url=url, error=str(e) ) continue - + # Filter posts with insufficient content quality_posts = [] for post in cleaned_posts: - if (post["word_count"] >= 300 and - len(post["paragraphs"]) >= 3 and - post["title"].strip()): + if ( + post["word_count"] >= 300 + and len(post["paragraphs"]) >= 3 + and post["title"].strip() + ): quality_posts.append(post) else: logger.debug( "Filtered out low-quality post", url=post["url"], word_count=post["word_count"], - paragraphs=len(post["paragraphs"]) + paragraphs=len(post["paragraphs"]), ) - + logger.info( "Content cleaning completed", total_raw=len(raw_html_content), cleaned=len(cleaned_posts), - quality_filtered=len(quality_posts) + quality_filtered=len(quality_posts), ) - - return {"cleaned_posts": quality_posts} \ No newline at end of file + + return {"cleaned_posts": quality_posts} diff --git a/agent_system/src/agents/nodes/evaluate_seo.py b/agent_system/src/agents/nodes/evaluate_seo.py index 099b493..7350a85 100644 --- a/agent_system/src/agents/nodes/evaluate_seo.py +++ b/agent_system/src/agents/nodes/evaluate_seo.py @@ -1,326 +1,3 @@ -# """SEO evaluation node implementation.""" - -# import json -# import re -# from typing import Dict, Any -# from src.schemas.state import GraphState -# from src.tools.gemini_client import get_gemini_client -# from src.utils.logger import get_logger - -# logger = get_logger(__name__) - - -# async def evaluate_seo(state: GraphState) -> Dict[str, Any]: -# """Evaluate SEO quality of the generated blog content. 
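The quality gate inside `clean_validate()` can be summarized as a small predicate. The thresholds below are the ones used in the diff; the sample post is fabricated for illustration:

```python
# Standalone sketch of the quality filter (>=300 words, >=3 paragraphs,
# non-empty title) applied after schema validation.
from typing import Any, Dict


def passes_quality_gate(post: Dict[str, Any]) -> bool:
    """Same thresholds as the filter inside clean_validate()."""
    return (
        post.get("word_count", 0) >= 300
        and len(post.get("paragraphs", [])) >= 3
        and bool(post.get("title", "").strip())
    )


sample = {
    "title": "Example post",
    "word_count": 420,
    "paragraphs": ["intro", "body", "details", "wrap-up"],
}
assert passes_quality_gate(sample)
```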
- -# Args: -# state: Current graph state containing draft_blog and keyword - -# Returns: -# Updated state with seo_scores and final_score -# """ -# draft_blog = state.draft_blog -# keyword = state.keyword - -# if not draft_blog: -# logger.warning("No draft blog content to evaluate") -# return { -# "seo_scores": {}, -# "final_score": 0.0 -# } - -# logger.info("Starting SEO evaluation", keyword=keyword) - -# try: -# # Load SEO evaluation prompt template -# with open("src/agents/prompts/seo_eval_prompt.txt", "r") as f: -# seo_prompt_template = f.read() - -# # Format prompt with content and keyword -# seo_prompt = seo_prompt_template.format( -# blog_content=draft_blog, -# keyword=keyword -# ) - -# # Get evaluation from Gemini -# gemini_client = await get_gemini_client() - -# evaluation_response = await gemini_client.generate_content( -# prompt=seo_prompt, -# temperature=0.1 # Lower temperature for more consistent evaluation -# ) - -# # Parse JSON response -# seo_scores = _parse_seo_evaluation(evaluation_response) - -# # Add deterministic rule-based evaluation -# rule_based_scores = _evaluate_with_rules(draft_blog, keyword) - -# # Combine AI and rule-based scores (weighted average) -# final_scores = _combine_scores(seo_scores, rule_based_scores) - -# final_score = final_scores.get("final_score", 0.0) - -# logger.info( -# "SEO evaluation completed", -# keyword=keyword, -# final_score=final_score, -# ai_score=seo_scores.get("final_score", 0), -# rule_score=rule_based_scores.get("final_score", 0) -# ) - -# return { -# "seo_scores": final_scores, -# "final_score": final_score -# } - -# except Exception as e: -# logger.error( -# "SEO evaluation failed", -# keyword=keyword, -# error=str(e) -# ) - -# # Fallback to rule-based evaluation only -# rule_based_scores = _evaluate_with_rules(draft_blog, keyword) -# final_score = rule_based_scores.get("final_score", 0.0) - -# return { -# "seo_scores": rule_based_scores, -# "final_score": final_score -# } - - -# def _parse_seo_evaluation(response: str) -> Dict[str, Any]: -# """Parse SEO evaluation response from Gemini. - -# Args: -# response: Raw response from Gemini - -# Returns: -# Parsed SEO scores dictionary -# """ -# try: -# # Look for JSON content in the response -# json_match = re.search(r'```json\s*(\{.*?\})\s*```', response, re.DOTALL) - -# if json_match: -# json_content = json_match.group(1) -# scores = json.loads(json_content) - -# # Validate required fields -# required_fields = [ -# "title_score", "meta_description_score", "keyword_optimization_score", -# "content_structure_score", "readability_score", "content_quality_score", -# "technical_seo_score", "final_score" -# ] - -# for field in required_fields: -# if field not in scores: -# scores[field] = 0.0 - -# return scores -# else: -# logger.warning("No JSON found in SEO evaluation response") -# return {} - -# except json.JSONDecodeError as e: -# logger.error("Failed to parse SEO evaluation JSON", error=str(e)) -# return {} -# except Exception as e: -# logger.error("Unexpected error parsing SEO evaluation", error=str(e)) -# return {} - - -# def _evaluate_with_rules(content: str, keyword: str) -> Dict[str, Any]: -# """Evaluate content using deterministic rules. 
- -# Args: -# content: Blog content to evaluate -# keyword: Target keyword - -# Returns: -# Rule-based SEO scores -# """ -# scores = {} - -# # Title evaluation -# title_match = re.search(r'(.*?)', content, re.IGNORECASE) -# if title_match: -# title = title_match.group(1) -# title_score = 0 - -# if keyword.lower() in title.lower(): -# title_score += 40 -# if 30 <= len(title) <= 60: -# title_score += 30 -# if len(title) > 0: -# title_score += 30 - -# scores["title_score"] = min(title_score, 100) -# else: -# scores["title_score"] = 0 - -# # Meta description evaluation -# meta_match = re.search(r' 0: -# meta_score += 20 - -# scores["meta_description_score"] = min(meta_score, 100) -# else: -# scores["meta_description_score"] = 0 - -# # Keyword density evaluation -# content_text = re.sub(r'<[^>]+>', '', content) # Strip HTML -# word_count = len(content_text.split()) -# keyword_occurrences = len(re.findall(r'\b' + re.escape(keyword.lower()) + r'\b', content_text.lower())) - -# if word_count > 0: -# keyword_density = (keyword_occurrences / word_count) * 100 - -# if 1.0 <= keyword_density <= 2.5: -# scores["keyword_optimization_score"] = 100 -# elif 0.5 <= keyword_density < 1.0 or 2.5 < keyword_density <= 3.5: -# scores["keyword_optimization_score"] = 80 -# elif keyword_density > 0: -# scores["keyword_optimization_score"] = 60 -# else: -# scores["keyword_optimization_score"] = 0 -# else: -# scores["keyword_optimization_score"] = 0 - -# # Content structure evaluation -# h1_count = len(re.findall(r']*>', content, re.IGNORECASE)) -# h2_count = len(re.findall(r']*>', content, re.IGNORECASE)) -# h3_count = len(re.findall(r']*>', content, re.IGNORECASE)) -# p_count = len(re.findall(r']*>', content, re.IGNORECASE)) - -# structure_score = 0 -# if h1_count == 1: -# structure_score += 25 -# if h2_count >= 3: -# structure_score += 25 -# if h3_count >= 2: -# structure_score += 25 -# if p_count >= 5: -# structure_score += 25 - -# scores["content_structure_score"] = structure_score - -# # Content length evaluation -# if word_count >= 1200: -# length_score = 100 -# elif word_count >= 800: -# length_score = 80 -# elif word_count >= 500: -# length_score = 60 -# else: -# length_score = 40 - -# scores["content_quality_score"] = length_score - -# # Readability (simplified - based on average sentence length) -# sentences = re.split(r'[.!?]+', content_text) -# if len(sentences) > 1: -# avg_sentence_length = word_count / len(sentences) -# if 15 <= avg_sentence_length <= 20: -# readability_score = 100 -# elif 10 <= avg_sentence_length < 15 or 20 < avg_sentence_length <= 25: -# readability_score = 80 -# else: -# readability_score = 60 -# else: -# readability_score = 60 - -# scores["readability_score"] = readability_score - -# # Technical SEO (basic checks) -# tech_score = 0 -# if '' in content and '' in content: -# tech_score += 20 -# if 'meta name="description"' in content: -# tech_score += 20 -# if '= 1000: -# tech_score += 20 - -# scores["technical_seo_score"] = tech_score - -# # Calculate weighted final score -# weights = { -# "title_score": 0.15, -# "meta_description_score": 0.10, -# "keyword_optimization_score": 0.20, -# "content_structure_score": 0.15, -# "readability_score": 0.15, -# "content_quality_score": 0.15, -# "technical_seo_score": 0.10 -# } - -# final_score = sum(scores[key] * weights[key] for key in weights.keys()) -# scores["final_score"] = round(final_score, 1) - -# return scores - - -# def _combine_scores(ai_scores: Dict[str, Any], rule_scores: Dict[str, Any]) -> Dict[str, Any]: -# """Combine AI and 
rule-based scores with weighted average. - -# Args: -# ai_scores: Scores from AI evaluation -# rule_scores: Scores from rule-based evaluation - -# Returns: -# Combined scores dictionary -# """ -# combined = {} -# ai_weight = 0.4 -# rule_weight = 0.6 - -# score_keys = [ -# "title_score", "meta_description_score", "keyword_optimization_score", -# "content_structure_score", "readability_score", "content_quality_score", -# "technical_seo_score" -# ] - -# for key in score_keys: -# ai_score = ai_scores.get(key, 0) -# rule_score = rule_scores.get(key, 0) -# combined[key] = round(ai_score * ai_weight + rule_score * rule_weight, 1) - -# # Calculate final score -# weights = { -# "title_score": 0.15, -# "meta_description_score": 0.10, -# "keyword_optimization_score": 0.20, -# "content_structure_score": 0.15, -# "readability_score": 0.15, -# "content_quality_score": 0.15, -# "technical_seo_score": 0.10 -# } - -# final_score = sum(combined[key] * weights[key] for key in weights.keys()) -# combined["final_score"] = round(final_score, 1) - -# # Add feedback if available -# if "feedback" in ai_scores: -# combined["feedback"] = ai_scores["feedback"] - -# return combined - - - """SEO evaluation node implementation - Fixed JSON parsing.""" import json @@ -337,25 +14,22 @@ async def evaluate_seo(state: GraphState) -> Dict[str, Any]: """Evaluate SEO quality of the generated blog content.""" draft_blog = state.draft_blog keyword = state.keyword - + if not draft_blog: logger.warning("No draft blog content to evaluate") - return { - "seo_scores": {}, - "final_score": 0.0 - } - + return {"seo_scores": {}, "final_score": 0.0} + logger.info("Starting SEO evaluation", keyword=keyword) - + try: # Use rule-based evaluation as primary method (more reliable) rule_based_scores = _evaluate_with_rules(draft_blog, keyword) - + # Try AI evaluation as enhancement (if API key available) ai_scores = {} try: gemini_client = await get_gemini_client() - + seo_prompt = f""" Evaluate this blog content for SEO quality. Return ONLY a JSON object with these exact fields: {{ @@ -376,52 +50,43 @@ async def evaluate_seo(state: GraphState) -> Dict[str, Any]: Respond with ONLY the JSON object, no additional text. 
""" - + evaluation_response = await gemini_client.generate_content( - prompt=seo_prompt, - temperature=0.1 + prompt=seo_prompt, temperature=0.1 ) - + # Parse AI evaluation ai_scores = _parse_seo_evaluation(evaluation_response) - + except Exception as e: - logger.warning("AI SEO evaluation failed, using rule-based only", error=str(e)) - + logger.warning( + "AI SEO evaluation failed, using rule-based only", error=str(e) + ) + # Combine scores (prefer rule-based if AI fails) if ai_scores: final_scores = _combine_scores(ai_scores, rule_based_scores) else: final_scores = rule_based_scores - + final_score = final_scores.get("final_score", 0.0) - + logger.info( "SEO evaluation completed", keyword=keyword, final_score=final_score, - method="combined" if ai_scores else "rule_based" + method="combined" if ai_scores else "rule_based", ) - - return { - "seo_scores": final_scores, - "final_score": final_score - } - + + return {"seo_scores": final_scores, "final_score": final_score} + except Exception as e: - logger.error( - "SEO evaluation failed", - keyword=keyword, - error=str(e) - ) - + logger.error("SEO evaluation failed", keyword=keyword, error=str(e)) + # Fallback to basic rule evaluation basic_score = min(50.0 + (len(draft_blog) / 100), 80.0) - - return { - "seo_scores": {"final_score": basic_score}, - "final_score": basic_score - } + + return {"seo_scores": {"final_score": basic_score}, "final_score": basic_score} def _parse_seo_evaluation(response: str) -> Dict[str, Any]: @@ -429,30 +94,39 @@ def _parse_seo_evaluation(response: str) -> Dict[str, Any]: try: # Clean the response cleaned_response = response.strip() - + # Look for JSON content between ```json blocks - json_match = re.search(r'```json\s*(\{.*?\})\s*```', cleaned_response, re.DOTALL) + json_match = re.search( + r"```json\s*(\{.*?\})\s*```", cleaned_response, re.DOTALL + ) if json_match: json_content = json_match.group(1) else: # Look for direct JSON object - json_match = re.search(r'\{[^}]*"final_score"[^}]*\}', cleaned_response, re.DOTALL) + json_match = re.search( + r'\{[^}]*"final_score"[^}]*\}', cleaned_response, re.DOTALL + ) if json_match: json_content = json_match.group(0) else: # Try to parse the entire response as JSON json_content = cleaned_response - + # Parse JSON scores = json.loads(json_content) - + # Validate and sanitize scores required_fields = [ - "title_score", "meta_description_score", "keyword_optimization_score", - "content_structure_score", "readability_score", "content_quality_score", - "technical_seo_score", "final_score" + "title_score", + "meta_description_score", + "keyword_optimization_score", + "content_structure_score", + "readability_score", + "content_quality_score", + "technical_seo_score", + "final_score", ] - + for field in required_fields: if field not in scores: scores[field] = 0.0 @@ -462,9 +136,9 @@ def _parse_seo_evaluation(response: str) -> Dict[str, Any]: scores[field] = max(0.0, min(100.0, float(scores[field]))) except (ValueError, TypeError): scores[field] = 0.0 - + return scores - + except json.JSONDecodeError as e: logger.warning("Failed to parse JSON from SEO evaluation", error=str(e)) return {} @@ -476,49 +150,53 @@ def _parse_seo_evaluation(response: str) -> Dict[str, Any]: def _evaluate_with_rules(content: str, keyword: str) -> Dict[str, Any]: """Evaluate content using deterministic rules.""" scores = {} - + # Title evaluation - title_match = re.search(r'(.*?)', content, re.IGNORECASE) + title_match = re.search(r"(.*?)", content, re.IGNORECASE) if title_match: title = 
title_match.group(1) title_score = 0 - + if keyword.lower() in title.lower(): title_score += 40 if 30 <= len(title) <= 60: title_score += 30 if len(title) > 0: title_score += 30 - + scores["title_score"] = min(title_score, 100) else: scores["title_score"] = 0 - + # Meta description evaluation - meta_match = re.search(r' 0: meta_score += 20 - + scores["meta_description_score"] = min(meta_score, 100) else: scores["meta_description_score"] = 0 - + # Keyword density evaluation - content_text = re.sub(r'<[^>]+>', '', content) # Strip HTML + content_text = re.sub(r"<[^>]+>", "", content) # Strip HTML word_count = len(content_text.split()) - keyword_occurrences = len(re.findall(r'\b' + re.escape(keyword.lower()) + r'\b', content_text.lower())) - + keyword_occurrences = len( + re.findall(r"\b" + re.escape(keyword.lower()) + r"\b", content_text.lower()) + ) + if word_count > 0: keyword_density = (keyword_occurrences / word_count) * 100 - + if 1.0 <= keyword_density <= 2.5: scores["keyword_optimization_score"] = 100 elif 0.5 <= keyword_density < 1.0 or 2.5 < keyword_density <= 3.5: @@ -529,13 +207,13 @@ def _evaluate_with_rules(content: str, keyword: str) -> Dict[str, Any]: scores["keyword_optimization_score"] = 0 else: scores["keyword_optimization_score"] = 0 - + # Content structure evaluation - h1_count = len(re.findall(r']*>', content, re.IGNORECASE)) - h2_count = len(re.findall(r']*>', content, re.IGNORECASE)) - h3_count = len(re.findall(r']*>', content, re.IGNORECASE)) - p_count = len(re.findall(r']*>', content, re.IGNORECASE)) - + h1_count = len(re.findall(r"]*>", content, re.IGNORECASE)) + h2_count = len(re.findall(r"]*>", content, re.IGNORECASE)) + h3_count = len(re.findall(r"]*>", content, re.IGNORECASE)) + p_count = len(re.findall(r"]*>", content, re.IGNORECASE)) + structure_score = 0 if h1_count == 1: structure_score += 25 @@ -545,25 +223,25 @@ def _evaluate_with_rules(content: str, keyword: str) -> Dict[str, Any]: structure_score += 25 if p_count >= 5: structure_score += 25 - + scores["content_structure_score"] = structure_score - + # Content length evaluation - if word_count >= 1200: + if word_count >= 500: length_score = 100 - elif word_count >= 800: + elif word_count >= 400: length_score = 80 - elif word_count >= 500: + elif word_count >= 300: length_score = 60 else: length_score = 40 - + scores["content_quality_score"] = length_score - + # Readability (simplified - based on average sentence length) - sentences = re.split(r'[.!?]+', content_text) + sentences = re.split(r"[.!?]+", content_text) sentences = [s.strip() for s in sentences if s.strip()] - + if len(sentences) > 1: avg_sentence_length = word_count / len(sentences) if 15 <= avg_sentence_length <= 20: @@ -574,24 +252,24 @@ def _evaluate_with_rules(content: str, keyword: str) -> Dict[str, Any]: readability_score = 60 else: readability_score = 60 - + scores["readability_score"] = readability_score - + # Technical SEO (basic checks) tech_score = 0 - if '' in content and '' in content: + if "" in content and "" in content: tech_score += 20 if 'meta name="description"' in content: tech_score += 20 - if '= 1000: tech_score += 20 - + scores["technical_seo_score"] = tech_score - + # Calculate weighted final score weights = { "title_score": 0.15, @@ -600,32 +278,38 @@ def _evaluate_with_rules(content: str, keyword: str) -> Dict[str, Any]: "content_structure_score": 0.15, "readability_score": 0.15, "content_quality_score": 0.15, - "technical_seo_score": 0.10 + "technical_seo_score": 0.10, } - + final_score = sum(scores.get(key, 0) * 
weights[key] for key in weights.keys()) scores["final_score"] = round(final_score, 1) - + return scores -def _combine_scores(ai_scores: Dict[str, Any], rule_scores: Dict[str, Any]) -> Dict[str, Any]: +def _combine_scores( + ai_scores: Dict[str, Any], rule_scores: Dict[str, Any] +) -> Dict[str, Any]: """Combine AI and rule-based scores with weighted average.""" combined = {} ai_weight = 0.3 rule_weight = 0.7 - + score_keys = [ - "title_score", "meta_description_score", "keyword_optimization_score", - "content_structure_score", "readability_score", "content_quality_score", - "technical_seo_score" + "title_score", + "meta_description_score", + "keyword_optimization_score", + "content_structure_score", + "readability_score", + "content_quality_score", + "technical_seo_score", ] - + for key in score_keys: ai_score = ai_scores.get(key, 0) rule_score = rule_scores.get(key, 0) combined[key] = round(ai_score * ai_weight + rule_score * rule_weight, 1) - + # Calculate final score weights = { "title_score": 0.15, @@ -634,10 +318,10 @@ def _combine_scores(ai_scores: Dict[str, Any], rule_scores: Dict[str, Any]) -> D "content_structure_score": 0.15, "readability_score": 0.15, "content_quality_score": 0.15, - "technical_seo_score": 0.10 + "technical_seo_score": 0.10, } - + final_score = sum(combined[key] * weights[key] for key in weights.keys()) combined["final_score"] = round(final_score, 1) - - return combined \ No newline at end of file + + return combined diff --git a/agent_system/src/agents/nodes/generate_blog.py b/agent_system/src/agents/nodes/generate_blog.py index 67f3105..337518c 100644 --- a/agent_system/src/agents/nodes/generate_blog.py +++ b/agent_system/src/agents/nodes/generate_blog.py @@ -10,27 +10,27 @@ async def generate_blog(state: GraphState) -> Dict[str, Any]: """Generate blog content by synthesizing cleaned posts. - + Args: state: Current graph state containing cleaned_posts and keyword - + Returns: Updated state with draft_blog content """ cleaned_posts = state.cleaned_posts keyword = state.keyword attempts = state.attempts - + # Critical check: If no cleaned posts, generate fallback content if not cleaned_posts: logger.warning("No cleaned posts available for blog generation") - + # Generate basic blog content without source material try: gemini_client = await get_gemini_client() - + fallback_prompt = f""" - Write a comprehensive 1500-word blog post about: {keyword} + Write a comprehensive 500-word blog post about: {keyword} Since no source material is available, create original content that covers: - Introduction to the topic @@ -50,37 +50,34 @@ async def generate_blog(state: GraphState) -> Dict[str, Any]: Format as HTML with proper tags. 
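To make the scoring arithmetic concrete, here is a minimal worked example of the weighted aggregation used by `_evaluate_with_rules()` and `_combine_scores()`; the weights are copied from the diff and the component scores are made up:

```python
# Weights copied from _evaluate_with_rules() / _combine_scores().
WEIGHTS = {
    "title_score": 0.15,
    "meta_description_score": 0.10,
    "keyword_optimization_score": 0.20,
    "content_structure_score": 0.15,
    "readability_score": 0.15,
    "content_quality_score": 0.15,
    "technical_seo_score": 0.10,
}

# Illustrative component scores on a 0-100 scale.
scores = {
    "title_score": 70,
    "meta_description_score": 60,
    "keyword_optimization_score": 80,
    "content_structure_score": 75,
    "readability_score": 80,
    "content_quality_score": 60,
    "technical_seo_score": 40,
}

final_score = round(sum(scores[key] * weight for key, weight in WEIGHTS.items()), 1)
print(final_score)  # roughly 68.8 for these made-up inputs
```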
""" - + draft_blog = await gemini_client.generate_content( prompt=fallback_prompt, use_search=False, # Don't use search for fallback temperature=0.7, - max_output_tokens=4000 + max_output_tokens=4000, ) - + if draft_blog and len(draft_blog.strip()) >= 500: logger.info( "Fallback blog generation completed", keyword=keyword, content_length=len(draft_blog), - attempts=attempts + 1 + attempts=attempts + 1, ) - - return { - "draft_blog": draft_blog, - "attempts": attempts + 1 - } + + return {"draft_blog": draft_blog, "attempts": attempts + 1} else: raise ValueError("Generated fallback content is too short") - + except Exception as e: logger.error( "Fallback blog generation failed", keyword=keyword, error=str(e), - attempts=attempts + 1 + attempts=attempts + 1, ) - + # Return minimal fallback content to prevent infinite loops minimal_content = f""" {keyword.title()} - Complete Guide @@ -99,77 +96,67 @@ async def generate_blog(state: GraphState) -> Dict[str, Any]:

Conclusion

This guide provides a foundation for understanding {keyword}. For more detailed information, consider exploring additional resources and documentation.

""" - - return { - "draft_blog": minimal_content, - "attempts": attempts + 1 - } - + + return {"draft_blog": minimal_content, "attempts": attempts + 1} + logger.info( - "Starting blog generation", + "Starting blog generation", keyword=keyword, source_posts=len(cleaned_posts), - attempts=attempts + 1 + attempts=attempts + 1, ) - + try: # Prepare reference posts summary reference_posts = _prepare_reference_posts(cleaned_posts) - + # Load blog generation prompt template with open("src/agents/prompts/blog_gen_prompt.txt", "r") as f: blog_prompt_template = f.read() - + # Format prompt with data blog_prompt = blog_prompt_template.format( - keyword=keyword, - reference_posts=reference_posts + keyword=keyword, reference_posts=reference_posts ) - + # Generate content using Gemini gemini_client = await get_gemini_client() - + draft_blog = await gemini_client.generate_content( prompt=blog_prompt, use_search=False, # Don't use search for content generation temperature=0.7, - max_output_tokens=4000 + max_output_tokens=4000, ) - + if not draft_blog or len(draft_blog.strip()) < 500: raise ValueError("Generated blog content is too short or empty") - + logger.info( "Blog generation completed successfully", keyword=keyword, content_length=len(draft_blog), - attempts=attempts + 1 + attempts=attempts + 1, ) - - return { - "draft_blog": draft_blog, - "attempts": attempts + 1 - } - + + return {"draft_blog": draft_blog, "attempts": attempts + 1} + except Exception as e: logger.error( "Blog generation failed", keyword=keyword, error=str(e), - attempts=attempts + 1 + attempts=attempts + 1, ) - + # Return empty content to trigger failure handling - return { - "draft_blog": "", - "attempts": attempts + 1 - } + return {"draft_blog": "", "attempts": attempts + 1} def _prepare_reference_posts(cleaned_posts: list[Dict[str, Any]]) -> str: """Prepare reference posts summary for prompt.""" reference_sections = [] - + for i, post in enumerate(cleaned_posts[:8], 1): # Limit to top 8 posts # Create a summary of each post title = post.get("title", "Untitled") @@ -177,15 +164,15 @@ def _prepare_reference_posts(cleaned_posts: list[Dict[str, Any]]) -> str: headings = post.get("headings", []) paragraphs = post.get("paragraphs", []) word_count = post.get("word_count", 0) - + # Take first 3 paragraphs as summary summary_paragraphs = paragraphs[:3] summary_text = " ".join(summary_paragraphs) - + # Truncate if too long if len(summary_text) > 800: summary_text = summary_text[:800] + "..." - + # Format reference post reference_post = f""" POST {i}: {title} @@ -196,5 +183,5 @@ def _prepare_reference_posts(cleaned_posts: list[Dict[str, Any]]) -> str: --- """ reference_sections.append(reference_post) - - return "\n".join(reference_sections) \ No newline at end of file + + return "\n".join(reference_sections) diff --git a/agent_system/src/agents/nodes/react_agent.py b/agent_system/src/agents/nodes/react_agent.py index 898f2eb..495691d 100644 --- a/agent_system/src/agents/nodes/react_agent.py +++ b/agent_system/src/agents/nodes/react_agent.py @@ -1,6 +1,8 @@ -"""React agent node for decision making - Fixed END handling.""" +"""React agent node for decision making – LangGraph 0.5.x compatible.""" from typing import Dict, Any, Literal +from langgraph.constants import END # canonical END literal + from src.schemas.state import GraphState from src.utils.logger import get_logger @@ -9,13 +11,20 @@ DecisionType = Literal["ACCEPT", "REVISE", "FAIL"] +# ------------------------------------------------------------------ +# 1. 
Core decision node (used if you *call* the agent as a node) +# ------------------------------------------------------------------ async def react_agent(state: GraphState) -> DecisionType: - """Make decision on whether to accept, revise, or fail the blog generation.""" + """ + Decide whether to ACCEPT, REVISE, or fail the current blog attempt. + This function is **not** used by the conditional edge logic below; + it is kept for backward compatibility / unit tests. + """ final_score = state.final_score attempts = state.attempts max_attempts = state.max_attempts seo_threshold = state.seo_threshold - + logger.info( "React agent making decision", final_score=final_score, @@ -23,46 +32,48 @@ async def react_agent(state: GraphState) -> DecisionType: max_attempts=max_attempts, threshold=seo_threshold, has_content=bool(state.draft_blog.strip()), - has_cleaned_posts=len(state.cleaned_posts) > 0 + has_cleaned_posts=len(state.cleaned_posts) > 0, ) - - # Check for failure conditions first + + # 1) Hard failure rules if attempts >= max_attempts: logger.warning("FAIL: Maximum attempts reached") return "FAIL" - - # Check if we have no content and no source material - if not state.draft_blog.strip() and len(state.cleaned_posts) == 0: - logger.warning("FAIL: No content generated and no source material") + + if not state.draft_blog.strip() and not state.cleaned_posts: + logger.warning("FAIL: No content and no source material") return "FAIL" - - # Check for acceptance + + # 2) Accept rules if final_score >= seo_threshold and state.draft_blog.strip(): - logger.info("ACCEPT: Score meets threshold and content exists") + logger.info("ACCEPT: Score meets threshold") return "ACCEPT" - - # Accept if we have reasonable content even with lower score + if state.draft_blog.strip() and len(state.draft_blog) > 500 and attempts >= 2: - logger.info("ACCEPT: Reasonable content available after multiple attempts") + logger.info("ACCEPT: Reasonable content after ≥2 attempts") return "ACCEPT" - - # Retry if conditions allow - if attempts < max_attempts: - logger.info("REVISE: Retrying generation") - return "REVISE" - - # Default to FAIL - logger.warning("FAIL: No valid conditions met") - return "FAIL" - - -def decide_next_action(state: GraphState) -> str: - """Determine next action based on react agent decision.""" + + # 3) Otherwise retry + logger.info("REVISE: Retrying generation") + return "REVISE" + + +# ------------------------------------------------------------------ +# 2. Conditional-edge router used in graph.py +# Must return strings understood by LangGraph 0.5.x +# ------------------------------------------------------------------ +def decide_next_action(state: GraphState): + """ + Router used by the conditional edge coming out of the 'evaluate' node. 
+ Returns: + "generate" – loop back to generate node + END – finish the workflow (LangGraph constant) + """ final_score = state.final_score attempts = state.attempts max_attempts = state.max_attempts seo_threshold = state.seo_threshold - + logger.info( "Deciding next action", final_score=final_score, @@ -70,44 +81,29 @@ def decide_next_action(state: GraphState) -> str: max_attempts=max_attempts, threshold=seo_threshold, has_content=bool(state.draft_blog.strip()), - has_cleaned_posts=len(state.cleaned_posts) > 0 + has_cleaned_posts=len(state.cleaned_posts) > 0, ) - - # CRITICAL: Always check for termination conditions first - - # Max attempts reached - terminate + + # --- Termination conditions ------------------------------------------------- if attempts >= max_attempts: - logger.info("Terminating: Maximum attempts reached") - if state.draft_blog.strip(): - state.final_blog = state.draft_blog - return "__end__" # Fix: Use "__end__" instead of END - - # No source material and no content - terminate - if len(state.cleaned_posts) == 0 and not state.draft_blog.strip(): - logger.info("Terminating: No source material and no content") - return "__end__" - - # Good content achieved - terminate with success + logger.info("Terminating: max attempts reached") + # state.final_blog = state.draft_blog + return END + + if not state.cleaned_posts and not state.draft_blog.strip(): + logger.info("Terminating: no material & no content") + return END + if final_score >= seo_threshold and state.draft_blog.strip(): - logger.info("Terminating: Target score achieved") - state.final_blog = state.draft_blog - return "__end__" - - # Reasonable content after multiple attempts - accept - if (state.draft_blog.strip() and - len(state.draft_blog) > 500 and - attempts >= 2): - logger.info("Terminating: Accepting reasonable content") - state.final_blog = state.draft_blog - return "__end__" - - # Continue only if we have attempts left and a reason to continue - if attempts < max_attempts and (len(state.cleaned_posts) > 0 or not state.draft_blog.strip()): - logger.info("Continuing: Retrying generation") - return "generate" - - # Default termination - logger.info("Terminating: Default case") - if state.draft_blog.strip(): - state.final_blog = state.draft_blog - return "__end__" \ No newline at end of file + logger.info("Terminating: target score achieved") + # state.final_blog = state.draft_blog + return END + + if state.draft_blog.strip() and len(state.draft_blog) > 500 and attempts >= 2: + logger.info("Terminating: reasonable content accepted") + # state.final_blog = state.draft_blog + return END + + # --- Continue --------------------------------------------------------------- + logger.info("Continuing: retrying generation") + return "generate" diff --git a/agent_system/src/agents/nodes/scrape_posts.py b/agent_system/src/agents/nodes/scrape_posts.py index 16a4d53..f20043a 100644 --- a/agent_system/src/agents/nodes/scrape_posts.py +++ b/agent_system/src/agents/nodes/scrape_posts.py @@ -1,63 +1,33 @@ -"""Scrape posts node implementation - Fixed version.""" - -from typing import Dict, Any, List +from typing import Dict, Any from src.schemas.state import GraphState -from src.tools.scraper import create_web_scraper, ScrapeError +from src.tools.scraper import create_scraper from src.utils.logger import get_logger logger = get_logger(__name__) async def scrape_posts(state: GraphState) -> Dict[str, Any]: - """Scrape content from the top posts URLs. 
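A rough way to exercise the new router in isolation, assuming `decide_next_action` only reads the attributes shown; a `SimpleNamespace` stands in for `GraphState` so the sketch does not depend on its required fields:

```python
# Quick check of the router; the state object and its values are hypothetical.
from types import SimpleNamespace

from langgraph.constants import END

from src.agents.nodes.react_agent import decide_next_action

state = SimpleNamespace(
    final_score=82.0,
    attempts=1,
    max_attempts=3,
    seo_threshold=75.0,
    draft_blog="<h1>Draft</h1>" + "x" * 600,
    cleaned_posts=[{"url": "https://example.com"}],
)

assert decide_next_action(state) == END         # above threshold: finish

state.final_score = 40.0
assert decide_next_action(state) == "generate"  # below threshold: retry
```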
- - Args: - state: Current graph state containing top_posts - - Returns: - Updated state with raw_html_content - """ - top_posts = state.top_posts - - if not top_posts: - logger.warning("No top posts to scrape") - return {"raw_html_content": {}} - - # Extract URLs from top posts - urls = [post.get("url", "") for post in top_posts if post.get("url")] - + """Scrape content from the top posts URLs.""" + top_posts = state.top_posts or [] + urls = [p["url"] for p in top_posts if p.get("url")] if not urls: - logger.warning("No valid URLs found in top posts") + logger.warning("No URLs to scrape") return {"raw_html_content": {}} - + logger.info("Starting to scrape posts", url_count=len(urls)) - + scraper = create_scraper() + try: - # Create web scraper instance - scraper = create_web_scraper() - - # Fix: Properly await the async method - raw_html_content = await scraper.scrape_multiple_urls(urls) - - # Filter out failed scrapes - successful_scrapes = { - url: html for url, html in raw_html_content.items() - if html is not None - } - + raw_html = await scraper.scrape_multiple(urls) + successful = {u: h for u, h in raw_html.items() if h} logger.info( "Scraping completed", - total_urls=len(urls), - successful=len(successful_scrapes), - failed=len(urls) - len(successful_scrapes) + total=len(urls), + successful=len(successful), + failed=len(urls) - len(successful), ) - - return {"raw_html_content": successful_scrapes} - - except ScrapeError as e: + return {"raw_html_content": successful} + + except Exception as e: logger.error("Scraping failed", error=str(e)) - # Return empty dict to continue pipeline return {"raw_html_content": {}} - except Exception as e: - logger.error("Unexpected error during scraping", error=str(e)) - return {"raw_html_content": {}} \ No newline at end of file diff --git a/agent_system/src/agents/nodes/search_top_posts.py b/agent_system/src/agents/nodes/search_top_posts.py index b719969..b395e03 100644 --- a/agent_system/src/agents/nodes/search_top_posts.py +++ b/agent_system/src/agents/nodes/search_top_posts.py @@ -1,200 +1,4 @@ -# """Updated search implementation with proper async handling.""" - -# import asyncio -# from typing import Dict, Any, List -# from src.schemas.state import GraphState -# from src.tools.gemini_client import get_gemini_client -# from src.tools.search_client import create_search_client, SearchError -# from src.utils.logger import get_logger - -# logger = get_logger(__name__) - - -# async def search_top_posts(state: GraphState) -> Dict[str, Any]: -# """Search for top blog posts related to the keyword. - -# Args: -# state: Current graph state containing keyword - -# Returns: -# Updated state with top_posts populated -# """ -# keyword = state.keyword -# logger.info("Starting search for top posts", keyword=keyword) - -# # Method 1: Try Gemini with search grounding (Primary) -# try: -# gemini_client = await get_gemini_client() - -# search_prompt = f""" -# Find the top 10 most comprehensive and authoritative blog posts, tutorials, and guides about: {keyword} - -# Focus on: -# - High-quality, well-structured content -# - Recent publications (preferably last 2 years) -# - Authoritative sources and established blogs -# - Content that provides practical value and actionable insights -# - Posts with good SEO and readability - -# For each result, I need: -# - URL -# - Title -# - Brief description/snippet - -# Return the results in a structured format. 
-# """ - -# logger.info("Attempting Gemini search with grounding") -# search_response = await gemini_client.generate_content( -# prompt=search_prompt, -# use_search=True, -# temperature=0.3 -# ) - -# # Parse Gemini search response -# top_posts = _parse_gemini_search_response(search_response, keyword) - -# if len(top_posts) >= 5: -# logger.info( -# "Successfully found posts via Gemini search grounding", -# keyword=keyword, -# count=len(top_posts) -# ) -# return {"top_posts": top_posts} -# else: -# logger.warning("Gemini search returned insufficient results, trying fallback") - -# except Exception as e: -# logger.warning( -# "Gemini search grounding failed, trying fallback", -# keyword=keyword, -# error=str(e) -# ) - -# # Method 2: Fallback to Google Custom Search API -# try: -# search_client = create_search_client() - -# if search_client is None: -# logger.warning("Custom Search client not available, using mock results") -# return {"top_posts": _generate_mock_results(keyword)} - -# logger.info("Attempting Custom Search API") -# # Fix: Properly await the async method -# top_posts = await search_client.search_top_posts(keyword, num_results=10) - -# if top_posts: -# logger.info( -# "Successfully found posts via Custom Search API", -# keyword=keyword, -# count=len(top_posts) -# ) -# return {"top_posts": top_posts} - -# except SearchError as e: -# logger.warning("Custom Search API failed", keyword=keyword, error=str(e)) -# except Exception as e: -# logger.error("Unexpected error in Custom Search", keyword=keyword, error=str(e)) - -# # Method 3: Generate mock results for testing/development -# logger.warning("All search methods failed, generating mock results for testing") -# mock_posts = _generate_mock_results(keyword) - -# return {"top_posts": mock_posts} - - -# def _parse_gemini_search_response(response: str, keyword: str) -> List[Dict[str, Any]]: -# """Parse Gemini search response to extract structured data.""" -# posts = [] - -# try: -# import re - -# # Look for URL patterns -# url_pattern = r'https?://[^\s<>"\']+[^\s<>"\'.,)]' -# urls = re.findall(url_pattern, response) - -# # Split response into sections -# lines = response.split('\n') -# current_post = {} - -# for line in lines: -# line = line.strip() -# if not line: -# if current_post and 'url' in current_post: -# posts.append(current_post) -# current_post = {} -# continue - -# # Look for URLs -# if any(url in line for url in urls): -# url_match = re.search(url_pattern, line) -# if url_match: -# current_post['url'] = url_match.group(0) - -# # Look for titles (various patterns) -# elif any(marker in line.lower() for marker in ['title:', 'post:', '**', '##']): -# title = re.sub(r'^[#*\-\d\.\s]*', '', line) -# title = re.sub(r'[*#]*$', '', title).strip() -# if title and len(title) > 10: -# current_post['title'] = title - -# # Look for descriptions -# elif len(line) > 30 and not line.startswith(('http', 'www')): -# if 'snippet' not in current_post: -# current_post['snippet'] = line - -# # Add the last post -# if current_post and 'url' in current_post: -# posts.append(current_post) - -# # Fill in missing fields and validate -# validated_posts = [] -# for i, post in enumerate(posts[:10]): -# validated_post = { -# 'url': post.get('url', f'https://example.com/{keyword.replace(" ", "-")}-{i+1}'), -# 'title': post.get('title', f'{keyword.title()} Guide #{i+1}'), -# 'snippet': post.get('snippet', f'Comprehensive guide about {keyword}'), -# 'meta_description': post.get('meta_description', '') -# } -# validated_posts.append(validated_post) - -# 
return validated_posts - -# except Exception as e: -# logger.warning("Failed to parse Gemini search response", error=str(e)) -# return [] - - -# def _generate_mock_results(keyword: str) -> List[Dict[str, Any]]: -# """Generate mock search results for testing/development.""" -# logger.info("Generating mock search results", keyword=keyword) - -# mock_domains = [ -# "medium.com", "dev.to", "realpython.com", "towardsdatascience.com", -# "freecodecamp.org", "digitalocean.com", "hackernoon.com", "auth0.com", -# "blog.miguelgrinberg.com", "testdriven.io" -# ] - -# mock_posts = [] -# keyword_clean = keyword.replace(" ", "-").lower() - -# for i, domain in enumerate(mock_domains): -# post = { -# "url": f"https://{domain}/{keyword_clean}-tutorial-{i+1}", -# "title": f"Complete {keyword.title()} Tutorial - Part {i+1}", -# "snippet": f"Learn {keyword} with this comprehensive guide. " -# f"Covers everything from basics to advanced concepts.", -# "meta_description": f"A complete guide to {keyword} for developers" -# } -# mock_posts.append(post) - -# return mock_posts - - - -# src/agents/nodes/search_top_posts.py - +import re import asyncio import json from typing import Any, Dict, List @@ -224,31 +28,55 @@ Return only valid JSON. """.strip() + async def search_top_posts(state: GraphState) -> Dict[str, Any]: keyword = state.keyword logger.info("Starting search for top posts", keyword=keyword) + # url sanitization helper + def _sanitize_url(url: str) -> str: + """Strips wrapper if present.""" + match = re.search(r'https?://[^\s<>"\']+', url) + return match.group(0) if match else url + # 1️⃣ Try grounding through Gemini try: client = await get_gemini_client() prompt = GPT_JSON_PROMPT.format(keyword=keyword) logger.info("Gemini grounding with JSON prompt", prompt=prompt[:60] + "…") - - raw = await client.generate_content( - prompt=prompt, - temperature=0.3 - ) - + + raw = await client.generate_content(prompt=prompt, temperature=0.3) + + print(f"Raw Gemini response: {raw[:1000]}...") # Debug print + + text = raw.strip() + # Parse JSON safely - top_posts = json.loads(raw) + try: + top_posts = json.loads(text) + + except json.JSONDecodeError: + # try to extract a JSON array or object via regex + match = re.search(r"(\[.*\]|\{.*\})", text, re.DOTALL) + if match: + try: + top_posts = json.loads(match.group(1)) + except json.JSONDecodeError: + top_posts = [] + else: + top_posts = [] + if isinstance(top_posts, list) and len(top_posts) >= 5: - logger.info("Gemini returned valid JSON results", count=len(top_posts)) + for p in top_posts: + p["url"] = _sanitize_url(p["url"]) + return {"top_posts": top_posts} - else: - logger.warning("Gemini JSON is invalid or too small", payload=raw) except json.JSONDecodeError as je: logger.warning("Failed to parse Gemini JSON", error=str(je)) + + # fixing json decoding error + except Exception as e: logger.error("Gemini grounding error", error=str(e), exc_info=True) @@ -256,7 +84,7 @@ async def search_top_posts(state: GraphState) -> Dict[str, Any]: try: search_client = create_search_client() logger.info("Falling back to Custom Search API", keyword=keyword) - + posts = await search_client.search_top_posts(keyword, num_results=10) if posts: logger.info("Custom Search returned results", count=len(posts)) @@ -266,19 +94,28 @@ async def search_top_posts(state: GraphState) -> Dict[str, Any]: except Exception as e: logger.error("Unexpected Custom Search error", error=str(e), exc_info=True) - # 3️⃣ Last resort: mock data - logger.warning("Using mock results for testing", keyword=keyword) - 
return {"top_posts": _generate_mock_results(keyword)} + # # 3️⃣ Last resort: mock data + # logger.warning("Using mock results for testing", keyword=keyword) + # return {"top_posts": _generate_mock_results(keyword)} + # 3️⃣ No results found - return safe response that signals workflow to end + logger.error("All search methods failed - no results found", keyword=keyword) + return { + "top_posts": [], + "search_failed": True, + "error_message": f"No blog posts found for keyword: '{keyword}'", + "workflow_status": "search_failed", + } -def _generate_mock_results(keyword: str) -> List[Dict[str, Any]]: - """Keep your existing mock helper or inject via config.""" - # … same as before … - return [ - { - "url": f"https://example.com/{keyword.replace(' ', '-')}-{i+1}", - "title": f"{keyword.title()} Guide #{i+1}", - "snippet": f"Comprehensive guide about {keyword}" - } - for i in range(10) - ] + +# def _generate_mock_results(keyword: str) -> List[Dict[str, Any]]: +# """Keep your existing mock helper or inject via config.""" +# # … same as before … +# return [ +# { +# "url": f"https://example.com/{keyword.replace(' ', '-')}-{i+1}", +# "title": f"{keyword.title()} Guide #{i+1}", +# "snippet": f"Comprehensive guide about {keyword}" +# } +# for i in range(10) +# ] diff --git a/agent_system/src/agents/prompts/__init__.py b/agent_system/src/agents/prompts/__init__.py index 515c471..20aac9a 100644 --- a/agent_system/src/agents/prompts/__init__.py +++ b/agent_system/src/agents/prompts/__init__.py @@ -1 +1 @@ -"""Prompt templates for LangGraph nodes.""" \ No newline at end of file +"""Prompt templates for LangGraph nodes.""" diff --git a/agent_system/src/agents/prompts/blog_gen_prompt.txt b/agent_system/src/agents/prompts/blog_gen_prompt.txt index c066eb4..8ef55ae 100644 --- a/agent_system/src/agents/prompts/blog_gen_prompt.txt +++ b/agent_system/src/agents/prompts/blog_gen_prompt.txt @@ -6,7 +6,7 @@ REFERENCE POSTS: {reference_posts} INSTRUCTIONS: -1. Create a comprehensive 1500-word blog post that covers the topic thoroughly +1. Create a comprehensive 500-word blog post that covers the topic thoroughly 2. Structure the content with clear headings (H1, H2, H3) 3. Include an engaging introduction that hooks the reader 4. Provide actionable insights and practical tips diff --git a/agent_system/src/agents/prompts/search_prompt.txt b/agent_system/src/agents/prompts/search_prompt.txt index 41168a7..8da283a 100644 --- a/agent_system/src/agents/prompts/search_prompt.txt +++ b/agent_system/src/agents/prompts/search_prompt.txt @@ -4,7 +4,7 @@ Keyword: {keyword} Please search for comprehensive, high-quality blog posts, tutorials, and guides that cover this topic thoroughly. 
Focus on: - Well-structured articles with good SEO -- Recent content (preferably from the last 2 years) +- Recent content (preferably from the last 1 years) - Authoritative sources and established blogs - Content that provides practical value and insights diff --git a/agent_system/src/api/__init__.py b/agent_system/src/api/__init__.py index eda19f4..799bb31 100644 --- a/agent_system/src/api/__init__.py +++ b/agent_system/src/api/__init__.py @@ -1 +1 @@ -"""FastAPI application components.""" \ No newline at end of file +"""FastAPI application components.""" diff --git a/agent_system/src/api/app.py b/agent_system/src/api/app.py index 8d3e4ad..3526292 100644 --- a/agent_system/src/api/app.py +++ b/agent_system/src/api/app.py @@ -29,14 +29,14 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Application lifespan manager for startup and shutdown events.""" # Startup logger.info("Starting Gemini Blog Agent service") - + # Set uvloop as the event loop policy for better performance - if os.name != 'nt': # Not Windows + if os.name != "nt": # Not Windows try: asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) except ImportError: logger.warning("uvloop not available, using default event loop") - + # Initialize LangSmith client if API key is provided langsmith_api_key = os.getenv("LANGSMITH_API_KEY") if langsmith_api_key: @@ -48,27 +48,28 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: logger.warning("Failed to initialize LangSmith client", error=str(e)) else: logger.info("LangSmith API key not provided, skipping initialization") - + # Pre-compile the blog generation graph try: from src.agents.graph import get_blog_generation_graph + await get_blog_generation_graph() logger.info("Blog generation graph pre-compiled successfully") except Exception as e: logger.error("Failed to pre-compile blog generation graph", error=str(e)) - + logger.info("Service startup completed") - + yield - + # Shutdown logger.info("Shutting down Gemini Blog Agent service") - + # Cleanup resources if needed - if hasattr(app.state, 'langsmith_client'): + if hasattr(app.state, "langsmith_client"): # Close LangSmith client if it has cleanup methods pass - + logger.info("Service shutdown completed") @@ -82,9 +83,9 @@ def create_app() -> FastAPI: docs_url="/docs", redoc_url="/redoc", openapi_url="/openapi.json", - lifespan=lifespan + lifespan=lifespan, ) - + # Add CORS middleware app.add_middleware( CORSMiddleware, @@ -93,28 +94,22 @@ def create_app() -> FastAPI: allow_methods=["*"], allow_headers=["*"], ) - + # Add trusted host middleware for security - Fix: Allow test hosts environment = os.getenv("ENVIRONMENT", "development") if environment == "production": trusted_hosts = os.getenv("TRUSTED_HOSTS", "localhost,127.0.0.1").split(",") - app.add_middleware( - TrustedHostMiddleware, - allowed_hosts=trusted_hosts - ) + app.add_middleware(TrustedHostMiddleware, allowed_hosts=trusted_hosts) else: # In development/test, allow all hosts - app.add_middleware( - TrustedHostMiddleware, - allowed_hosts=["*"] - ) - + app.add_middleware(TrustedHostMiddleware, allowed_hosts=["*"]) + # Add request logging middleware @app.middleware("http") async def log_requests(request: Request, call_next): """Log all HTTP requests.""" start_time = asyncio.get_event_loop().time() - + # Log request start logger.info( "HTTP request started", @@ -122,15 +117,15 @@ async def log_requests(request: Request, call_next): url=str(request.url), client_ip=request.client.host if request.client else "unknown", 
user_agent=request.headers.get("user-agent", ""), - user="4darsh-Dev" + user="4darsh-Dev", ) - + try: response = await call_next(request) - + # Calculate request duration duration = asyncio.get_event_loop().time() - start_time - + # Log request completion logger.info( "HTTP request completed", @@ -138,15 +133,15 @@ async def log_requests(request: Request, call_next): url=str(request.url), status_code=response.status_code, duration_ms=round(duration * 1000, 2), - user="4darsh-Dev" + user="4darsh-Dev", ) - + return response - + except Exception as e: # Calculate request duration for failed requests duration = asyncio.get_event_loop().time() - start_time - + # Log request failure logger.error( "HTTP request failed", @@ -154,14 +149,16 @@ async def log_requests(request: Request, call_next): url=str(request.url), duration_ms=round(duration * 1000, 2), error=str(e), - user="4darsh-Dev" + user="4darsh-Dev", ) - + raise - + # Global exception handler @app.exception_handler(Exception) - async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse: + async def global_exception_handler( + request: Request, exc: Exception + ) -> JSONResponse: """Global exception handler for unhandled errors.""" logger.error( "Unhandled exception occurred", @@ -169,27 +166,29 @@ async def global_exception_handler(request: Request, exc: Exception) -> JSONResp url=str(request.url), error=str(exc), error_type=type(exc).__name__, - user="4darsh-Dev" + user="4darsh-Dev", ) - + # Don't expose internal error details in production if os.getenv("ENVIRONMENT", "development") == "production": detail = "Internal server error" else: detail = str(exc) - + return JSONResponse( status_code=500, content={ "detail": detail, "type": "internal_server_error", - "timestamp": "2025-07-19T20:32:49Z" - } + "timestamp": "2025-07-19T20:32:49Z", + }, ) - + # HTTP exception handler @app.exception_handler(HTTPException) - async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse: + async def http_exception_handler( + request: Request, exc: HTTPException + ) -> JSONResponse: """Handle HTTP exceptions with proper logging.""" logger.warning( "HTTP exception occurred", @@ -197,21 +196,21 @@ async def http_exception_handler(request: Request, exc: HTTPException) -> JSONRe url=str(request.url), status_code=exc.status_code, detail=exc.detail, - user="4darsh-Dev" + user="4darsh-Dev", ) - + return JSONResponse( status_code=exc.status_code, content={ "detail": exc.detail, "type": "http_exception", - "timestamp": "2025-07-19T20:32:49Z" - } + "timestamp": "2025-07-19T20:32:49Z", + }, ) - + # Include routers app.include_router(blog_router) - + # Add root endpoint @app.get("/", summary="Root endpoint") async def root(): @@ -223,13 +222,13 @@ async def root(): "docs": "/docs", "health": "/api/v1/health", "timestamp": "2025-07-19T20:32:49Z", - "developer": "4darsh-Dev" + "developer": "4darsh-Dev", } - + logger.info("FastAPI application created successfully") - + return app # Application instance for direct import -app = create_app() \ No newline at end of file +app = create_app() diff --git a/agent_system/src/api/routes/__init__.py b/agent_system/src/api/routes/__init__.py index 8391848..21465b8 100644 --- a/agent_system/src/api/routes/__init__.py +++ b/agent_system/src/api/routes/__init__.py @@ -1 +1 @@ -"Blog routes initialization script for the Gemini Blog Agent.""" \ No newline at end of file +"Blog routes initialization script for the Gemini Blog Agent." 
"" diff --git a/agent_system/src/api/routes/blog.py b/agent_system/src/api/routes/blog.py index 2778d26..ad0f30c 100644 --- a/agent_system/src/api/routes/blog.py +++ b/agent_system/src/api/routes/blog.py @@ -8,7 +8,7 @@ from src.schemas.blog import ( BlogGenerationRequest, BlogGenerationResponse, - HealthResponse + HealthResponse, ) from src.agents.graph import get_blog_generation_graph from src.utils.logger import get_logger @@ -23,28 +23,28 @@ "/generate-blog", response_model=BlogGenerationResponse, summary="Generate SEO-optimized blog content", - description="Generate a comprehensive blog post using AI agents with Gemini and search integration" + description="Generate a comprehensive blog post using AI agents with Gemini and search integration", ) async def generate_blog(request: BlogGenerationRequest) -> BlogGenerationResponse: """Generate SEO-optimized blog content for a given keyword. - + This endpoint triggers a LangGraph workflow that: 1. Searches for top blog posts on the topic 2. Scrapes and analyzes content from those posts 3. Generates original, SEO-optimized content 4. Evaluates and iteratively improves the content - + Args: request: Blog generation request with keyword and parameters - + Returns: BlogGenerationResponse with generated content and metrics - + Raises: HTTPException: If generation fails or validation errors occur """ run_id = str(uuid.uuid4()) - + logger.info( "Blog generation request received", run_id=run_id, @@ -52,37 +52,34 @@ async def generate_blog(request: BlogGenerationRequest) -> BlogGenerationRespons max_attempts=request.max_attempts, seo_threshold=request.seo_threshold, user="4darsh-Dev", - timestamp=datetime.utcnow().isoformat() + timestamp=datetime.utcnow().isoformat(), ) - + try: # Validate keyword if not request.keyword.strip(): - raise HTTPException( - status_code=422, - detail="Keyword cannot be empty" - ) - + raise HTTPException(status_code=422, detail="Keyword cannot be empty") + # Get blog generation graph blog_graph = await get_blog_generation_graph() - + # Execute workflow result = await blog_graph.run_blog_generation( keyword=request.keyword.strip(), max_attempts=request.max_attempts or 3, seo_threshold=request.seo_threshold or 75.0, - thread_id=run_id + thread_id=run_id, ) - + # Check if generation was successful if not result["success"]: logger.warning( "Blog generation failed to meet requirements", run_id=run_id, final_score=result["final_score"], - attempts=result["attempts"] + attempts=result["attempts"], ) - + # Still return the best attempt if we have content if result["final_blog"]: logger.info("Returning best attempt despite low score", run_id=run_id) @@ -90,28 +87,28 @@ async def generate_blog(request: BlogGenerationRequest) -> BlogGenerationRespons raise HTTPException( status_code=500, detail=f"Failed to generate satisfactory content after {result['attempts']} attempts. 
" - f"Best score achieved: {result['final_score']}" + f"Best score achieved: {result['final_score']}", ) - + response = BlogGenerationResponse( run_id=run_id, final_blog=result["final_blog"], seo_scores=result["seo_scores"], attempts=result["attempts"], - success=result["success"] + success=result["success"], ) - + logger.info( "Blog generation completed successfully", run_id=run_id, keyword=request.keyword, final_score=result["final_score"], attempts=result["attempts"], - content_length=len(result["final_blog"]) + content_length=len(result["final_blog"]), ) - + return response - + except HTTPException: # Re-raise HTTP exceptions raise @@ -121,12 +118,12 @@ async def generate_blog(request: BlogGenerationRequest) -> BlogGenerationRespons run_id=run_id, keyword=request.keyword, error=str(e), - error_type=type(e).__name__ + error_type=type(e).__name__, ) - + raise HTTPException( status_code=500, - detail=f"Internal server error during blog generation: {str(e)}" + detail=f"Internal server error during blog generation: {str(e)}", ) @@ -134,11 +131,11 @@ async def generate_blog(request: BlogGenerationRequest) -> BlogGenerationRespons "/health", response_model=HealthResponse, summary="Health check endpoint", - description="Check the health status of the blog generation service" + description="Check the health status of the blog generation service", ) async def health_check() -> HealthResponse: """Health check endpoint for monitoring and load balancers. - + Returns: HealthResponse with service status and metadata """ @@ -148,36 +145,31 @@ async def health_check() -> HealthResponse: # - External API availability # - Memory usage # - Disk space - + return HealthResponse( - status="healthy", - timestamp=datetime.utcnow().isoformat(), - version="0.1.0" + status="healthy", timestamp=datetime.utcnow().isoformat(), version="0.1.0" ) - + except Exception as e: logger.error("Health check failed", error=str(e)) - raise HTTPException( - status_code=500, - detail=f"Health check failed: {str(e)}" - ) + raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}") @router.get( "/metrics", summary="Service metrics endpoint", - description="Get service metrics and statistics" + description="Get service metrics and statistics", ) async def get_metrics() -> Dict[str, Any]: """Get service metrics for monitoring. - + Returns: Dictionary with service metrics """ try: # In a production environment, you would collect real metrics # from your monitoring system (Prometheus, etc.) - + metrics = { "service": "gemini-blog-agent", "version": "0.1.0", @@ -186,14 +178,13 @@ async def get_metrics() -> Dict[str, Any]: "successful_generations": 0, # Implement success counter "failed_generations": 0, # Implement failure counter "average_generation_time": 0.0, # Implement timing metrics - "timestamp": datetime.utcnow().isoformat() + "timestamp": datetime.utcnow().isoformat(), } - + return metrics - + except Exception as e: logger.error("Failed to retrieve metrics", error=str(e)) raise HTTPException( - status_code=500, - detail=f"Failed to retrieve metrics: {str(e)}" - ) \ No newline at end of file + status_code=500, detail=f"Failed to retrieve metrics: {str(e)}" + ) diff --git a/agent_system/src/config/__init__.py b/agent_system/src/config/__init__.py index d7d8779..d932da7 100644 --- a/agent_system/src/config/__init__.py +++ b/agent_system/src/config/__init__.py @@ -1 +1 @@ -"Configuration for Gemini client and environment variable loading." 
\ No newline at end of file +"Configuration for Gemini client and environment variable loading." diff --git a/agent_system/src/config/settings.py b/agent_system/src/config/settings.py index 59e0202..f8b6a93 100644 --- a/agent_system/src/config/settings.py +++ b/agent_system/src/config/settings.py @@ -4,23 +4,28 @@ from pathlib import Path from dotenv import load_dotenv + def load_environment(): """Load environment variables from .env file.""" # Try multiple locations for .env file possible_paths = [ - Path(__file__).parent.parent.parent.parent.parent.parent / ".env", # Project root + Path(__file__).parent.parent.parent.parent.parent.parent + / ".env", # Project root Path(__file__).parent.parent.parent.parent / ".env", # v1_cop level - Path("../../.env"), # Current directory + Path("../../.env"), + Path("../.env"), + Path(".env"), # Current directory ] - + for env_path in possible_paths: if env_path.exists(): load_dotenv(env_path) print(f"Environment loaded from: {env_path}") return - + print("No .env file found, using system environment variables") + # Load environment variables when module is imported load_environment() @@ -34,4 +39,4 @@ def load_environment(): # Debug print -print(f"Config loaded - google api key set: {bool(GOOGLE_API_KEY)}, ") \ No newline at end of file +print(f"Config loaded - google api key set: {bool(GOOGLE_API_KEY)}, ") diff --git a/agent_system/src/memory/__init__.py b/agent_system/src/memory/__init__.py index 2a0cdc6..8c06aba 100644 --- a/agent_system/src/memory/__init__.py +++ b/agent_system/src/memory/__init__.py @@ -1 +1 @@ -"""Memory management for LangGraph.""" \ No newline at end of file +"""Memory management for LangGraph.""" diff --git a/agent_system/src/memory/checkpointer.py b/agent_system/src/memory/checkpointer.py index 2e39a6c..6c25227 100644 --- a/agent_system/src/memory/checkpointer.py +++ b/agent_system/src/memory/checkpointer.py @@ -1,68 +1,164 @@ -"""LangGraph memory checkpointer implementation - Latest version.""" +# """LangGraph memory checkpointer implementation - Latest version.""" +# import asyncio +# from typing import Optional, Dict, Any +# from langgraph.checkpoint.memory import MemorySaver +# from src.utils.logger import get_logger + +# logger = get_logger(__name__) + + +# class EnhancedMemorySaver(MemorySaver): +# """Enhanced memory saver with additional logging and error handling.""" + +# def __init__(self): +# """Initialize enhanced memory saver.""" +# super().__init__() +# self._lock = asyncio.Lock() +# logger.info("Enhanced memory saver initialized") + +# # async def aput(self, config: Dict[str, Any], checkpoint: Dict[str, Any], metadata: Dict[str, Any] = None) -> None: +# # """Async put with enhanced logging.""" +# # async with self._lock: +# # try: +# # thread_id = config.get("configurable", {}).get("thread_id", "unknown") +# # logger.debug( +# # "Saving checkpoint", +# # thread_id=thread_id, +# # checkpoint_keys=list(checkpoint.keys()) if checkpoint else [] +# # ) +# # # Call parent method with metadata parameter +# # await super().aput(config, checkpoint, metadata or {}) +# # except Exception as e: +# # logger.error("Failed to save checkpoint", error=str(e)) +# # raise + +# async def aput( +# self, +# config: Dict[str, Any], +# checkpoint: Dict[str, Any], +# new_versions: Optional[Any] = None, +# ) -> None: +# """Async put with enhanced logging.""" +# async with self._lock: +# try: +# # Assuming args[0] is config and args[1] is checkpoint +# # config = args[0] if len(args) > 0 else kwargs.get("config", {}) +# # checkpoint = 
args[1] if len(args) > 1 else kwargs.get("checkpoint", {}) +# thread_id = config.get("configurable", {}).get("thread_id", "unknown") + +# logger.debug( +# "Saving checkpoint", +# thread_id=thread_id, +# checkpoint_keys=list(checkpoint.keys()) if checkpoint else [], +# ) +# # Forward all args/kwargs (including metadata) to super() +# await super().aput(config, checkpoint, new_versions) +# except Exception as e: +# logger.error("Failed to save checkpoint", error=str(e)) +# raise + +# async def aget(self, config: Dict[str, Any]) -> Optional[Dict[str, Any]]: +# """Async get with enhanced logging.""" +# try: +# thread_id = config.get("configurable", {}).get("thread_id", "unknown") +# checkpoint = await super().aget(config) + +# if checkpoint: +# logger.debug("Retrieved checkpoint", thread_id=thread_id) +# else: +# logger.debug("No checkpoint found", thread_id=thread_id) + +# return checkpoint + +# except Exception as e: +# logger.error("Failed to retrieve checkpoint", error=str(e)) +# return None + + +# # Singleton instance +# _memory_saver: Optional[EnhancedMemorySaver] = None +# _memory_lock = asyncio.Lock() + + +# async def get_memory_saver() -> EnhancedMemorySaver: +# """Get singleton memory saver instance.""" +# global _memory_saver + +# if _memory_saver is None: +# async with _memory_lock: +# if _memory_saver is None: +# _memory_saver = EnhancedMemorySaver() + +# return _memory_saver + +# """LangGraph ≥ 0.5.x check-point adapter – drop-in replacement.""" + +# import asyncio +# from typing import Optional +# from langgraph.checkpoint.memory import MemorySaver +# from langgraph.checkpoint.base import RunnableConfig, Checkpoint, CheckpointMetadata +# from src.utils.logger import get_logger + +# logger = get_logger(__name__) + +# class EnhancedMemorySaver(MemorySaver): +# """Thread-safe, logging check-pointer.""" + +# def __init__(self) -> None: +# super().__init__() +# self._lock = asyncio.Lock() +# logger.info("EnhancedMemorySaver initialized") + +# async def aput( +# self, +# config: RunnableConfig, +# checkpoint: Checkpoint, +# metadata: CheckpointMetadata, +# ) -> None: +# async with self._lock: +# tid = config.get("configurable", {}).get("thread_id", "unknown") +# logger.debug("Saving checkpoint", thread_id=tid) +# await super().aput(config, checkpoint, metadata) + +# from langgraph.checkpoint.base import RunnableConfig, Checkpoint, CheckpointMetadata import asyncio from typing import Optional, Dict, Any from langgraph.checkpoint.memory import MemorySaver from src.utils.logger import get_logger logger = get_logger(__name__) +from langgraph.checkpoint.base import RunnableConfig, Checkpoint, CheckpointMetadata class EnhancedMemorySaver(MemorySaver): - """Enhanced memory saver with additional logging and error handling.""" - - def __init__(self): - """Initialize enhanced memory saver.""" - super().__init__() - self._lock = asyncio.Lock() - logger.info("Enhanced memory saver initialized") - - async def aput(self, config: Dict[str, Any], checkpoint: Dict[str, Any], metadata: Dict[str, Any] = None) -> None: - """Async put with enhanced logging.""" - async with self._lock: - try: - thread_id = config.get("configurable", {}).get("thread_id", "unknown") - logger.debug( - "Saving checkpoint", - thread_id=thread_id, - checkpoint_keys=list(checkpoint.keys()) if checkpoint else [] - ) - # Call parent method with metadata parameter - await super().aput(config, checkpoint, metadata or {}) - except Exception as e: - logger.error("Failed to save checkpoint", error=str(e)) - raise - - async def 
aget(self, config: Dict[str, Any]) -> Optional[Dict[str, Any]]: - """Async get with enhanced logging.""" - try: - thread_id = config.get("configurable", {}).get("thread_id", "unknown") - checkpoint = await super().aget(config) - - if checkpoint: - logger.debug("Retrieved checkpoint", thread_id=thread_id) - else: - logger.debug("No checkpoint found", thread_id=thread_id) - - return checkpoint - - except Exception as e: - logger.error("Failed to retrieve checkpoint", error=str(e)) - return None - - -# Singleton instance + async def aput( + self, + config: RunnableConfig, + checkpoint: Checkpoint, + metadata: CheckpointMetadata, + ) -> None: + """Persist checkpoint – 100 % LangGraph 0.5.x compatible.""" + tid = config.get("configurable", {}).get("thread_id", "unknown") + logger.debug("Saving checkpoint", thread_id=tid) + await super().aput(config, checkpoint, metadata) # ← keyword-only call + + async def aget(self, config: RunnableConfig) -> Optional[Checkpoint]: + tid = config.get("configurable", {}).get("thread_id", "unknown") + chk = await super().aget(config) + logger.debug("Checkpoint retrieved", thread_id=tid, found=bool(chk)) + return chk + + +# Singleton _memory_saver: Optional[EnhancedMemorySaver] = None -_memory_lock = asyncio.Lock() +_lock = asyncio.Lock() async def get_memory_saver() -> EnhancedMemorySaver: - """Get singleton memory saver instance.""" global _memory_saver - if _memory_saver is None: - async with _memory_lock: + async with _lock: if _memory_saver is None: _memory_saver = EnhancedMemorySaver() - - return _memory_saver \ No newline at end of file + return _memory_saver diff --git a/agent_system/src/schemas/__init__.py b/agent_system/src/schemas/__init__.py index 421c1ce..72c2f26 100644 --- a/agent_system/src/schemas/__init__.py +++ b/agent_system/src/schemas/__init__.py @@ -1 +1 @@ -"""Pydantic schemas for the application.""" \ No newline at end of file +"""Pydantic schemas for the application.""" diff --git a/agent_system/src/schemas/blog.py b/agent_system/src/schemas/blog.py index 8be0d68..33f8f05 100644 --- a/agent_system/src/schemas/blog.py +++ b/agent_system/src/schemas/blog.py @@ -6,43 +6,37 @@ class BlogGenerationRequest(BaseModel): """Request schema for blog generation endpoint.""" - + keyword: str = Field( ..., min_length=1, max_length=200, - description="Target keyword for blog content generation" + description="Target keyword for blog content generation", ) max_attempts: Optional[int] = Field( - default=3, - ge=1, - le=10, - description="Maximum number of generation attempts" + default=3, ge=1, le=10, description="Maximum number of generation attempts" ) seo_threshold: Optional[float] = Field( default=75.0, ge=0.0, le=100.0, - description="Minimum SEO score threshold for acceptance" + description="Minimum SEO score threshold for acceptance", ) class BlogGenerationResponse(BaseModel): """Response schema for blog generation endpoint.""" - + run_id: str = Field(..., description="Unique identifier for this generation run") final_blog: str = Field(..., description="Generated blog content") - seo_scores: Dict[str, float] = Field( - ..., - description="Breakdown of SEO scores" - ) + seo_scores: Dict[str, float] = Field(..., description="Breakdown of SEO scores") attempts: int = Field(..., description="Number of attempts made") success: bool = Field(..., description="Whether generation was successful") class HealthResponse(BaseModel): """Health check response schema.""" - + status: str = Field(..., description="Service status") timestamp: str = Field(..., 
description="Current timestamp") - version: str = Field(..., description="Application version") \ No newline at end of file + version: str = Field(..., description="Application version") diff --git a/agent_system/src/schemas/state.py b/agent_system/src/schemas/state.py index 4c240ee..09b7fb0 100644 --- a/agent_system/src/schemas/state.py +++ b/agent_system/src/schemas/state.py @@ -6,50 +6,33 @@ class GraphState(BaseModel): """State schema for the LangGraph workflow.""" - + # Use ConfigDict instead of Config class (Pydantic v2) model_config = ConfigDict(arbitrary_types_allowed=True) - + keyword: str = Field(..., description="Target keyword for blog generation") top_posts: List[Dict[str, Any]] = Field( default_factory=list, - description="Top-10 search results with URL, title, snippet" + description="Top-10 search results with URL, title, snippet", ) cleaned_posts: List[Dict[str, Any]] = Field( - default_factory=list, - description="Cleaned and validated post content" - ) - draft_blog: str = Field( - default="", - description="Generated blog content draft" + default_factory=list, description="Cleaned and validated post content" ) + draft_blog: str = Field(default="", description="Generated blog content draft") seo_scores: Dict[str, float] = Field( - default_factory=dict, - description="SEO evaluation scores breakdown" - ) - final_score: float = Field( - default=0.0, - description="Final aggregated SEO score" - ) - attempts: int = Field( - default=0, - description="Number of generation attempts made" + default_factory=dict, description="SEO evaluation scores breakdown" ) + final_score: float = Field(default=0.0, description="Final aggregated SEO score") + attempts: int = Field(default=0, description="Number of generation attempts made") max_attempts: int = Field( - default=3, - description="Maximum allowed generation attempts" + default=3, description="Maximum allowed generation attempts" ) seo_threshold: float = Field( - default=75.0, - description="Minimum SEO score threshold for acceptance" - ) - final_blog: str = Field( - default="", - description="Final optimized blog content" + default=75.0, description="Minimum SEO score threshold for acceptance" ) - + final_blog: str = Field(default="", description="Final optimized blog content") + # Add raw_html_content field for scraping results raw_html_content: Optional[Dict[str, str]] = Field( - default_factory=dict, - description="Raw HTML content from scraped URLs" - ) \ No newline at end of file + default_factory=dict, description="Raw HTML content from scraped URLs" + ) diff --git a/agent_system/src/tools/__init__.py b/agent_system/src/tools/__init__.py index de3e6c2..9c98f9f 100644 --- a/agent_system/src/tools/__init__.py +++ b/agent_system/src/tools/__init__.py @@ -1 +1 @@ -"""Tool modules for external integrations.""" \ No newline at end of file +"""Tool modules for external integrations.""" diff --git a/agent_system/src/tools/gemini_client.py b/agent_system/src/tools/gemini_client.py index 4f800f5..579b51d 100644 --- a/agent_system/src/tools/gemini_client.py +++ b/agent_system/src/tools/gemini_client.py @@ -11,6 +11,14 @@ logger = get_logger(__name__) +# api keys +GOOGLE_API_KEY = settings.GOOGLE_API_KEY +GEMINI_MODEL = settings.GEMINI_MODEL + + +# Updated Gemini client with search grounding + + @dataclass class GeminiConfig: api_key: str @@ -18,87 +26,164 @@ class GeminiConfig: temperature: float = 0.7 max_output_tokens: int = 8192 + class GeminiClient: - """Singleton async Gemini client using google-genai.""" _instance: 
Optional["GeminiClient"] = None def __init__(self, config: GeminiConfig): if not config.api_key: raise ValueError("API key is required for Gemini client.") - self.client = genai.Client(api_key=config.api_key) - self.model_name = config.model_name + self.client = genai.Client(api_key=GOOGLE_API_KEY) + self.model_name = GEMINI_MODEL self.base_config = types.GenerateContentConfig( - temperature=config.temperature, - max_output_tokens=config.max_output_tokens + temperature=config.temperature, max_output_tokens=config.max_output_tokens ) - # Precompute allowed config fields for filtering - self._config_fields = {f.name for f in fields(self.base_config)} + # self._config_fields = {f.name for f in fields(self.base_config)} + self._config_fields = set(self.base_config.__dict__.keys()) logger.info("GeminiClient initialized", model=self.model_name) @classmethod async def get_instance(cls) -> "GeminiClient": if cls._instance is None: api_key = os.getenv("GOOGLE_API_KEY") or settings.GOOGLE_API_KEY - if not api_key: - raise ValueError("GOOGLE_API_KEY environment variable is required") - config = GeminiConfig( - api_key=api_key, - model_name=settings.GEMINI_MODEL - ) + config = GeminiConfig(api_key=api_key, model_name=settings.GEMINI_MODEL) cls._instance = cls(config) return cls._instance async def generate_content( - self, - prompt: str, - use_search: bool = False, # kept for compatibility, but ignored - **overrides: Any + self, prompt: str, use_search: bool = False, **overrides: Any ) -> str: - """ - Generate text using the Gemini model asynchronously. - - Args: - prompt: the text prompt - use_search: flag (ignored at this level; you can implement grounding upstream) - **overrides: any GenerateContentConfig fields you wish to override - """ try: - # Filter overrides to only valid config fields - valid_overrides = { - k: v for k, v in overrides.items() if k in self._config_fields - } - if valid_overrides: - gen_config = _dataclass_replace(self.base_config, **valid_overrides) + if use_search: + # Enable Google Search grounding + grounding_tool = types.Tool(google_search=types.GoogleSearch()) + gen_config = types.GenerateContentConfig( + temperature=self.base_config.temperature, + max_output_tokens=self.base_config.max_output_tokens, + tools=[grounding_tool], + ) else: - gen_config = self.base_config + # Apply any overrides + valid = {k: v for k, v in overrides.items() if k in self._config_fields} + gen_config = ( + _dataclass_replace(self.base_config, **valid) + if valid + else self.base_config + ) - # Perform the async call response = await self.client.aio.models.generate_content( - model=self.model_name, - contents=prompt, - config=gen_config + model=self.model_name, contents=prompt, config=gen_config ) - text = response.text or "" if not text: raise ValueError("Empty response from Gemini API") - logger.info( - "Generated content", - prompt_len=len(prompt), - response_len=len(text) + "Generated content", prompt_len=len(prompt), response_len=len(text) ) return text - except Exception as e: logger.error("Gemini generation failed", error=str(e)) raise + def _dataclass_replace(dc_obj: Any, **kwargs: Any) -> Any: """Helper to copy and override dataclass fields.""" data = {**dc_obj.__dict__, **kwargs} return type(dc_obj)(**data) + async def get_gemini_client() -> GeminiClient: - """Factory for async GeminiClient instance.""" return await GeminiClient.get_instance() + + +# @dataclass +# class GeminiConfig: +# api_key: str +# model_name: str = "gemini-2.5-flash" +# temperature: float = 0.7 +# 
max_output_tokens: int = 8192 + +# class GeminiClient: +# """Singleton async Gemini client using google-genai.""" +# _instance: Optional["GeminiClient"] = None + +# def __init__(self, config: GeminiConfig): +# if not config.api_key: +# raise ValueError("API key is required for Gemini client.") +# self.client = genai.Client(api_key=config.api_key) +# self.model_name = config.model_name +# self.base_config = types.GenerateContentConfig( +# temperature=config.temperature, +# max_output_tokens=config.max_output_tokens +# ) +# # Precompute allowed config fields for filtering +# self._config_fields = {f.name for f in fields(self.base_config)} +# logger.info("GeminiClient initialized", model=self.model_name) + +# @classmethod +# async def get_instance(cls) -> "GeminiClient": +# if cls._instance is None: +# api_key = os.getenv("GOOGLE_API_KEY") or settings.GOOGLE_API_KEY +# if not api_key: +# raise ValueError("GOOGLE_API_KEY environment variable is required") +# config = GeminiConfig( +# api_key=api_key, +# model_name=settings.GEMINI_MODEL +# ) +# cls._instance = cls(config) +# return cls._instance + +# async def generate_content( +# self, +# prompt: str, +# use_search: bool = False, # kept for compatibility, but ignored +# **overrides: Any +# ) -> str: +# """ +# Generate text using the Gemini model asynchronously. + +# Args: +# prompt: the text prompt +# use_search: flag (ignored at this level; you can implement grounding upstream) +# **overrides: any GenerateContentConfig fields you wish to override +# """ +# try: +# # Filter overrides to only valid config fields +# valid_overrides = { +# k: v for k, v in overrides.items() if k in self._config_fields +# } +# if valid_overrides: +# gen_config = _dataclass_replace(self.base_config, **valid_overrides) +# else: +# gen_config = self.base_config + +# # Perform the async call +# response = await self.client.aio.models.generate_content( +# model=self.model_name, +# contents=prompt, +# config=gen_config +# ) + +# text = response.text or "" +# if not text: +# raise ValueError("Empty response from Gemini API") + +# logger.info( +# "Generated content", +# prompt_len=len(prompt), +# response_len=len(text) +# ) +# return text + +# except Exception as e: +# logger.error("Gemini generation failed", error=str(e)) +# raise + +# def _dataclass_replace(dc_obj: Any, **kwargs: Any) -> Any: +# """Helper to copy and override dataclass fields.""" +# data = {**dc_obj.__dict__, **kwargs} +# return type(dc_obj)(**data) + +# async def get_gemini_client() -> GeminiClient: +# """Factory for async GeminiClient instance.""" +# return await GeminiClient.get_instance() diff --git a/agent_system/src/tools/scraper.py b/agent_system/src/tools/scraper.py index c8484ff..2dd61c1 100644 --- a/agent_system/src/tools/scraper.py +++ b/agent_system/src/tools/scraper.py @@ -1,263 +1,208 @@ -"""Async web scraping utilities with BeautifulSoup.""" - import asyncio -import aiohttp +import os from typing import Dict, List, Optional, Tuple, Any + +from playwright.async_api import ( + async_playwright, + Browser, + Page, + TimeoutError as PlaywrightTimeoutError, +) +from urllib.parse import urlparse +from src.utils.logger import get_logger from bs4 import BeautifulSoup import trafilatura -from urllib.parse import urljoin, urlparse -import os -from src.utils.logger import get_logger - - logger = get_logger(__name__) - -class ScrapeError(Exception): - """Custom exception for scraping errors.""" - pass +# List of full browser User‑Agents to rotate +USER_AGENTS = [ + "Mozilla/5.0 (Windows NT 10.0; 
Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/116.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) " + "Version/16.5 Safari/605.1.15", + # add more as needed… +] -class WebScraper: - """Async web scraper with concurrent request handling.""" - +class PlaywrightScraper: def __init__( self, - max_concurrent: int = 10, - timeout: int = 10, - user_agents: Optional[List[str]] = None + max_concurrent: int = 5, + navigation_timeout: int = 15_000, # 15s + headless: bool = True, ): - """Initialize web scraper. - - Args: - max_concurrent: Maximum concurrent requests - timeout: Request timeout in seconds - user_agents: List of user agent strings to rotate - """ - self.max_concurrent = max_concurrent - self.timeout = timeout self.semaphore = asyncio.Semaphore(max_concurrent) - - self.user_agents = user_agents or [ - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", - "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36" - ] - - logger.info( - "Web scraper initialized", - max_concurrent=max_concurrent, - timeout=timeout - ) - - async def scrape_url( - self, - session: aiohttp.ClientSession, - url: str, - user_agent_idx: int = 0 + self.navigation_timeout = navigation_timeout + self.headless = headless + + async def _fetch( + self, browser: Browser, url: str, ua: str ) -> Tuple[str, Optional[str]]: - """Scrape content from a single URL. - - Args: - session: aiohttp session - url: URL to scrape - user_agent_idx: Index of user agent to use - - Returns: - Tuple of (url, html_content) - """ + """Load a page in Playwright and return its HTML or None.""" async with self.semaphore: - try: - headers = { - "User-Agent": self.user_agents[user_agent_idx % len(self.user_agents)], - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", - "Accept-Language": "en-US,en;q=0.5", - "Accept-Encoding": "gzip, deflate", - "Connection": "keep-alive", - } - - async with session.get( - url, - headers=headers, - timeout=aiohttp.ClientTimeout(total=self.timeout) - ) as response: - if response.status == 200: - html = await response.text() - logger.debug("Successfully scraped URL", url=url, size=len(html)) - return url, html - else: - logger.warning( - "Failed to scrape URL", - url=url, - status=response.status - ) - return url, None - - except asyncio.TimeoutError: - logger.warning("Timeout scraping URL", url=url) - return url, None - except Exception as e: - logger.warning("Error scraping URL", url=url, error=str(e)) - return url, None - - async def scrape_multiple_urls( - self, - urls: List[str] - ) -> Dict[str, Optional[str]]: - """Scrape multiple URLs concurrently. 
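A runnable counterpart to the usage comment at the end of this module, as a sketch only: the URL is a placeholder, and Chromium is assumed to have been installed beforehand with `playwright install chromium`.

```python
import asyncio

from src.tools.scraper import create_scraper  # factory defined later in this module


async def main() -> None:
    scraper = create_scraper()
    # scrape_multiple maps each URL to its HTML, or None when the fetch failed.
    html_map = await scraper.scrape_multiple(["https://example.com/"])
    for url, html in html_map.items():
        print(url, "ok" if html else "failed", len(html or ""))


asyncio.run(main())
```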
- - Args: - urls: List of URLs to scrape - - Returns: - Dictionary mapping URL to HTML content (None if failed) - - Raises: - ScrapeError: If more than 50% of URLs fail - """ - if not urls: - return {} - - connector = aiohttp.TCPConnector(limit=self.max_concurrent) - timeout = aiohttp.ClientTimeout(total=self.timeout) - - async with aiohttp.ClientSession( - connector=connector, - timeout=timeout - ) as session: - - # Create tasks for all URLs - tasks = [ - self.scrape_url(session, url, idx) - for idx, url in enumerate(urls) - ] - - # Execute all tasks concurrently - results = await asyncio.gather(*tasks, return_exceptions=True) - - # Process results - scraped_content = {} - failed_count = 0 - - for result in results: - if isinstance(result, Exception): - failed_count += 1 - continue - - url, html = result - scraped_content[url] = html - if html is None: - failed_count += 1 - - success_rate = (len(urls) - failed_count) / len(urls) - - logger.info( - "Scraping completed", - total_urls=len(urls), - successful=len(urls) - failed_count, - failed=failed_count, - success_rate=f"{success_rate:.2%}" + page: Page = await browser.new_page( + user_agent=ua, + viewport={"width": 1280, "height": 800}, ) - - # Raise error if more than 50% failed - if success_rate < 0.5: - raise ScrapeError( - f"Scraping failed: {success_rate:.2%} success rate " - f"({failed_count}/{len(urls)} URLs failed)" + # Optionally block images/fonts/css to speed up + await page.route( + "**/*", + lambda route: ( + route.continue_() + if route.request.resource_type + not in ("image", "font", "stylesheet") + else route.abort() + ), + ) + # Set a Referer header + origin = f"{urlparse(url).scheme}://{urlparse(url).netloc}" + await page.set_extra_http_headers({"Referer": origin}) + + try: + await page.goto( + url, wait_until="networkidle", timeout=self.navigation_timeout ) - - return scraped_content - + html = await page.content() + logger.debug("Scraped successfully", url=url, length=len(html)) + return url, html + except PlaywrightTimeoutError: + logger.warning("Timeout loading page", url=url) + except Exception as e: + logger.warning("Error loading page", url=url, error=str(e)) + finally: + await page.close() + + return url, None + def clean_html_content(self, html: str, url: str) -> Optional[Dict[str, Any]]: """Clean and extract content from HTML. 
- + Args: html: Raw HTML content url: Source URL - + Returns: Cleaned content dictionary or None if extraction fails """ try: # Try BeautifulSoup first - soup = BeautifulSoup(html, 'html.parser') - + soup = BeautifulSoup(html, "html.parser") + # Remove unwanted elements - for element in soup(['script', 'style', 'nav', 'footer', 'aside', 'header']): + for element in soup( + ["script", "style", "nav", "footer", "aside", "header"] + ): element.decompose() - + # Extract title title = "" - title_tag = soup.find('title') + title_tag = soup.find("title") if title_tag: title = title_tag.get_text().strip() - + # Extract meta description meta_desc = "" - meta_tag = soup.find('meta', attrs={'name': 'description'}) - if meta_tag and meta_tag.get('content'): - meta_desc = meta_tag.get('content').strip() - + meta_tag = soup.find("meta", attrs={"name": "description"}) + if meta_tag and meta_tag.get("content"): + meta_desc = meta_tag.get("content").strip() + # Extract headings headings = [] - for heading in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']): + for heading in soup.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]): text = heading.get_text().strip() if text: headings.append(text) - + # Extract paragraphs paragraphs = [] - for p in soup.find_all('p'): + for p in soup.find_all("p"): text = p.get_text().strip() if text and len(text) > 20: # Filter out short paragraphs paragraphs.append(text) - + # If BeautifulSoup didn't get enough content, try trafilatura if len(paragraphs) < 3: extracted_text = trafilatura.extract(html) if extracted_text: - paragraphs = [p.strip() for p in extracted_text.split('\n\n') if p.strip()] - + paragraphs = [ + p.strip() for p in extracted_text.split("\n\n") if p.strip() + ] + # Calculate word count - all_text = ' '.join(paragraphs) + all_text = " ".join(paragraphs) word_count = len(all_text.split()) - + if word_count < 100: # Skip articles that are too short logger.warning("Article too short", url=url, word_count=word_count) return None - + cleaned_content = { "url": url, "title": title, "meta_description": meta_desc, "headings": headings, "paragraphs": paragraphs, - "word_count": word_count + "word_count": word_count, } - + logger.debug( "Content cleaned successfully", url=url, word_count=word_count, paragraphs=len(paragraphs), - headings=len(headings) + headings=len(headings), ) - + return cleaned_content - + except Exception as e: logger.error("Failed to clean HTML content", url=url, error=str(e)) return None + async def scrape_multiple(self, urls: List[str]) -> Dict[str, Optional[str]]: + """Scrape a list of URLs using a single browser instance.""" + async with async_playwright() as pw: + browser = await pw.chromium.launch(headless=self.headless) + tasks = [] + for idx, url in enumerate(urls): + ua = USER_AGENTS[idx % len(USER_AGENTS)] + tasks.append(self._fetch(browser, url, ua)) + + results = await asyncio.gather(*tasks) + await browser.close() + + scraped = {url: html for url, html in results} + # Log summary + total = len(urls) + success = sum(1 for html in scraped.values() if html) + rate = success / total * 100 + logger.info( + "Playwright scraping completed", + total=total, + success=success, + failed=total - success, + success_rate=f"{rate:.1f}%", + ) + + return scraped + + +# Factory +def create_scraper() -> PlaywrightScraper: + max_conc = int(os.getenv("MAX_CONCURRENT_REQUESTS", "5")) + timeout = int(os.getenv("MAX_SCRAPE_TIMEOUT", "15")) * 1000 + headless = os.getenv("PLAYWRIGHT_HEADLESS", "true").lower() == "true" + + return PlaywrightScraper( + 
max_concurrent=max_conc, navigation_timeout=timeout, headless=headless + ) + -# Factory function -def create_web_scraper() -> WebScraper: - """Create web scraper instance.""" - max_concurrent = int(os.getenv("MAX_CONCURRENT_REQUESTS", "10")) - timeout = int(os.getenv("MAX_SCRAPE_TIMEOUT", "10")) - - return WebScraper( - max_concurrent=max_concurrent, - timeout=timeout - ) \ No newline at end of file +# Example usage inside your scrape_posts node +# -------------------------------------------------- +# scraper = create_scraper() +# html_map = await scraper.scrape_multiple(state.top_posts) +# # then feed html_map into your clean_validate step… diff --git a/agent_system/src/tools/scraper_playwright.py b/agent_system/src/tools/scraper_playwright.py new file mode 100644 index 0000000..da15893 --- /dev/null +++ b/agent_system/src/tools/scraper_playwright.py @@ -0,0 +1,123 @@ +# scraper.py + +import asyncio +import os +from typing import Dict, List, Optional, Tuple + +from playwright.async_api import ( + async_playwright, + Browser, + Page, + TimeoutError as PlaywrightTimeoutError, +) +from urllib.parse import urlparse +from src.utils.logger import get_logger + +logger = get_logger(__name__) + +# List of full browser User‑Agents to rotate +USER_AGENTS = [ + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/116.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_4) " + "AppleWebKit/605.1.15 (KHTML, like Gecko) " + "Version/16.5 Safari/605.1.15", + # add more as needed… +] + + +class PlaywrightScraper: + def __init__( + self, + max_concurrent: int = 5, + navigation_timeout: int = 15_000, # 15s + headless: bool = True, + ): + self.semaphore = asyncio.Semaphore(max_concurrent) + self.navigation_timeout = navigation_timeout + self.headless = headless + + async def _fetch( + self, browser: Browser, url: str, ua: str + ) -> Tuple[str, Optional[str]]: + """Load a page in Playwright and return its HTML or None.""" + async with self.semaphore: + page: Page = await browser.new_page( + user_agent=ua, + viewport={"width": 1280, "height": 800}, + ) + # Optionally block images/fonts/css to speed up + await page.route( + "**/*", + lambda route: ( + route.continue_() + if route.request.resource_type + not in ("image", "font", "stylesheet") + else route.abort() + ), + ) + # Set a Referer header + origin = f"{urlparse(url).scheme}://{urlparse(url).netloc}" + await page.set_extra_http_headers({"Referer": origin}) + + try: + await page.goto( + url, wait_until="networkidle", timeout=self.navigation_timeout + ) + html = await page.content() + logger.debug("Scraped successfully", url=url, length=len(html)) + return url, html + except PlaywrightTimeoutError: + logger.warning("Timeout loading page", url=url) + except Exception as e: + logger.warning("Error loading page", url=url, error=str(e)) + finally: + await page.close() + + return url, None + + async def scrape_multiple(self, urls: List[str]) -> Dict[str, Optional[str]]: + """Scrape a list of URLs using a single browser instance.""" + async with async_playwright() as pw: + browser = await pw.chromium.launch(headless=self.headless) + tasks = [] + for idx, url in enumerate(urls): + ua = USER_AGENTS[idx % len(USER_AGENTS)] + tasks.append(self._fetch(browser, url, ua)) + + results = await asyncio.gather(*tasks) + await browser.close() + + scraped = {url: html for url, html in results} + # Log summary + total = len(urls) + success = sum(1 for html in scraped.values() if html) + rate = success / total * 100 + 
logger.info( + "Playwright scraping completed", + total=total, + success=success, + failed=total - success, + success_rate=f"{rate:.1f}%", + ) + + return scraped + + +# Factory +def create_scraper() -> PlaywrightScraper: + max_conc = int(os.getenv("MAX_CONCURRENT_REQUESTS", "5")) + timeout = int(os.getenv("MAX_SCRAPE_TIMEOUT", "15")) * 1000 + headless = os.getenv("PLAYWRIGHT_HEADLESS", "true").lower() == "true" + + return PlaywrightScraper( + max_concurrent=max_conc, navigation_timeout=timeout, headless=headless + ) + + +# Example usage inside your scrape_posts node +# -------------------------------------------------- +# scraper = create_scraper() +# html_map = await scraper.scrape_multiple(state.top_posts) +# # then feed html_map into your clean_validate step… diff --git a/agent_system/src/tools/search_client.py b/agent_system/src/tools/search_client.py index c3b4e09..97746d1 100644 --- a/agent_system/src/tools/search_client.py +++ b/agent_system/src/tools/search_client.py @@ -1,5 +1,5 @@ # requirements.txt -# google-custom-search[async]>=3.0.0 +# google-custom-search[async]>=3.0.0 (# not working : DEBUGGING REQUIRED) import os, asyncio from typing import List, Dict, Any @@ -12,9 +12,11 @@ GOOGLE_API_KEY = settings.GOOGLE_API_KEY GOOGLE_SEARCH_ENGINE_ID = settings.GOOGLE_SEARCH_ENGINE_ID + class SearchError(Exception): pass + class SearchClient: def __init__(self, api_key: str, cx: str): self.client = google_custom_search.CustomSearch( @@ -22,16 +24,14 @@ def __init__(self, api_key: str, cx: str): ) logger.info("SearchClient initialized (async)") - async def search_top_posts(self, keyword: str, num_results: int = 10) -> List[Dict[str, str]]: + async def search_top_posts( + self, keyword: str, num_results: int = 10 + ) -> List[Dict[str, str]]: try: results = await self.client.search_async(keyword) output = [] for idx, r in enumerate(results[:num_results]): - output.append({ - "url": r.url, - "title": r.title, - "snippet": r.snippet - }) + output.append({"url": r.url, "title": r.title, "snippet": r.snippet}) logger.info("Async search completed", keyword=keyword, count=len(output)) return output except Exception as e: @@ -40,16 +40,19 @@ async def search_top_posts(self, keyword: str, num_results: int = 10) -> List[Di finally: await self.client.close() + def create_search_client() -> SearchClient: if not GOOGLE_API_KEY or not GOOGLE_SEARCH_ENGINE_ID: raise ValueError("GOOGLE_API_KEY and GOOGLE_SEARCH_ENGINE_ID must be set") return SearchClient(GOOGLE_API_KEY, GOOGLE_SEARCH_ENGINE_ID) + # Example usage async def main(): client = create_search_client() results = await client.search_top_posts("python asyncio tutorial", num_results=10) print(results) + if __name__ == "__main__": asyncio.run(main()) diff --git a/agent_system/src/utils/__init__.py b/agent_system/src/utils/__init__.py index 4fad706..183c974 100644 --- a/agent_system/src/utils/__init__.py +++ b/agent_system/src/utils/__init__.py @@ -1 +1 @@ -"""Utility modules.""" \ No newline at end of file +"""Utility modules.""" diff --git a/agent_system/src/utils/logger.py b/agent_system/src/utils/logger.py index c8f3daa..5749254 100644 --- a/agent_system/src/utils/logger.py +++ b/agent_system/src/utils/logger.py @@ -8,14 +8,14 @@ def configure_logging() -> None: """Configure structured logging for the application.""" - + # Configure standard library logging logging.basicConfig( format="%(message)s", stream=None, level=logging.INFO, ) - + # Configure structlog structlog.configure( processors=[ @@ -24,7 +24,7 @@ def configure_logging() -> None: 
structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), add_timestamp, - structlog.processors.JSONRenderer() + structlog.processors.JSONRenderer(), ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), @@ -33,7 +33,9 @@ def configure_logging() -> None: ) -def add_timestamp(logger: Any, method_name: str, event_dict: Dict[str, Any]) -> Dict[str, Any]: +def add_timestamp( + logger: Any, method_name: str, event_dict: Dict[str, Any] +) -> Dict[str, Any]: """Add timestamp to log events.""" # Fix: Use timezone-aware datetime instead of deprecated utcnow() event_dict["timestamp"] = datetime.now(timezone.utc).isoformat() @@ -42,4 +44,4 @@ def add_timestamp(logger: Any, method_name: str, event_dict: Dict[str, Any]) -> def get_logger(name: str) -> structlog.stdlib.BoundLogger: """Get a configured logger instance.""" - return structlog.get_logger(name) \ No newline at end of file + return structlog.get_logger(name) diff --git a/agent_system/tests/test_langgraph_fix.py b/agent_system/tests/test_langgraph_fix.py deleted file mode 100644 index d0f7ecf..0000000 --- a/agent_system/tests/test_langgraph_fix.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python3 -"""Test script to verify LangGraph fixes work correctly.""" - -import asyncio -import json -from src.agents.graph import get_blog_generation_graph -from src.utils.logger import configure_logging, get_logger - -configure_logging() -logger = get_logger(__name__) - - -async def test_blog_generation(): - """Test the blog generation workflow.""" - print("🧪 Testing Blog Generation Workflow") - print("=" * 40) - - try: - # Get the graph - graph = await get_blog_generation_graph() - print("✅ Graph compiled successfully") - - # Test with simple keyword - result = await graph.run_blog_generation( - keyword="python programming", - max_attempts=2, - seo_threshold=50.0, - thread_id="test-001" - ) - - print("✅ Workflow completed successfully") - print(f"📊 Results:") - print(f" Success: {result['success']}") - print(f" Final Score: {result['final_score']}") - print(f" Attempts: {result['attempts']}") - print(f" Content Length: {len(result['final_blog'])} characters") - - if result['final_blog']: - # Show first 200 characters of content - preview = result['final_blog'][:200] + "..." if len(result['final_blog']) > 200 else result['final_blog'] - print(f" Content Preview: {preview}") - - return True - - except Exception as e: - print(f"❌ Test failed: {e}") - import traceback - traceback.print_exc() - return False - - -async def main(): - """Run all tests.""" - success = await test_blog_generation() - - if success: - print("\n🎉 All tests passed! LangGraph is working correctly.") - return 0 - else: - print("\n❌ Tests failed. Check the errors above.") - return 1 - - -if __name__ == "__main__": - import sys - sys.exit(asyncio.run(main())) \ No newline at end of file diff --git a/assets/examples/.gitkeep b/assets/examples/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/assets/outputs/.gitkeep b/assets/outputs/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/docker-compose.yml b/docker-compose.yml index e69de29..774e1a9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -0,0 +1,46 @@ +version: "3.8" + +services: + gemini-blog-agent: + build: + context: . 
+ dockerfile: Dockerfile + ports: + - "8000:8000" + environment: + - GOOGLE_API_KEY=${GOOGLE_API_KEY} + - GOOGLE_SEARCH_ENGINE_ID=${GOOGLE_SEARCH_ENGINE_ID} + - GEMINI_MODEL=${GEMINI_MODEL:-gemini-2.5-flash} + - LANGSMITH_API_KEY=${LANGSMITH_API_KEY} + - LANGSMITH_PROJECT=${LANGSMITH_PROJECT:-gemini-search-blog-agent} + - MAX_CONCURRENT_REQUESTS=${MAX_CONCURRENT_REQUESTS:-10} + - MAX_SCRAPE_TIMEOUT=${MAX_SCRAPE_TIMEOUT:-10} + - MAX_ATTEMPTS=${MAX_ATTEMPTS:-3} + - SEO_THRESHOLD=${SEO_THRESHOLD:-75} + - ENVIRONMENT=production + volumes: + - ./logs:/app/logs + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/health"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s + + # Optional: Redis for production memory storage + redis: + image: redis:7-alpine + ports: + - "6379:6379" + volumes: + - redis_data:/data + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 30s + timeout: 3s + retries: 3 + +volumes: + redis_data: diff --git a/src/api/endpoints.py b/src/api/endpoints.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/api/init.py b/src/api/init.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/api/schemas.py b/src/api/schemas.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/config/constants.py b/src/config/constants.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/config/init.py b/src/config/init.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/config/settings.py b/src/config/settings.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/content_processor/init.py b/src/content_processor/init.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/content_processor/seo_tools.py b/src/content_processor/seo_tools.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/content_processor/text_analyzer.py b/src/content_processor/text_analyzer.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/gemini_with_search/experiments/playground.py b/src/gemini_with_search/experiments/playground.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/gemini_with_search/experiments/test.py b/src/gemini_with_search/experiments/test.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/scraper/content_extractor.py b/src/scraper/content_extractor.py deleted file mode 100644 index c4eb147..0000000 --- a/src/scraper/content_extractor.py +++ /dev/null @@ -1,520 +0,0 @@ -""" -Content Extractor Module - -This module provides functionality for extracting clean, structured content -from HTML webpages using Trafilatura library. -""" - -import logging -import trafilatura -from typing import Dict, List, Optional, Any, Union -from bs4 import BeautifulSoup -import os -import json -from datetime import datetime -from urllib.parse import urlparse - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - - -class ContentExtractor: - """Class for extracting clean, structured content from HTML webpages.""" - - def __init__( - self, - output_dir: Optional[str] = None, - include_images: bool = True, - include_tables: bool = True, - include_links: bool = True, - ): - """ - Initialize the content extractor. - - Args: - output_dir: Directory to save extracted content. If None, content is not saved. 
- include_images: Whether to include image information in extraction - include_tables: Whether to include table information in extraction - include_links: Whether to include link information in extraction - """ - self.output_dir = output_dir - self.include_images = include_images - self.include_tables = include_tables - self.include_links = include_links - - # Create output directory if it doesn't exist - if self.output_dir and not os.path.exists(self.output_dir): - os.makedirs(self.output_dir) - - def extract_from_html(self, html: str, url: str = "") -> Dict[str, Any]: - """ - Extract content from HTML using Trafilatura. - - Args: - html: HTML content as string - url: Source URL of the HTML content - - Returns: - Dictionary containing the extracted content - """ - try: - logger.info(f"Extracting content from URL: {url}") - - # Extract content using Trafilatura - extracted = trafilatura.extract( - html, - output_format="json", - with_metadata=True, - include_images=self.include_images, - include_tables=self.include_tables, - include_links=self.include_links, - url=url, - ) - - if not extracted: - logger.warning(f"No content extracted from {url}") - return self._create_empty_result(url) - - # Parse the JSON output - content = json.loads(extracted) - - # Create result dictionary - result = { - "url": url, - "success": True, - "title": content.get("title", ""), - "author": content.get("author", ""), - "date": content.get("date", ""), - "description": content.get("description", ""), - "categories": content.get("categories", []), - "tags": content.get("tags", []), - "text": content.get("text", ""), - "language": content.get("language", ""), - "extracted_at": datetime.now().isoformat(), - } - - # Add optional extracted elements - if self.include_images and "images" in content: - result["images"] = content.get("images", []) - - if self.include_tables and "tables" in content: - result["tables"] = content.get("tables", []) - - if self.include_links and "links" in content: - result["links"] = content.get("links", []) - - # Save to file if output directory is specified - if self.output_dir: - self._save_to_file(result) - - return result - - except Exception as e: - logger.error(f"Error extracting content from {url}: {str(e)}") - return self._create_empty_result(url) - - def extract_from_url(self, url: str) -> Dict[str, Any]: - """ - Download and extract content from a URL. - - Args: - url: URL to fetch and extract - - Returns: - Dictionary containing the extracted content - """ - try: - logger.info(f"Downloading and extracting content from URL: {url}") - - # Download and extract content using Trafilatura - downloaded = trafilatura.fetch_url(url) - if not downloaded: - logger.warning(f"Failed to download content from {url}") - return self._create_empty_result(url) - - # Extract content - return self.extract_from_html(downloaded, url) - - except Exception as e: - logger.error(f"Error extracting content from {url}: {str(e)}") - return self._create_empty_result(url) - - def extract_structured_content(self, html: str, url: str = "") -> Dict[str, Any]: - """ - Extract structured content (headings, paragraphs) from HTML. 
- - Args: - html: HTML content as string - url: Source URL of the HTML content - - Returns: - Dictionary containing the structured content - """ - try: - # Extract main content - extracted_content = trafilatura.extract( - html, - output_format="xml", - with_metadata=True, - include_images=self.include_images, - include_tables=self.include_tables, - include_links=self.include_links, - url=url, - ) - - if not extracted_content: - logger.warning(f"No content extracted from {url}") - return self._create_empty_result(url) - - # Parse the XML output with BeautifulSoup for structured analysis - soup = BeautifulSoup(extracted_content, "xml") - - # Extract metadata - metadata = { - "title": self._get_element_text(soup, "title"), - "author": self._get_element_text(soup, "author"), - "date": self._get_element_text(soup, "date"), - "description": self._get_element_text(soup, "description"), - } - - # Extract headings - headings = { - "h1": [h.text for h in soup.find_all("head", {"type": "h1"})], - "h2": [h.text for h in soup.find_all("head", {"type": "h2"})], - "h3": [h.text for h in soup.find_all("head", {"type": "h3"})], - "h4": [h.text for h in soup.find_all("head", {"type": "h4"})], - } - - # Extract paragraphs - paragraphs = [p.text for p in soup.find_all("p")] - - # Extract quotes - quotes = [q.text for q in soup.find_all("quote")] - - # Extract lists - lists = [self._process_list(l) for l in soup.find_all("list")] - - # Extract images - images = [] - if self.include_images: - for img in soup.find_all("graphic"): - image_data = { - "url": img.get("url", ""), - "alt": img.get("alt", ""), - "type": img.get("type", ""), - } - images.append(image_data) - - # Extract tables - tables = [] - if self.include_tables: - for table in soup.find_all("table"): - tables.append(str(table)) - - # Extract links - links = [] - if self.include_links: - for ref in soup.find_all("ref"): - link_data = {"url": ref.get("target", ""), "text": ref.text} - links.append(link_data) - - # Create result - result = { - "url": url, - "success": True, - "metadata": metadata, - "headings": headings, - "paragraphs": paragraphs, - "quotes": quotes, - "lists": lists, - "extracted_at": datetime.now().isoformat(), - } - - if self.include_images: - result["images"] = images - - if self.include_tables: - result["tables"] = tables - - if self.include_links: - result["links"] = links - - # Save to file if output directory is specified - if self.output_dir: - self._save_to_file(result, suffix="_structured") - - return result - - except Exception as e: - logger.error(f"Error extracting structured content from {url}: {str(e)}") - return self._create_empty_result(url) - - def _get_element_text(self, soup: BeautifulSoup, element_name: str) -> str: - """ - Get text from a BeautifulSoup element if it exists. - - Args: - soup: BeautifulSoup object - element_name: Name of the element to extract - - Returns: - Text content of the element or empty string - """ - element = soup.find(element_name) - return element.text if element else "" - - def _process_list(self, list_element: Any) -> Dict[str, Any]: - """ - Process a list element into a structured format. 
- - Args: - list_element: BeautifulSoup list element - - Returns: - Dictionary containing the list type and items - """ - items = [item.text for item in list_element.find_all("item")] - list_type = list_element.get("type", "unordered") - return {"type": list_type, "items": items} - - def _create_empty_result(self, url: str) -> Dict[str, Any]: - """ - Create an empty result dictionary for failed extractions. - - Args: - url: URL that failed extraction - - Returns: - Dictionary with basic fields and failure indicator - """ - return { - "url": url, - "success": False, - "error": "Failed to extract content", - "title": "", - "author": "", - "date": "", - "description": "", - "text": "", - "extracted_at": datetime.now().isoformat(), - } - - def _save_to_file(self, content: Dict[str, Any], suffix: str = "") -> None: - """ - Save extracted content to a JSON file. - - Args: - content: Dictionary containing the extracted content - suffix: Optional suffix to add to the filename - """ - if not self.output_dir: - return - - try: - # Create a filename based on URL and timestamp - url_parts = urlparse(content.get("url", "")) - domain = url_parts.netloc.replace(".", "_") - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - - if not domain: - domain = "unknown_source" - - filename = f"{domain}_{timestamp}{suffix}.json" - filepath = os.path.join(self.output_dir, filename) - - # Save to file - with open(filepath, "w", encoding="utf-8") as f: - json.dump(content, f, ensure_ascii=False, indent=2) - - logger.info(f"Content saved to {filepath}") - - except Exception as e: - logger.error(f"Error saving content to file: {str(e)}") - - def extract_from_crawler_result( - self, crawler_result: Dict[str, Any] - ) -> Dict[str, Any]: - """ - Extract clean content from a crawler result. - - Args: - crawler_result: Result dictionary from WebpageCrawler - - Returns: - Dictionary containing the extracted content - """ - if not crawler_result.get("success", False) or "html" not in crawler_result: - logger.warning( - f"Invalid crawler result for URL: {crawler_result.get('url', 'unknown')}" - ) - return self._create_empty_result(crawler_result.get("url", "")) - - # Extract content from HTML - html = crawler_result["html"] - url = crawler_result["url"] - - return self.extract_from_html(html, url) - - def batch_extract(self, urls: List[str]) -> List[Dict[str, Any]]: - """ - Extract content from multiple URLs. - - Args: - urls: List of URLs to extract content from - - Returns: - List of dictionaries containing the extracted content for each URL - """ - results = [] - for i, url in enumerate(urls): - logger.info(f"Extracting content from URL {i+1}/{len(urls)}: {url}") - result = self.extract_from_url(url) - results.append(result) - - return results - - def extract_text_only(self, html: str, url: str = "") -> str: - """ - Extract plain text content from HTML. - - Args: - html: HTML content as string - url: Source URL of the HTML content - - Returns: - Plain text content - """ - try: - # Extract text using Trafilatura - text = trafilatura.extract(html, output_format="text", url=url) - return text if text else "" - - except Exception as e: - logger.error(f"Error extracting text from {url}: {str(e)}") - return "" - - def extract_readability_metrics(self, text: str) -> Dict[str, float]: - """ - Calculate readability metrics for the extracted text. 
- - Args: - text: Text content to analyze - - Returns: - Dictionary containing readability metrics - """ - try: - # Check if text is long enough to analyze - if len(text.split()) < 100: - logger.warning("Text is too short for reliable readability analysis") - return { - "word_count": len(text.split()), - "sentence_count": text.count(".") - + text.count("!") - + text.count("?"), - "avg_word_length": sum(len(word) for word in text.split()) - / max(1, len(text.split())), - "flesch_reading_ease": None, - "flesch_kincaid_grade": None, - } - - # Basic metrics - words = text.split() - word_count = len(words) - sentence_count = text.count(".") + text.count("!") + text.count("?") - avg_word_length = sum(len(word) for word in words) / max(1, word_count) - - # Count syllables (simple approximation) - def count_syllables(word): - word = word.lower() - # Count vowel groups - count = 0 - vowels = "aeiouy" - if word[0] in vowels: - count += 1 - for i in range(1, len(word)): - if word[i] in vowels and word[i - 1] not in vowels: - count += 1 - # Subtract silent 'e' at the end - if word.endswith("e"): - count -= 1 - # Ensure at least one syllable - return max(1, count) - - syllable_count = sum(count_syllables(word) for word in words) - - # Calculate Flesch Reading Ease - # Formula: 206.835 - 1.015 * (words/sentences) - 84.6 * (syllables/words) - flesch_reading_ease = ( - 206.835 - - 1.015 * (word_count / max(1, sentence_count)) - - 84.6 * (syllable_count / max(1, word_count)) - ) - - # Calculate Flesch-Kincaid Grade Level - # Formula: 0.39 * (words/sentences) + 11.8 * (syllables/words) - 15.59 - flesch_kincaid_grade = ( - 0.39 * (word_count / max(1, sentence_count)) - + 11.8 * (syllable_count / max(1, word_count)) - - 15.59 - ) - - return { - "word_count": word_count, - "sentence_count": sentence_count, - "syllable_count": syllable_count, - "avg_word_length": avg_word_length, - "avg_sentence_length": word_count / max(1, sentence_count), - "avg_syllables_per_word": syllable_count / max(1, word_count), - "flesch_reading_ease": flesch_reading_ease, - "flesch_kincaid_grade": flesch_kincaid_grade, - } - - except Exception as e: - logger.error(f"Error calculating readability metrics: {str(e)}") - return {"error": str(e)} - - def analyze_content(self, content: Dict[str, Any]) -> Dict[str, Any]: - """ - Perform comprehensive analysis on extracted content. 
- - Args: - content: Dictionary containing extracted content - - Returns: - Dictionary with the original content and additional analysis - """ - if not content.get("success", False) or not content.get("text"): - return content - - # Clone the content dictionary - result = dict(content) - - # Add readability metrics - result["readability"] = self.extract_readability_metrics(content["text"]) - - # Add text statistics - text = content["text"] - result["statistics"] = { - "character_count": len(text), - "word_count": len(text.split()), - "paragraph_count": text.count("\n\n") + 1, - "avg_paragraph_length": len(text.split()) / max(1, text.count("\n\n") + 1), - } - - # Add simple keyword extraction (based on word frequency) - words = [word.lower() for word in text.split() if len(word) > 3] - word_freq = {} - for word in words: - word_freq[word] = word_freq.get(word, 0) + 1 - - # Get top keywords - top_keywords = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:20] - result["keywords"] = [ - {"word": word, "frequency": freq} for word, freq in top_keywords - ] - - return result diff --git a/src/scraper/google_search.py b/src/scraper/google_search.py deleted file mode 100644 index d060c18..0000000 --- a/src/scraper/google_search.py +++ /dev/null @@ -1,152 +0,0 @@ -""" -Google Search API Integration Module - -This module provides functionality to retrieve top search results for keywords -using the Google Custom Search JSON API. -""" - -import os -import requests -import logging -from typing import List, Dict, Optional, Any -from urllib.parse import quote_plus - - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - - -class GoogleSearchClient: - """Client for interacting with Google Custom Search JSON API.""" - - BASE_URL = "https://www.googleapis.com/customsearch/v1" - - def __init__( - self, api_key: Optional[str] = None, search_engine_id: Optional[str] = None - ): - """ - Initialize the Google Search client. - - Args: - api_key: Google API key. If None, tries to get from environment variable. - search_engine_id: Custom Search Engine ID. If None, tries to get from environment variable. - """ - self.api_key = api_key - self.search_engine_id = search_engine_id - - if not self.api_key: - logger.warning( - "No Google API key provided. Set GOOGLE_API_KEY environment variable." - ) - - if not self.search_engine_id: - logger.warning( - "No search engine ID provided. Set GOOGLE_CSE_ID environment variable." - ) - - def search( - self, - query: str, - num_results: int = 10, - search_type: str = "blog", - language: str = "en", - ) -> List[Dict[str, Any]]: - """ - Search for query using Google Custom Search API. - - Args: - query: The search query string - num_results: Number of results to return (max 10 per request with free tier) - search_type: Type of content to search for ("blog", "news", etc.) 
- language: Language restriction for results - - Returns: - List of search result items with URL, title, and snippet - """ - if not self.api_key or not self.search_engine_id: - logger.error("API key or Search Engine ID not provided") - return [] - - # Prepare parameters for the API request - params = { - "key": self.api_key, - "cx": self.search_engine_id, - "q": quote_plus(query), - "num": min(num_results, 10), # API limit is 10 results per query - "lr": f"lang_{language}" if language else None, - } - - # Add search type if specified - if search_type == "blog": - params["sort"] = "date" - - try: - logger.info(f"Searching for: {query}") - response = requests.get(self.BASE_URL, params=params) - response.raise_for_status() - - search_results = response.json() - - if "items" not in search_results: - logger.warning(f"No results found for query: {query}") - return [] - - # Extract relevant information from results - processed_results = [] - for item in search_results["items"]: - processed_results.append( - { - "url": item.get("link"), - "title": item.get("title"), - "snippet": item.get("snippet"), - "source": "google_search", - } - ) - - logger.info(f"Found {len(processed_results)} results for query: {query}") - return processed_results - - except requests.exceptions.HTTPError as e: - logger.error(f"HTTP error occurred: {e}") - return [] - except requests.exceptions.RequestException as e: - logger.error(f"Request error occurred: {e}") - return [] - except Exception as e: - logger.error(f"Unexpected error: {e}") - return [] - - def get_top_urls(self, keyword: str, num_results: int = 10) -> List[str]: - """ - Fetch top URLs for a given keyword. - - Args: - keyword: The target keyword to search for - num_results: Number of URLs to retrieve - - Returns: - List of top-ranking URLs for the keyword - """ - search_results = self.search(keyword, num_results=num_results) - return [result["url"] for result in search_results if "url" in result] - - def batch_search( - self, keywords: List[str], num_results: int = 10 - ) -> Dict[str, List[Dict[str, Any]]]: - """ - Perform searches for multiple keywords. - - Args: - keywords: List of keywords to search for - num_results: Number of results per keyword - - Returns: - Dictionary mapping keywords to their search results - """ - results = {} - for keyword in keywords: - results[keyword] = self.search(keyword, num_results=num_results) - return results diff --git a/src/scraper/init.py b/src/scraper/init.py deleted file mode 100644 index 95f7d6d..0000000 --- a/src/scraper/init.py +++ /dev/null @@ -1,12 +0,0 @@ -""" -Scraper Package for AI-Powered Blog Post Generator - -This package contains modules for web scraping, search API integration, -and content extraction to gather data for blog post analysis and generation. -""" - -from .google_search import GoogleSearchClient -from .webpage_crawler import WebpageCrawler -from .content_extractor import ContentExtractor - -__all__ = ["GoogleSearchClient", "WebpageCrawler", "ContentExtractor"] diff --git a/src/scraper/webpage_crawler.py b/src/scraper/webpage_crawler.py deleted file mode 100644 index 6c35805..0000000 --- a/src/scraper/webpage_crawler.py +++ /dev/null @@ -1,459 +0,0 @@ -""" -Webpage Crawler Module - -This module provides functionality for crawling webpages and extracting -HTML content using BeautifulSoup. 
-""" - -import requests -import logging -from typing import Dict, Optional, Any, List, Tuple -from bs4 import BeautifulSoup -import time -import random -from urllib.parse import urlparse, urljoin - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) -logger = logging.getLogger(__name__) - - -class WebpageCrawler: - """Class for crawling webpages and extracting HTML content.""" - - def __init__( - self, - user_agent: Optional[str] = None, - timeout: int = 30, - respect_robots: bool = True, - ): - """ - Initialize the webpage crawler. - - Args: - user_agent: Custom user agent string. If None, uses a default. - timeout: Request timeout in seconds - respect_robots: Whether to respect robots.txt directives - """ - self.timeout = timeout - self.respect_robots = respect_robots - - # Set default user agent if none provided - if user_agent is None: - self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" - else: - self.user_agent = user_agent - - self.session = requests.Session() - self.session.headers.update({"User-Agent": self.user_agent}) - - # Cache for robots.txt rules - self._robots_cache = {} - - def fetch_url( - self, url: str, retry_attempts: int = 3, delay_between_attempts: float = 1.0 - ) -> Optional[str]: - """ - Fetch HTML content from a URL with retry logic. - - Args: - url: The URL to fetch - retry_attempts: Number of retry attempts if the request fails - delay_between_attempts: Delay between retry attempts in seconds - - Returns: - HTML content as string or None if all attempts fail - """ - if self.respect_robots and not self._is_allowed_by_robots(url): - logger.warning(f"URL {url} is disallowed by robots.txt. Skipping.") - return None - - for attempt in range(retry_attempts): - try: - # Add jitter to avoid detection - if attempt > 0: - jitter = random.uniform(0.5, 1.5) * delay_between_attempts - time.sleep(jitter) - - logger.info( - f"Fetching URL: {url} (Attempt {attempt + 1}/{retry_attempts})" - ) - response = self.session.get( - url, timeout=self.timeout, allow_redirects=True - ) - - # Check if the request was successful - if response.status_code == 200: - return response.text - else: - logger.warning( - f"Failed to fetch {url}. Status code: {response.status_code}" - ) - - except requests.exceptions.Timeout: - logger.warning(f"Timeout occurred while fetching {url}") - except requests.exceptions.TooManyRedirects: - logger.warning(f"Too many redirects for {url}") - break # Don't retry on redirect issues - except requests.exceptions.RequestException as e: - logger.warning(f"Error fetching {url}: {str(e)}") - - logger.error(f"All attempts to fetch {url} failed") - return None - - def parse_html(self, html_content: str) -> Optional[BeautifulSoup]: - """ - Parse HTML content using BeautifulSoup. - - Args: - html_content: HTML content as string - - Returns: - BeautifulSoup object or None if parsing fails - """ - try: - return BeautifulSoup(html_content, "html.parser") - except Exception as e: - logger.error(f"Error parsing HTML: {str(e)}") - return None - - def extract_metadata(self, soup: BeautifulSoup) -> Dict[str, str]: - """ - Extract metadata from BeautifulSoup object. - - Args: - soup: BeautifulSoup object representing the HTML document - - Returns: - Dictionary of metadata (title, description, etc.) 
- """ - metadata = { - "title": "", - "description": "", - "canonical_url": "", - "keywords": "", - "author": "", - "published_date": "", - } - - # Extract title - title_tag = soup.find("title") - if title_tag and title_tag.string: - metadata["title"] = title_tag.string.strip() - - # Extract meta tags - for meta in soup.find_all("meta"): - name = meta.get("name", "").lower() - property = meta.get("property", "").lower() - content = meta.get("content", "") - - if name == "description" or property == "og:description": - metadata["description"] = content - elif name == "keywords": - metadata["keywords"] = content - elif name == "author": - metadata["author"] = content - elif ( - name == "article:published_time" or property == "article:published_time" - ): - metadata["published_date"] = content - - # Extract canonical URL - canonical = soup.find("link", rel="canonical") - if canonical and canonical.get("href"): - metadata["canonical_url"] = canonical.get("href") - - return metadata - - def extract_text_by_tag(self, soup: BeautifulSoup, tag_name: str) -> List[str]: - """ - Extract text content from specific HTML tags. - - Args: - soup: BeautifulSoup object - tag_name: HTML tag name to extract (e.g., 'p', 'h1', 'h2') - - Returns: - List of text content from the specified tags - """ - elements = soup.find_all(tag_name) - return [ - element.get_text().strip() - for element in elements - if element.get_text().strip() - ] - - def extract_headings(self, soup: BeautifulSoup) -> Dict[str, List[str]]: - """ - Extract all headings (h1-h6) from the page. - - Args: - soup: BeautifulSoup object - - Returns: - Dictionary mapping heading levels to lists of heading text - """ - headings = {} - for level in range(1, 7): - tag_name = f"h{level}" - headings[tag_name] = self.extract_text_by_tag(soup, tag_name) - return headings - - def extract_links(self, soup: BeautifulSoup, base_url: str) -> List[Dict[str, str]]: - """ - Extract all links from the page. - - Args: - soup: BeautifulSoup object - base_url: Base URL for resolving relative links - - Returns: - List of dictionaries containing link URL and text - """ - links = [] - for a_tag in soup.find_all("a", href=True): - href = a_tag.get("href", "").strip() - if href and not href.startswith(("#", "javascript:", "mailto:")): - # Resolve relative URLs - absolute_url = urljoin(base_url, href) - links.append({"url": absolute_url, "text": a_tag.get_text().strip()}) - return links - - def extract_images( - self, soup: BeautifulSoup, base_url: str - ) -> List[Dict[str, str]]: - """ - Extract all images from the page. - - Args: - soup: BeautifulSoup object - base_url: Base URL for resolving relative image URLs - - Returns: - List of dictionaries containing image URL and alt text - """ - images = [] - for img_tag in soup.find_all("img"): - src = img_tag.get("src", "").strip() - if src: - # Resolve relative URLs - absolute_url = urljoin(base_url, src) - images.append( - {"url": absolute_url, "alt": img_tag.get("alt", "").strip()} - ) - return images - - def _is_allowed_by_robots(self, url: str) -> bool: - """ - Check if the URL is allowed by the website's robots.txt. 
- - Args: - url: URL to check - - Returns: - True if allowed, False if disallowed - """ - if not self.respect_robots: - return True - - try: - parsed_url = urlparse(url) - robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt" - - # Check cache first - if robots_url in self._robots_cache: - rules = self._robots_cache[robots_url] - else: - # Fetch and parse robots.txt - try: - response = self.session.get(robots_url, timeout=self.timeout) - if response.status_code == 200: - rules = self._parse_robots_txt(response.text) - else: - # If robots.txt doesn't exist or can't be fetched, allow all - rules = [] - self._robots_cache[robots_url] = rules - except Exception: - # If there's an error fetching robots.txt, allow all - self._robots_cache[robots_url] = [] - return True - - # Check if URL is allowed - path = parsed_url.path or "/" - return not any(self._is_path_matched(path, rule) for rule in rules) - - except Exception as e: - logger.warning(f"Error checking robots.txt for {url}: {str(e)}") - return True # Allow in case of error - - def _parse_robots_txt(self, content: str) -> List[str]: - """ - Parse robots.txt content and extract disallow rules for our user agent. - - Args: - content: Content of robots.txt file - - Returns: - List of disallowed paths - """ - disallowed = [] - agent_rules = False - - for line in content.split("\n"): - line = line.strip() - - if not line or line.startswith("#"): - continue - - parts = [part.strip() for part in line.split(":", 1)] - if len(parts) != 2: - continue - - key, value = parts - key = key.lower() - - if key == "user-agent": - if value == "*" or value in self.user_agent: - agent_rules = True - else: - agent_rules = False - - elif key == "disallow" and agent_rules and value: - disallowed.append(value) - - return disallowed - - def _is_path_matched(self, path: str, pattern: str) -> bool: - """ - Check if a path matches a robots.txt pattern. - - Args: - path: URL path to check - pattern: Disallow pattern from robots.txt - - Returns: - True if path matches pattern, False otherwise - """ - # Simple pattern matching for now - if pattern.endswith("*"): - return path.startswith(pattern[:-1]) - return path.startswith(pattern) - - def crawl(self, url: str) -> Dict[str, Any]: - """ - Crawl a webpage and extract all relevant content. - - Args: - url: URL to crawl - - Returns: - Dictionary containing the extracted content and metadata - """ - html_content = self.fetch_url(url) - if not html_content: - return {"url": url, "success": False, "error": "Failed to fetch content"} - - soup = self.parse_html(html_content) - if not soup: - return {"url": url, "success": False, "error": "Failed to parse HTML"} - - # Extract content - metadata = self.extract_metadata(soup) - headings = self.extract_headings(soup) - paragraphs = self.extract_text_by_tag(soup, "p") - links = self.extract_links(soup, url) - images = self.extract_images(soup, url) - - # Clean main content area (focus on content sections, remove navigation, sidebars, etc.) - main_content = self._extract_main_content(soup) - - return { - "url": url, - "success": True, - "metadata": metadata, - "headings": headings, - "paragraphs": paragraphs, - "links": links, - "images": images, - "main_content": main_content, - "html": html_content, # Include raw HTML for further processing - } - - def _extract_main_content(self, soup: BeautifulSoup) -> str: - """ - Extract the main content area of the webpage. - Uses heuristics to find the most content-rich part of the page. 
- - Args: - soup: BeautifulSoup object - - Returns: - String containing the main content HTML - """ - # Try common content container IDs and classes - content_selectors = [ - "article", - "main", - ".post-content", - ".entry-content", - "#content", - ".content", - ".post", - ".entry", - ".blog-post", - ] - - # Try each selector - for selector in content_selectors: - try: - content_element = soup.select_one(selector) - if content_element and len(content_element.get_text().strip()) > 200: - return str(content_element) - except Exception: - continue - - # Fallback: Find the div with the most paragraph content - paragraphs_by_container = {} - for div in soup.find_all("div"): - p_tags = div.find_all("p") - if p_tags: - text_length = sum(len(p.get_text()) for p in p_tags) - paragraphs_by_container[div] = text_length - - if paragraphs_by_container: - main_div = max(paragraphs_by_container.items(), key=lambda x: x[1])[0] - return str(main_div) - - # Last resort: Return the body content - body = soup.find("body") - if body: - return str(body) - - # If all else fails, return the entire HTML - return str(soup) - - def batch_crawl(self, urls: List[str], delay: float = 1.0) -> List[Dict[str, Any]]: - """ - Crawl multiple URLs with a delay between requests. - - Args: - urls: List of URLs to crawl - delay: Delay between requests in seconds - - Returns: - List of dictionaries containing the extracted content for each URL - """ - results = [] - for i, url in enumerate(urls): - logger.info(f"Crawling URL {i+1}/{len(urls)}: {url}") - result = self.crawl(url) - results.append(result) - - # Add delay between requests (except for the last one) - if i < len(urls) - 1: - jitter = random.uniform(0.8, 1.2) * delay - time.sleep(jitter) - - return results
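For orientation, here is a minimal sketch of how the async pieces added earlier in this diff (the `create_search_client()` factory in `agent_system/src/tools/search_client.py` and the `create_scraper()` Playwright factory) could be wired together in place of the deleted `GoogleSearchClient`/`WebpageCrawler` path. Only `search_top_posts()` and `scrape_multiple()` appear in the diff itself; the scraper's import path, the shape of its return value, and the `gather_sources()` helper below are assumptions for illustration, not APIs confirmed by this change set.

```python
# Hedged sketch of the new async search + scrape flow; search_top_posts() and
# scrape_multiple() are taken from the diffs above, everything else is assumed.
import asyncio

from src.tools.search_client import create_search_client
from src.tools.scraper import create_scraper  # assumed module path

async def gather_sources(keyword: str):
    # 1. Async Google Custom Search for the top-ranking posts
    #    (each result is a dict with url/title/snippet).
    client = create_search_client()
    top_posts = await client.search_top_posts(keyword, num_results=5)

    # 2. Concurrent Playwright scraping of those posts; concurrency, timeout
    #    and headless mode are read from MAX_CONCURRENT_REQUESTS,
    #    MAX_SCRAPE_TIMEOUT and PLAYWRIGHT_HEADLESS by create_scraper().
    scraper = create_scraper()
    html_map = await scraper.scrape_multiple(top_posts)
    return html_map

if __name__ == "__main__":
    asyncio.run(gather_sources("python asyncio tutorial"))
```

Running a sketch like this would still require `GOOGLE_API_KEY` and `GOOGLE_SEARCH_ENGINE_ID` to be set, as `create_search_client()` raises if either is missing.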