Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions app-store/arxiv/arxiv_background.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
from __future__ import annotations

import atexit
import logging
import os
import sys

from app_runtime.background import BackgroundRunContext, run_background
from truffle.app.background_pb2 import BackgroundContext

from arxiv_bg_worker import ArxivBackgroundWorker

logger = logging.getLogger("arxiv.background")
logger.setLevel(logging.INFO)

# Lazily-created singleton worker; populated by _ensure_worker, dropped by _cleanup.
_worker: ArxivBackgroundWorker | None = None
# Submission priority resolved defensively against the generated protobuf:
# prefer PRIORITY_DEFAULT, fall back to PRIORITY_HIGH, then the literal 1,
# so older BackgroundContext builds without these attributes still work.
_PRIORITY_DEFAULT = getattr(
    BackgroundContext,
    "PRIORITY_DEFAULT",
    getattr(BackgroundContext, "PRIORITY_HIGH", 1),
)


def _is_verify_mode() -> bool:
    """Return True when the first CLI argument requests verification mode."""
    args = sys.argv
    if len(args) < 2:
        return False
    return "verify" in args[1].lower()


def _ensure_worker() -> ArxivBackgroundWorker:
    """Return the singleton background worker, constructing it on first use."""
    global _worker
    if _worker is not None:
        return _worker
    raw_interests = str(os.getenv("ARXIV_RESEARCH_INTERESTS", "")).strip()
    _worker = ArxivBackgroundWorker(interests_raw=raw_interests)
    return _worker


def _submit(ctx: BackgroundRunContext, content: str) -> None:
    """Hand the assembled context text to the background submission channel."""
    ctx.bg.submit_context(content=content, uris=[], priority=_PRIORITY_DEFAULT)


def arxiv_ambient(ctx: BackgroundRunContext) -> None:
    """Run one background recommendation cycle and submit any new findings."""
    result = _ensure_worker().run_cycle()
    if result.error:
        logger.error("ArXiv background cycle failed", extra={"error": result.error})
    elif not result.content:
        logger.info("ArXiv background cycle produced no new recommendations")
    else:
        _submit(ctx, result.content)


def verify() -> int:
    """Check configuration and report the result; returns a process exit code."""
    ok, message = _ensure_worker().verify()
    if not ok:
        logger.error(message)
        return 1
    logger.info(message)
    return 0


def _cleanup() -> None:
    """atexit hook: drop the module-level worker reference at shutdown."""
    global _worker
    _worker = None


if __name__ == "__main__":
    # Release the worker reference when the interpreter exits.
    atexit.register(_cleanup)
    # A "verify" CLI argument runs a one-shot config check instead of the loop.
    if _is_verify_mode():
        sys.exit(verify())
    run_background(arxiv_ambient)
134 changes: 134 additions & 0 deletions app-store/arxiv/arxiv_bg_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import timezone
import logging
from typing import Any

import arxiv

from arxiv_common import get_bg_state_path, parse_research_interests

# Module logger; INFO level so per-cycle search/state warnings are visible.
logger = logging.getLogger("arxiv.bg_worker")
logger.setLevel(logging.INFO)


@dataclass
class ArxivRecommendation:
    """A single paper recommendation produced by one background cycle."""

    interest: str  # the configured interest string this paper matched
    paper_id: str  # short arXiv ID as returned by Result.get_short_id()
    title: str  # stripped paper title
    published: str  # submission date as an ISO date string in UTC
    abs_url: str  # canonical https://arxiv.org/abs/<id> link
    summary: str  # whitespace-normalized abstract snippet (may be empty)


@dataclass
class BgRunResult:
    """Outcome of one background cycle: content to submit, an error tag, or neither."""

    content: str | None = None  # formatted context text; None when nothing new was found
    error: str | None = None  # machine-readable failure tag (e.g. "no_interests")


class ArxivBackgroundWorker:
    """Surfaces new arXiv papers matching the configured research interests.

    The set of already-recommended paper IDs is persisted as JSON at the path
    returned by ``get_bg_state_path`` so repeated cycles never resurface the
    same paper. All I/O failures (search, state load/save) are best-effort:
    they are logged and degrade gracefully rather than aborting a cycle.
    """

    # Maximum recommendations emitted per cycle.
    _MAX_RECOMMENDATIONS = 3
    # Papers fetched from arXiv per interest per cycle.
    _RESULTS_PER_INTEREST = 8
    # Upper bound on persisted seen IDs. Trimming keeps the lexicographically
    # largest IDs, which for arXiv's YYMM.NNNNN scheme roughly keeps the newest.
    _MAX_SEEN_IDS = 1500
    # Abstract snippets are truncated to this many characters.
    _SUMMARY_CHARS = 450

    def __init__(self, interests_raw: str) -> None:
        self._interests_raw = interests_raw
        self._client = arxiv.Client()
        self._state_path = get_bg_state_path()

    @property
    def interests(self) -> list[str]:
        """Parsed, case-insensitively deduplicated interests (may be empty)."""
        return parse_research_interests(self._interests_raw)

    def verify(self) -> tuple[bool, str]:
        """Return ``(ok, message)`` describing whether interests are configured."""
        interests = self.interests
        if not interests:
            return False, "No research interests configured. Provide at least one interest."
        return True, f"ArXiv background configured with {len(interests)} interest(s)."

    def run_cycle(self) -> BgRunResult:
        """Execute one recommendation cycle.

        Returns a ``BgRunResult`` whose ``content`` holds the formatted context
        when new papers were found, is ``None`` when nothing new turned up, and
        whose ``error`` is set when no interests are configured.
        """
        interests = self.interests
        if not interests:
            return BgRunResult(error="no_interests")

        state = self._load_state()
        seen_ids: set[str] = set(state.get("seen_ids") or [])
        recommendations = self._collect_recommendations(interests, seen_ids)

        if not recommendations:
            return BgRunResult(content=None)

        # Persist only when something new was recommended; trim to bound growth.
        state["seen_ids"] = sorted(seen_ids)[-self._MAX_SEEN_IDS:]
        self._save_state(state)
        return BgRunResult(content=self._build_context(recommendations))

    def _collect_recommendations(
        self, interests: list[str], seen_ids: set[str]
    ) -> list[ArxivRecommendation]:
        """Gather up to ``_MAX_RECOMMENDATIONS`` unseen papers, mutating ``seen_ids``."""
        recommendations: list[ArxivRecommendation] = []
        for interest in interests:
            for paper in self._search_interest(
                interest, max_results=self._RESULTS_PER_INTEREST
            ):
                paper_id = paper.get_short_id()
                if not paper_id or paper_id in seen_ids:
                    continue
                published_iso = paper.published.astimezone(timezone.utc).date().isoformat()
                recommendations.append(
                    ArxivRecommendation(
                        interest=interest,
                        paper_id=paper_id,
                        title=paper.title.strip(),
                        published=published_iso,
                        abs_url=f"https://arxiv.org/abs/{paper_id}",
                        summary=" ".join((paper.summary or "").split())[: self._SUMMARY_CHARS],
                    )
                )
                seen_ids.add(paper_id)
                if len(recommendations) >= self._MAX_RECOMMENDATIONS:
                    return recommendations
        return recommendations

    def _search_interest(self, interest: str, *, max_results: int) -> list[arxiv.Result]:
        """Newest-first search for ``interest``; returns [] on any API failure."""
        try:
            search = arxiv.Search(
                query=interest,
                max_results=max_results,
                sort_by=arxiv.SortCriterion.SubmittedDate,
            )
            return list(self._client.results(search))
        except Exception as exc:  # best-effort: one failed interest must not kill the cycle
            logger.warning("arXiv search failed for interest '%s': %s", interest, exc)
            return []

    def _load_state(self) -> dict[str, Any]:
        """Load persisted state, falling back to an empty state on any error."""
        path = self._state_path
        try:
            if not path.exists():
                return {"seen_ids": []}
            data = json.loads(path.read_text(encoding="utf-8"))
            if isinstance(data, dict):
                return data
        except Exception as exc:
            logger.warning("Failed to load BG state: %s", exc)
        # Missing file, non-dict payload, or read/parse failure all reset state.
        return {"seen_ids": []}

    def _save_state(self, state: dict[str, Any]) -> None:
        """Best-effort JSON persistence of ``state``; failures are only logged."""
        path = self._state_path
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(json.dumps(state, indent=2, sort_keys=True), encoding="utf-8")
        except Exception as exc:
            logger.warning("Failed to save BG state: %s", exc)

    def _build_context(self, items: list[ArxivRecommendation]) -> str:
        """Render recommendations as the plain-text context block to submit."""
        lines: list[str] = [
            "These are research papers the user likes.",
            "Please use a research tool like Exa or web search to read each paper and provide the user a summary and notes.",
            "",
            "Recommended papers:",
        ]
        for idx, item in enumerate(items, start=1):
            lines.append(
                f"{idx}. {item.title} (arXiv:{item.paper_id}, published {item.published})"
            )
            lines.append(f" Interest match: {item.interest}")
            lines.append(f" URL: {item.abs_url}")
            if item.summary:
                lines.append(f" Abstract snippet: {item.summary}")
        return "\n".join(lines)
42 changes: 42 additions & 0 deletions app-store/arxiv/arxiv_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import annotations

import os
from pathlib import Path

# Presumably caps search result counts for the tool layer — confirm against callers.
MAX_RESULTS = 50
# Fallback locations used when the corresponding environment variables are unset.
DEFAULT_STORAGE_PATH = Path.home() / ".arxiv-mcp-server" / "papers"
# NOTE(review): hard-coded under /root — looks like this assumes a container
# running as root; confirm for non-root deployments.
DEFAULT_BG_STATE_PATH = Path("/root/.arxiv-truffle/arxiv_bg_state.json")


def get_storage_path() -> Path:
    """Return the directory for downloaded papers, creating it if needed.

    Honors the ``ARXIV_STORAGE_PATH`` environment variable and falls back to
    ``DEFAULT_STORAGE_PATH``. The result is resolved to an absolute path and
    created along with any missing parents.
    """
    # os.getenv with a str default already returns str; no str() wrapper needed.
    raw = os.getenv("ARXIV_STORAGE_PATH", "").strip()
    path = (Path(raw) if raw else DEFAULT_STORAGE_PATH).resolve()
    path.mkdir(parents=True, exist_ok=True)
    return path


def get_bg_state_path() -> Path:
    """Return the background-state JSON file path, ensuring its parent exists.

    Honors the ``ARXIV_BG_STATE_PATH`` environment variable and falls back to
    ``DEFAULT_BG_STATE_PATH``. Only the parent directory is created; the file
    itself is not touched.
    """
    # os.getenv with a str default already returns str; no str() wrapper needed.
    raw = os.getenv("ARXIV_BG_STATE_PATH", "").strip()
    path = Path(raw) if raw else DEFAULT_BG_STATE_PATH
    path.parent.mkdir(parents=True, exist_ok=True)
    return path


def parse_research_interests(raw: str | None) -> list[str]:
    """Split a comma/newline separated interest string into a clean list.

    Blank entries are dropped and duplicates are removed case-insensitively,
    preserving input order and the first occurrence's original casing.
    """
    if not raw:
        return []
    result: list[str] = []
    lowered: set[str] = set()
    for token in raw.replace("\n", ",").split(","):
        cleaned = token.strip()
        if cleaned and cleaned.lower() not in lowered:
            lowered.add(cleaned.lower())
            result.append(cleaned)
    return result

92 changes: 92 additions & 0 deletions app-store/arxiv/arxiv_foreground.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from __future__ import annotations

import logging
from typing import Any

from app_runtime.mcp import create_mcp_server, run_mcp_server

from arxiv_tools import (
build_deep_analysis_prompt,
download_paper,
list_papers,
read_paper,
search_papers,
)
from mcp.types import Icon

logger = logging.getLogger("arxiv.foreground")
logger.setLevel(logging.INFO)

# Shared MCP server instance; every tool/prompt below registers against it.
mcp = create_mcp_server("arxiv")


@mcp.tool(
    "search_papers",
    description=(
        "Search arXiv papers with optional date/category filters. "
        "Use quoted phrases for exact matches (for example: \"multi-agent systems\") "
        "and categories for precision (for example: cs.AI, cs.LG, cs.CL)."
    ),
    icons=[Icon(src="https://raw.githubusercontent.com/phosphor-icons/core/main/assets/regular/magnifying-glass.svg")],
)
async def tool_search_papers(
    query: str,
    max_results: int = 10,
    date_from: str | None = None,
    date_to: str | None = None,
    categories: list[str] | None = None,
    sort_by: str = "relevance",
) -> dict[str, Any]:
    """MCP adapter for arXiv search; forwards all arguments unchanged."""
    # Thin pass-through: validation and search logic live in arxiv_tools.search_papers.
    return await search_papers(
        query=query,
        max_results=max_results,
        date_from=date_from,
        date_to=date_to,
        categories=categories,
        sort_by=sort_by,
    )


@mcp.tool(
    "download_paper",
    description=(
        "Download an arXiv paper by ID and convert it to markdown for local reading. "
        "Use check_status=true to poll conversion progress."
    ),
    icons=[Icon(src="https://raw.githubusercontent.com/phosphor-icons/core/main/assets/regular/download-simple.svg")],
)
async def tool_download_paper(
    paper_id: str,
    check_status: bool = False,
) -> dict[str, Any]:
    """MCP adapter: download (or poll conversion status of) a paper by arXiv ID."""
    # Thin pass-through to arxiv_tools.download_paper.
    return await download_paper(paper_id=paper_id, check_status=check_status)


@mcp.tool(
    "list_papers",
    description="List papers currently downloaded to local storage.",
    icons=[Icon(src="https://raw.githubusercontent.com/phosphor-icons/core/main/assets/regular/list.svg")],
)
async def tool_list_papers() -> dict[str, Any]:
    """MCP adapter: enumerate locally downloaded papers."""
    # Thin pass-through to arxiv_tools.list_papers.
    return await list_papers()


@mcp.tool(
    "read_paper",
    description="Read full markdown content for a downloaded paper by arXiv ID.",
    icons=[Icon(src="https://raw.githubusercontent.com/phosphor-icons/core/main/assets/regular/book-open-text.svg")],
)
async def tool_read_paper(paper_id: str) -> dict[str, Any]:
    """MCP adapter: return markdown content of a previously downloaded paper."""
    # Thin pass-through to arxiv_tools.read_paper.
    return await read_paper(paper_id=paper_id)


@mcp.prompt(
    "deep-paper-analysis",
    description="Generate a structured deep analysis instruction set for a paper ID.",
)
async def prompt_deep_paper_analysis(paper_id: str) -> list[dict[str, str]]:
    """MCP prompt: wrap the deep-analysis instructions in a single user message."""
    # Message text is built entirely by arxiv_tools.build_deep_analysis_prompt.
    return [{"role": "user", "content": build_deep_analysis_prompt(paper_id)}]


if __name__ == "__main__":
    # Entry point: serve the registered MCP tools/prompts via the app runtime.
    run_mcp_server(mcp, logger)
Loading
Loading