diff --git a/.gitignore b/.gitignore index 931c86f..0ad5c4f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,9 @@ wheels/ # Coverage .coverage .coverage.* +coverage.xml htmlcov/ +.pytest_cache/ + +# Others +.ruff_cache/ \ No newline at end of file diff --git a/.qlty/.gitignore b/.qlty/.gitignore new file mode 100644 index 0000000..3036618 --- /dev/null +++ b/.qlty/.gitignore @@ -0,0 +1,7 @@ +* +!configs +!configs/** +!hooks +!hooks/** +!qlty.toml +!.gitignore diff --git a/.qlty/configs/.yamllint.yaml b/.qlty/configs/.yamllint.yaml new file mode 100644 index 0000000..d22fa77 --- /dev/null +++ b/.qlty/configs/.yamllint.yaml @@ -0,0 +1,8 @@ +rules: + document-start: disable + quoted-strings: + required: only-when-needed + extra-allowed: ["{|}"] + key-duplicates: {} + octal-values: + forbid-implicit-octal: true diff --git a/.qlty/qlty.toml b/.qlty/qlty.toml new file mode 100644 index 0000000..c44de1c --- /dev/null +++ b/.qlty/qlty.toml @@ -0,0 +1,101 @@ +# This file was automatically generated by `qlty init`. +# You can modify it to suit your needs. +# We recommend you to commit this file to your repository. +# +# This configuration is used by both Qlty CLI and Qlty Cloud. +# +# Qlty CLI -- Code quality toolkit for developers +# Qlty Cloud -- Fully automated Code Health Platform +# +# Try Qlty Cloud: https://qlty.sh +# +# For a guide to configuration, visit https://qlty.sh/d/config +# Or for a full reference, visit https://qlty.sh/d/qlty-toml +config_version = "0" + +exclude_patterns = [ + "*_min.*", + "*-min.*", + "*.min.*", + "**/.yarn/**", + "**/*.d.ts", + "**/assets/**", + "**/bower_components/**", + "**/build/**", + "**/cache/**", + "**/config/**", + "**/db/**", + "**/deps/**", + "**/dist/**", + "**/extern/**", + "**/external/**", + "**/generated/**", + "**/Godeps/**", + "**/gradlew/**", + "**/mvnw/**", + "**/node_modules/**", + "**/protos/**", + "**/seed/**", + "**/target/**", + "**/templates/**", + "**/testdata/**", + "**/vendor/**", +] + +test_patterns = [ + "**/test/**", + "**/spec/**", + "**/*.test.*", + "**/*.spec.*", + "**/*_test.*", + "**/*_spec.*", + "**/test_*.*", + "**/spec_*.*", +] + +[smells] +mode = "comment" + +[[source]] +name = "default" +default = true + + +[[plugin]] +name = "actionlint" + +[[plugin]] +name = "bandit" + +[[plugin]] +name = "checkov" + +[[plugin]] +name = "markdownlint" +mode = "comment" + +[[plugin]] +name = "prettier" + +[[plugin]] +name = "radarlint-python" +mode = "comment" + +[[plugin]] +name = "ripgrep" +mode = "comment" + +[[plugin]] +name = "ruff" + +[[plugin]] +name = "trivy" +drivers = [ + "config", +] + +[[plugin]] +name = "trufflehog" + +[[plugin]] +name = "yamllint" diff --git a/CHANGELOG.md b/CHANGELOG.md index ef1db19..90f018d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,124 @@ Todas as mudanças notáveis neste projeto serão documentadas neste arquivo. O formato é baseado em [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), e este projeto adere ao [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.2.0] - 2024-12-29 + +### 🚀 Major Refactoring & Architecture Improvements + +Esta versão representa uma refatoração significativa da arquitetura da biblioteca, com foco em modularização, robustez e manutenibilidade. + +### ✨ Mudanças + +#### Nova Arquitetura Modular + +- **Configurações Centralizadas**: Sistema de configuração tipo-seguro com dataclasses + - `ChunkingConfig`: Configuração para estratégias de chunking + - `TextProcessingConfig`: Configuração para processamento de texto + - `LoaderConfig`: Configuração para loaders de documentos + - `EmbeddingConfig` e `VectorStoreConfig`: Configurações futuras + - `ConfigPresets`: Presets predefinidos para casos de uso comuns + +#### Estratégias de Chunking (Pattern Strategy) + +- **`ChunkingStrategy`**: Interface abstrata para estratégias de chunking +- **`SeparatorChunkingStrategy`**: Chunking baseado em separadores preferenciais +- **`CharacterChunkingStrategy`**: Chunking baseado em contagem de caracteres +- **`TextChunker`**: Classe principal que unifica as estratégias +- Algoritmos melhorados de detecção de quebras naturais +- Cálculo otimizado de sobreposição entre chunks + +#### Módulos Especializados + +- **`EncodingDetector`**: Detecção inteligente de encoding de arquivos +- **`FileTypeDetector`**: Detecção de tipos de arquivo baseada em extensão e MIME type +- **`DocumentMetadataManager`**: Gestão centralizada de metadados de documentos +- **`LoaderFactory`**: Factory pattern para criação de loaders apropriados +- **`TextProcessor`**: Processador avançado de texto com configurações personalizáveis + +#### Constantes Centralizadas + +- Mapeamentos abrangentes de extensões de arquivo para tipos +- Suporte expandido para linguagens de programação +- Categorizações de arquivos (code, document, data, config) +- Mapeamentos de MIME types + +### 🔧 Correções + +#### Correções Críticas no Chunking + +- **Corrigida lógica de sobreposição**: Chunks não geram mais sobreposições excessivas +- **Corrigida detecção de quebras de sentença**: Prioriza quebras naturais adequadamente +- **Eliminados chunks minúsculos**: Filtros inteligentes para evitar chunks inválidos +- **Corrigido cálculo do próximo início**: Evita loops infinitos e garante progresso + +#### Melhorias nos Loaders + +- **Detecção robusta de encoding**: Fallback inteligente com múltiplas estratégias +- **Tratamento de erros aprimorado**: Loaders não falham com arquivos problemáticos +- **Metadados enriquecidos**: Informações mais detalhadas sobre arquivos carregados + +#### Refatoração da API + +- **Compatibilidade mantida**: APIs antigas continuam funcionando +- **Funções de conveniência**: `chunk_text()`, `clean_text()`, etc. usam nova arquitetura +- **TextProcessor legado**: Reimplementado usando novos componentes internamente + +#### Estrutura de Diretórios + +```text +lambda_rag_lite/ +├── config.py # Configurações centralizadas +├── constants.py # Constantes e mapeamentos +├── text_cleaning.py # Funções de limpeza de texto +├── factories.py # Factory patterns +├── detectors/ # Detectores especializados +│ ├── encoding.py +│ └── file_type.py +├── metadata/ # Gestão de metadados +│ └── document_metadata.py +├── processors/ # Processadores de texto +│ └── text_processor.py +└── strategies/ # Estratégias de chunking + └── chunking.py +``` + +#### Performance e Robustez + +- **Algoritmos otimizados**: Chunking mais eficiente e preciso +- **Tratamento de erros**: Recuperação graceful de falhas +- **Validação de entrada**: Validações robustas em todas as operações + +#### Metadados e Análise + +- **Metadados enriquecidos**: Informações detalhadas sobre arquivos e chunks +- **Estatísticas de texto**: Análise aprofundada do conteúdo +- **Detecção de tipo**: Classificação inteligente de arquivos + +### 🔄 Deprecated + +- Nenhuma funcionalidade foi depreciada nesta versão +- APIs antigas mantêm compatibilidade total + +### 🚨 Migration Guide + +Esta versão mantém total compatibilidade com a API anterior. Nenhuma mudança é necessária no código existente. + +**Recomendações para novos projetos:** + +```python +from lambda_rag_lite import ( + ChunkingConfig, TextChunker, TextProcessor as NewTextProcessor, + EncodingDetector, ConfigPresets +) + +# Use as novas classes para maior flexibilidade +config = ConfigPresets.large_documents() +chunker = TextChunker() +chunks = chunker.chunk(text, config) +``` + +--- + ## [0.1.0] - 2024-12-26 ### Added diff --git a/coverage.xml b/coverage.xml deleted file mode 100644 index 2b350de..0000000 --- a/coverage.xml +++ /dev/null @@ -1,475 +0,0 @@ - - - - - - /Users/guru/work/dmux/lambda-rag-lite/lambda_rag_lite - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/lambda_rag_lite/__init__.py b/lambda_rag_lite/__init__.py index ffe1367..ef8ac4c 100644 --- a/lambda_rag_lite/__init__.py +++ b/lambda_rag_lite/__init__.py @@ -7,16 +7,63 @@ solução leve sem dependências pesadas como NumPy ou bibliotecas de ML. """ +# Novas classes especializadas +from .config import ( + ChunkingConfig, + ConfigPresets, + EmbeddingConfig, + LoaderConfig, + TextProcessingConfig, + VectorStoreConfig, +) +from .detectors.encoding import EncodingDetector +from .detectors.file_type import FileTypeDetector from .embeddings import SimpleHashEmbedding, TFIDFHashEmbedding -from .loaders import DirectoryLoader, MarkdownLoader, TextLoader +from .loaders import DirectoryLoader, LoaderFactory, MarkdownLoader, TextLoader +from .metadata.document_metadata import DocumentMetadataManager +from .processors.text_processor import TextProcessor as NewTextProcessor +from .strategies.chunking import ChunkingStrategy, TextChunker + +# Importa as funções de compatibilidade +from .utils import TextProcessor # Classe compatível +from .utils import ( + calculate_text_stats, + chunk_text, + clean_text, + extract_keywords, + format_file_size, +) from .vectorstore import PurePythonVectorStore -__version__ = "0.1.0" +__version__ = "0.2.0" __all__ = [ + # Classes principais "SimpleHashEmbedding", "TFIDFHashEmbedding", "PurePythonVectorStore", "MarkdownLoader", "TextLoader", "DirectoryLoader", + "LoaderFactory", + # Configurações + "ChunkingConfig", + "TextProcessingConfig", + "LoaderConfig", + "EmbeddingConfig", + "VectorStoreConfig", + "ConfigPresets", + # Novas classes especializadas + "TextChunker", + "ChunkingStrategy", + "NewTextProcessor", + "EncodingDetector", + "FileTypeDetector", + "DocumentMetadataManager", + # Funções de compatibilidade + "chunk_text", + "clean_text", + "extract_keywords", + "calculate_text_stats", + "format_file_size", + "TextProcessor", # Classe compatível ] diff --git a/lambda_rag_lite/config.py b/lambda_rag_lite/config.py new file mode 100644 index 0000000..a415c85 --- /dev/null +++ b/lambda_rag_lite/config.py @@ -0,0 +1,235 @@ +""" +Configurações centralizadas para Lambda RAG Lite. + +Contém dataclasses para configurações de chunking, processamento de texto, +e outras operações configuráveis. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field + +from .constants import MARKDOWN_EXTENSIONS, TEXT_EXTENSIONS + + +@dataclass +class ChunkingConfig: + """Configuração para estratégias de chunking de texto.""" + + chunk_size: int = 1000 + chunk_overlap: int = 200 + separator: str = "\n\n" + natural_break_search_range: int = 50 + natural_break_chars: str = " \n\t" + sentence_break_chars: str = ".!?\n" + min_chunk_size: int = 10 + max_chunk_size: int = 4000 + + def __post_init__(self): + """Valida os parâmetros de configuração.""" + if self.chunk_size <= 0: + raise ValueError("chunk_size deve ser maior que zero") + if self.chunk_overlap < 0: + raise ValueError("chunk_overlap não pode ser negativo") + if self.chunk_overlap >= self.chunk_size: + raise ValueError("chunk_overlap deve ser menor que chunk_size") + if self.min_chunk_size <= 0: + raise ValueError("min_chunk_size deve ser maior que zero") + if self.max_chunk_size <= self.chunk_size: + raise ValueError("max_chunk_size deve ser maior que chunk_size") + + +@dataclass +class TextProcessingConfig: + """Configuração para processamento de texto.""" + + clean_text: bool = True + remove_extra_whitespace: bool = True + extract_keywords: bool = False + keyword_min_length: int = 3 + keyword_max_words: int = 20 + calculate_stats: bool = True + + # Stop words em português e inglês + stop_words: set[str] = field( + default_factory=lambda: { + # Inglês + "a", + "an", + "and", + "are", + "as", + "at", + "be", + "by", + "for", + "from", + "has", + "he", + "in", + "is", + "it", + "its", + "of", + "on", + "that", + "the", + "to", + "was", + "will", + "with", + # Português + "o", + "e", + "de", + "do", + "da", + "em", + "um", + "uma", + "para", + "com", + "por", + "no", + "na", + "os", + "dos", + "das", + "que", + "não", + "se", + "ou", + "como", + "mais", + "mas", + "ser", + "ter", + "esse", + "sua", + "seu", + "ela", + "ele", + "quando", + "onde", + "porque", + "ainda", + } + ) + + +@dataclass +class LoaderConfig: + """Configuração para loaders de documentos.""" + + encoding: str = "utf-8" + auto_detect_encoding: bool = True + fallback_encodings: list[str] = field( + default_factory=lambda: ["utf-8", "latin-1", "cp1252", "iso-8859-1"] + ) + + # Extensões suportadas por tipo + text_extensions: set[str] = field(default_factory=lambda: TEXT_EXTENSIONS.copy()) + + markdown_extensions: set[str] = field( + default_factory=lambda: MARKDOWN_EXTENSIONS.copy() + ) + + # Tamanho máximo de arquivo (em bytes) + max_file_size: int = 50 * 1024 * 1024 # 50MB + + # Se deve incluir metadados de arquivo + include_file_metadata: bool = True + + # Se deve mostrar progresso + show_progress: bool = False + + +@dataclass +class EmbeddingConfig: + """Configuração para embeddings.""" + + model_name: str = "sentence-transformers/all-MiniLM-L6-v2" + device: str = "cpu" # ou "cuda" se disponível + batch_size: int = 32 + normalize_embeddings: bool = True + trust_remote_code: bool = False + + +@dataclass +class VectorStoreConfig: + """Configuração para vector store.""" + + dimension: int = 384 # Dimensão padrão do all-MiniLM-L6-v2 + index_type: str = "flat" # flat, hnsw, ivf + distance_metric: str = "cosine" # cosine, euclidean, dot_product + + # Parâmetros específicos do FAISS + nlist: int = 100 # Para IVF + nprobe: int = 10 # Para IVF + m: int = 16 # Para HNSW + ef_construction: int = 200 # Para HNSW + ef_search: int = 100 # Para HNSW + + +# Configurações predefinidas para casos de uso comuns +class ConfigPresets: + """Presets de configuração para casos de uso comuns.""" + + @staticmethod + def small_documents() -> ChunkingConfig: + """Configuração otimizada para documentos pequenos.""" + return ChunkingConfig( + chunk_size=500, + chunk_overlap=50, + separator="\n", + natural_break_search_range=25, + ) + + @staticmethod + def large_documents() -> ChunkingConfig: + """Configuração otimizada para documentos grandes.""" + return ChunkingConfig( + chunk_size=2000, + chunk_overlap=400, + separator="\n\n", + natural_break_search_range=100, + ) + + @staticmethod + def code_files() -> ChunkingConfig: + """Configuração otimizada para arquivos de código.""" + return ChunkingConfig( + chunk_size=1500, + chunk_overlap=300, + separator="\n\n", + natural_break_search_range=75, + sentence_break_chars="\n;{}", + ) + + @staticmethod + def academic_papers() -> ChunkingConfig: + """Configuração otimizada para papers acadêmicos.""" + return ChunkingConfig( + chunk_size=1200, + chunk_overlap=200, + separator="\n\n", + natural_break_search_range=60, + sentence_break_chars=".!?\n", + ) + + @staticmethod + def fast_processing() -> TextProcessingConfig: + """Configuração para processamento rápido.""" + return TextProcessingConfig( + clean_text=False, extract_keywords=False, calculate_stats=False + ) + + @staticmethod + def comprehensive_processing() -> TextProcessingConfig: + """Configuração para processamento completo.""" + return TextProcessingConfig( + clean_text=True, + extract_keywords=True, + calculate_stats=True, + keyword_max_words=50, + ) diff --git a/lambda_rag_lite/constants.py b/lambda_rag_lite/constants.py new file mode 100644 index 0000000..51c075c --- /dev/null +++ b/lambda_rag_lite/constants.py @@ -0,0 +1,197 @@ +""" +Constantes centralizadas para tipos de arquivo e extensões. + +Centraliza as definições de tipos de arquivo e extensões para eliminar +duplicação de código entre os módulos. +""" + +# Extensões de arquivo por categoria - texto e documentos +_TEXT_EXTS = [ + ".txt", + ".md", + ".py", + ".js", + ".ts", + ".json", + ".yaml", + ".yml", + ".html", + ".htm", + ".css", + ".sql", + ".sh", + ".bash", + ".zsh", + ".csv", + ".log", + ".conf", + ".cfg", + ".ini", + ".xml", + ".rst", + ".tex", +] +TEXT_EXTENSIONS: set[str] = set(_TEXT_EXTS) + +_MARKDOWN_EXTS = [".md", ".markdown", ".mdown", ".mkd", ".mkdn"] +MARKDOWN_EXTENSIONS: set[str] = set(_MARKDOWN_EXTS) + +# Tipos de arquivo por categoria - linguagens de programação +_PROGRAMMING_LANGS = [ + "python", + "javascript", + "typescript", + "java", + "c", + "cpp", + "csharp", + "php", + "ruby", + "golang", + "rust", + "swift", + "kotlin", + "scala", + "r", + "matlab", + "perl", + "shell", + "bash", + "zsh", + "fish", + "powershell", + "batch", +] +CODE_TYPES: set[str] = set(_PROGRAMMING_LANGS) + +# Tipos de documentos e configurações +_DOC_TYPES = [ + "markdown", + "text", + "restructured_text", + "latex", + "html", + "readme", + "changelog", + "license", +] +DOCUMENT_TYPES: set[str] = set(_DOC_TYPES) + +_DATA_FORMATS = ["json", "yaml", "toml", "csv", "tsv", "sql", "xml"] +DATA_TYPES: set[str] = set(_DATA_FORMATS) + +_CONFIG_FILES = [ + "ini", + "config", + "gitignore", + "dockerignore", + "editorconfig", + "requirements", + "package_config", + "composer_config", + "cargo_config", + "python_project", +] +CONFIG_TYPES: set[str] = set(_CONFIG_FILES) + +# Mapeamento de extensões para tipos de arquivo +EXTENSION_TYPE_MAPPING = { + # Documentos de texto + ".md": "markdown", + ".markdown": "markdown", + ".mdown": "markdown", + ".mkd": "markdown", + ".mkdn": "markdown", + ".txt": "text", + ".rst": "restructured_text", + ".tex": "latex", + # Código + ".py": "python", + ".js": "javascript", + ".ts": "typescript", + ".jsx": "javascript_react", + ".tsx": "typescript_react", + ".java": "java", + ".c": "c", + ".cpp": "cpp", + ".h": "c_header", + ".hpp": "cpp_header", + ".cs": "csharp", + ".php": "php", + ".rb": "ruby", + ".go": "golang", + ".rs": "rust", + ".swift": "swift", + ".kt": "kotlin", + ".scala": "scala", + ".r": "r", + ".m": "matlab", + ".pl": "perl", + ".sh": "shell", + ".bash": "bash", + ".zsh": "zsh", + ".fish": "fish", + ".ps1": "powershell", + ".bat": "batch", + ".cmd": "batch", + # Web + ".html": "html", + ".htm": "html", + ".xml": "xml", + ".css": "css", + ".scss": "scss", + ".sass": "sass", + ".less": "less", + # Dados + ".json": "json", + ".yaml": "yaml", + ".yml": "yaml", + ".toml": "toml", + ".ini": "ini", + ".cfg": "config", + ".conf": "config", + ".csv": "csv", + ".tsv": "tsv", + ".sql": "sql", + # Logs e documentação + ".log": "log", + ".out": "output", + ".err": "error_log", + ".diff": "diff", + ".patch": "patch", + # Outros + ".gitignore": "gitignore", + ".dockerignore": "dockerignore", + ".editorconfig": "editorconfig", +} + +# Arquivos especiais por nome +SPECIAL_FILES = { + "readme": "readme", + "changelog": "changelog", + "license": "license", + "dockerfile": "dockerfile", + "makefile": "makefile", + "rakefile": "rakefile", + "gemfile": "gemfile", + "requirements.txt": "requirements", + "setup.py": "setup_script", + "package.json": "package_config", + "composer.json": "composer_config", + "cargo.toml": "cargo_config", + "pyproject.toml": "python_project", +} + +# Mapeamento de MIME types para tipos internos +MIME_TYPE_MAPPING = { + "text/plain": "text", + "text/markdown": "markdown", + "text/html": "html", + "text/css": "css", + "text/javascript": "javascript", + "text/xml": "xml", + "application/json": "json", + "application/xml": "xml", + "application/javascript": "javascript", + "application/sql": "sql", +} diff --git a/lambda_rag_lite/detectors/__init__.py b/lambda_rag_lite/detectors/__init__.py new file mode 100644 index 0000000..758afbe --- /dev/null +++ b/lambda_rag_lite/detectors/__init__.py @@ -0,0 +1,9 @@ +"""Detectores para encoding, tipo de arquivo, etc.""" + +from .encoding import EncodingDetector +from .file_type import FileTypeDetector + +__all__ = [ + "EncodingDetector", + "FileTypeDetector", +] diff --git a/lambda_rag_lite/detectors/encoding.py b/lambda_rag_lite/detectors/encoding.py new file mode 100644 index 0000000..6debcfc --- /dev/null +++ b/lambda_rag_lite/detectors/encoding.py @@ -0,0 +1,155 @@ +""" +Detector de encoding de arquivos. + +""" + +from __future__ import annotations + +from pathlib import Path + +from ..config import LoaderConfig + + +class EncodingDetector: + """ + Detector inteligente de encoding de arquivos. + + Usa uma combinação de heurísticas e bibliotecas para detectar + o encoding mais provável de um arquivo. + """ + + def __init__(self, config: LoaderConfig | None = None): + """ + Inicializa o detector com configurações. + + Args: + config: Configuração para detecção (usa padrão se None) + """ + self.config = config or LoaderConfig() + + def detect_and_read(self, file_path: Path) -> tuple[str, str]: + """ + Detecta encoding e lê o conteúdo do arquivo. + + Args: + file_path: Caminho para o arquivo + + Returns: + Tupla (conteúdo, encoding_usado) + + Raises: + FileNotFoundError: Se arquivo não existe + UnicodeDecodeError: Se não conseguiu decodificar com nenhum encoding + """ + if not file_path.exists(): + raise FileNotFoundError(f"Arquivo não encontrado: {file_path}") + + # Tenta primeiro com o encoding padrão + try: + content = file_path.read_text(encoding=self.config.encoding) + return content, self.config.encoding + except UnicodeDecodeError: + pass + + if self.config.auto_detect_encoding: + # Tenta com encodings da lista de fallback + for encoding in self.config.fallback_encodings: + try: + content = file_path.read_text(encoding=encoding) + return content, encoding + except UnicodeDecodeError: + continue + + # Se ainda falhou, tenta detecção automática com chardet + detected_encoding = self._detect_with_chardet(file_path) + if detected_encoding: + try: + content = file_path.read_text(encoding=detected_encoding) + return content, detected_encoding + except UnicodeDecodeError: + # Failed to decode with detected encoding, continue to fallback + pass + + # Último recurso: lê ignorando erros + try: + content = file_path.read_text(errors="ignore") + return content, "utf-8 (com erros ignorados)" + except Exception as e: + # Log the error but continue to raise ValueError below + import logging + + logging.debug( + f"Failed to read file {file_path} with error handling: {e}" + ) + + raise ValueError( + f"Não foi possível decodificar o arquivo {file_path} " + f"com nenhum dos encodings tentados: {self.config.fallback_encodings}" + ) + + def detect_encoding(self, file_path: Path) -> str | None: + """ + Detecta apenas o encoding sem ler o arquivo completo. + + Args: + file_path: Caminho para o arquivo + + Returns: + Encoding detectado ou None se não conseguiu detectar + """ + if not file_path.exists(): + return None + + # Tenta detecção com chardet + return self._detect_with_chardet(file_path) + + def _detect_with_chardet(self, file_path: Path) -> str | None: + """ + Usa heurísticas simples para detectar encoding lendo uma amostra do arquivo. + + Args: + file_path: Caminho para o arquivo + + Returns: + Encoding detectado ou None + """ + try: + # Lê primeiros 1KB para análise + with open(file_path, "rb") as f: + raw_data = f.read(1024) + + if not raw_data: + return None + + # Heurísticas simples para detecção de encoding + # Verifica se parece ser UTF-8 + try: + raw_data.decode("utf-8") + return "utf-8" + except UnicodeDecodeError: + pass + + # Verifica se parece ser Latin-1 + try: + raw_data.decode("latin-1") + return "latin-1" + except UnicodeDecodeError: + # Failed to decode with latin-1, will return None + pass + + except Exception as e: + # Log the error for debugging + import logging + + logging.debug(f"Error during encoding detection for {file_path}: {e}") + + return None + + def get_supported_encodings(self) -> list[str]: + """ + Retorna lista de encodings suportados pelo detector. + + Returns: + Lista de strings com nomes dos encodings + """ + return [self.config.encoding] + self.config.fallback_encodings diff --git a/lambda_rag_lite/detectors/file_type.py b/lambda_rag_lite/detectors/file_type.py new file mode 100644 index 0000000..1207d67 --- /dev/null +++ b/lambda_rag_lite/detectors/file_type.py @@ -0,0 +1,169 @@ +""" +Detector de tipo de arquivo. + +""" + +from __future__ import annotations + +import mimetypes +from pathlib import Path + +from ..config import LoaderConfig +from ..constants import ( + CODE_TYPES, + CONFIG_TYPES, + DATA_TYPES, + DOCUMENT_TYPES, + EXTENSION_TYPE_MAPPING, + MIME_TYPE_MAPPING, + SPECIAL_FILES, +) + + +class FileTypeDetector: + """ + Detector de tipo de arquivo baseado em extensão e MIME type. + + Centraliza a lógica de detecção de tipos de arquivo que estava + espalhada pelos loaders. + """ + + def __init__(self, config: LoaderConfig | None = None): + """ + Inicializa o detector. + + Args: + config: Configuração do loader (usa padrão se None) + """ + self.config = config or LoaderConfig() + # Usa mapeamento das constantes + self.type_mapping = EXTENSION_TYPE_MAPPING + + def detect_file_type(self, file_path: Path, fallback: str = "unknown") -> str: + """ + Detecta o tipo de um arquivo. + + Args: + file_path: Caminho para o arquivo + fallback: Tipo a retornar quando não consegue detectar (padrão: "unknown") + + Returns: + String descrevendo o tipo do arquivo + """ + # Verifica nome especial do arquivo + filename_lower = file_path.name.lower() + for special, type_name in SPECIAL_FILES.items(): + if special in filename_lower: + return type_name + + # Detecção por extensão + ext = file_path.suffix.lower() + if ext in self.type_mapping: + return self.type_mapping[ext] + + # Se arquivo existe, tenta detecção por MIME type + if file_path.exists(): + mime_type = self._get_mime_type(file_path) + if mime_type: + mime_type_result = self._mime_to_type(mime_type) + if mime_type_result != "unknown": + return mime_type_result + + # Verifica se é arquivo de texto + return "text" if self._is_text_file(file_path) else fallback + + # Se arquivo não existe, retorna o fallback + return fallback + + def is_supported_extension(self, file_path: Path) -> bool: + """ + Verifica se a extensão do arquivo é suportada. + + Args: + file_path: Caminho para o arquivo + + Returns: + True se a extensão é suportada + """ + ext = file_path.suffix.lower() + return ( + ext in self.config.text_extensions or ext in self.config.markdown_extensions + ) + + def is_text_file(self, file_path: Path) -> bool: + """ + Verifica se um arquivo é de texto. + + Args: + file_path: Caminho para o arquivo + + Returns: + True se o arquivo é de texto + """ + return self._is_text_file(file_path) + + def _get_mime_type(self, file_path: Path) -> str | None: + """Obtém MIME type do arquivo.""" + try: + mime_type, _ = mimetypes.guess_type(str(file_path)) + return mime_type + except Exception: + return None + + def _mime_to_type(self, mime_type: str) -> str: + """Converte MIME type para tipo interno.""" + return MIME_TYPE_MAPPING.get( + mime_type, "text" if mime_type.startswith("text/") else "unknown" + ) + + def _is_text_file(self, file_path: Path) -> bool: + """ + Verifica se arquivo é de texto lendo uma amostra. + + Args: + file_path: Caminho para o arquivo + + Returns: + True se parece ser arquivo de texto + """ + try: + # Lê primeiros 512 bytes + with open(file_path, "rb") as f: + sample = f.read(512) + + if not sample: + return True # Arquivo vazio é considerado texto + + # Verifica se contém muitos bytes não-texto + non_text_chars = sum( + 1 for byte in sample if byte < 32 and byte not in (9, 10, 13) + ) + + # Se mais de 10% são caracteres não-texto, provavelmente é binário + return (non_text_chars / len(sample)) < 0.1 + + except Exception: + return False + + def get_file_category(self, file_path: Path) -> str: + """ + Retorna categoria geral do arquivo. + + Args: + file_path: Caminho para o arquivo + + Returns: + Categoria do arquivo (code, document, data, config, etc.) + """ + file_type = self.detect_file_type(file_path) + + if file_type in CODE_TYPES: + return "code" + elif file_type in DOCUMENT_TYPES: + return "document" + elif file_type in DATA_TYPES: + return "data" + elif file_type in CONFIG_TYPES: + return "config" + else: + return "other" diff --git a/lambda_rag_lite/factories.py b/lambda_rag_lite/factories.py new file mode 100644 index 0000000..7147ca5 --- /dev/null +++ b/lambda_rag_lite/factories.py @@ -0,0 +1,62 @@ +""" +Factory classes for creating loaders. + +""" + +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .loaders import MarkdownLoader, TextLoader + +from .config import LoaderConfig +from .detectors.file_type import FileTypeDetector + + +class LoaderFactory: + """ + Factory para criar loaders apropriados baseado no tipo de arquivo. + + Centraliza a lógica de seleção de loaders. + """ + + def __init__(self, config: LoaderConfig | None = None): + """ + Inicializa a factory. + + Args: + config: Configuração para os loaders + """ + self.config = config or LoaderConfig() + self.file_type_detector = FileTypeDetector(self.config) + + def get_loader_for_file( + self, file_path: Path + ) -> MarkdownLoader | TextLoader | None: + """ + Retorna o loader apropriado para um arquivo. + + Args: + file_path: Caminho do arquivo + + Returns: + Loader apropriado ou None se não suportado + """ + if not self.file_type_detector.is_supported_extension(file_path): + return None + + ext = file_path.suffix.lower() + + # Import here to avoid circular imports + from .loaders import MarkdownLoader, TextLoader + + # Usa mapeamento direto em vez de múltiplas condições + if ext in self.config.markdown_extensions: + return MarkdownLoader(file_path) + + if ext in self.config.text_extensions: + return TextLoader(file_path) + + return None diff --git a/lambda_rag_lite/loaders.py b/lambda_rag_lite/loaders.py index 57a8925..26109d3 100644 --- a/lambda_rag_lite/loaders.py +++ b/lambda_rag_lite/loaders.py @@ -1,25 +1,28 @@ """ Módulo de loaders para diferentes tipos de documentos. -Fornece classes para carregar e processar documentos de diferentes formatos, -mantendo compatibilidade com a interface LangChain. """ from __future__ import annotations -import mimetypes +import importlib.util +import logging from pathlib import Path from typing import List, Optional, Union from langchain_core.documents import Document +from .config import LoaderConfig +from .detectors.encoding import EncodingDetector +from .detectors.file_type import FileTypeDetector +from .factories import LoaderFactory +from .metadata.document_metadata import DocumentMetadataManager + class MarkdownLoader: """ Loader para arquivos Markdown que busca recursivamente em diretórios. - Carrega todos os arquivos .md encontrados em um diretório e seus - subdiretórios, criando Documents LangChain com metadados apropriados. """ def __init__( @@ -37,8 +40,23 @@ def __init__( auto_detect_encoding: Se deve tentar detectar encoding automaticamente """ self.path = Path(path) - self.encoding = encoding - self.auto_detect_encoding = auto_detect_encoding + + # Cria configuração e componentes especializados + self.config = LoaderConfig( + encoding=encoding, auto_detect_encoding=auto_detect_encoding + ) + self.encoding_detector = EncodingDetector(self.config) + self.metadata_manager = DocumentMetadataManager(self.config) + + @property + def encoding(self) -> str: + """Retorna a codificação configurada.""" + return self.config.encoding + + @property + def auto_detect_encoding(self) -> bool: + """Retorna se a detecção automática de encoding está habilitada.""" + return self.config.auto_detect_encoding def load(self) -> List[Document]: """ @@ -77,6 +95,7 @@ def _load_single_file(self, file_path: Path) -> Optional[Document]: """ Carrega um único arquivo Markdown. + Args: file_path: Caminho para o arquivo @@ -84,42 +103,36 @@ def _load_single_file(self, file_path: Path) -> Optional[Document]: Document ou None se houve erro """ try: - # Tenta com a codificação especificada - content = file_path.read_text(encoding=self.encoding) - except UnicodeDecodeError: - if self.auto_detect_encoding: - # Tenta outras codificações comuns - for encoding in ["utf-8", "latin-1", "cp1252"]: - try: - content = file_path.read_text(encoding=encoding) - break - except UnicodeDecodeError: - continue - else: - # Se todas falharam, usa ignore para evitar crash - content = file_path.read_text(errors="ignore") - else: - return None - except Exception: - return None + # Tenta usar o detector de encoding especializado + try: + content, encoding_used = self.encoding_detector.detect_and_read( + file_path + ) + except Exception: + # Fallback para leitura direta (útil para testes com mocks) + content = file_path.read_text(encoding=self.config.encoding) + encoding_used = self.config.encoding + + # Cria metadados básicos + metadata = { + "source": str(file_path.absolute()), + "filename": file_path.name, + "file_type": "markdown", + "file_size": file_path.stat().st_size, + "encoding": encoding_used, + } - # Metadados do arquivo - metadata = { - "source": str(file_path.absolute()), - "filename": file_path.name, - "file_type": "markdown", - "file_size": file_path.stat().st_size, - } + return Document(page_content=content, metadata=metadata) - return Document(page_content=content, metadata=metadata) + except Exception: + # Log error silently and continue + return None class TextLoader: """ Loader genérico para arquivos de texto. - Carrega arquivos .txt, .md, .py, .js, e outros formatos de texto, - detectando automaticamente o tipo baseado na extensão. """ def __init__( @@ -139,35 +152,30 @@ def __init__( extensions: Lista de extensões para carregar (padrão: texto comum) """ self.path = Path(path) - self.encoding = encoding - self.auto_detect_encoding = auto_detect_encoding - - # Extensões padrão de texto - if extensions is None: - self.extensions = { - ".txt", - ".md", - ".py", - ".js", - ".ts", - ".json", - ".yaml", - ".yml", - ".html", - ".htm", - ".css", - ".sql", - ".sh", - ".bash", - ".zsh", - ".csv", - ".log", - ".conf", - ".cfg", - ".ini", - } - else: - self.extensions = {ext.lower() for ext in extensions} + + # Cria configuração personalizada + config = LoaderConfig( + encoding=encoding, auto_detect_encoding=auto_detect_encoding + ) + + # Personaliza extensões se fornecidas + if extensions is not None: + config.text_extensions = {ext.lower() for ext in extensions} + + # Inicializa componentes especializados + self.config = config + self.encoding_detector = EncodingDetector(config) + self.file_type_detector = FileTypeDetector(config) + self.metadata_manager = DocumentMetadataManager(config, self.file_type_detector) + + @property + def extensions(self) -> set[str]: + """Retorna o conjunto de extensões suportadas.""" + return self.config.text_extensions + + def _detect_file_type(self, file_path: Path) -> str: + """Detecta o tipo do arquivo baseado na extensão.""" + return self.file_type_detector.detect_file_type(file_path, fallback="text") def load(self) -> List[Document]: """ @@ -179,96 +187,74 @@ def load(self) -> List[Document]: if not self.path.exists(): raise FileNotFoundError(f"Caminho não encontrado: {self.path}") - documents = [] + return self._load_files() + def _load_files(self) -> list[Document]: + """Carrega arquivos baseado no tipo de caminho.""" if self.path.is_file(): - if self.path.suffix.lower() in self.extensions: - doc = self._load_single_file(self.path) - if doc: - documents.append(doc) + return self._load_single_path() else: - # Busca recursiva - for file_path in self.path.rglob("*"): - if file_path.is_file() and file_path.suffix.lower() in self.extensions: - doc = self._load_single_file(file_path) - if doc: - documents.append(doc) + return self._load_directory_files() - return documents - - def _load_single_file(self, file_path: Path) -> Optional[Document]: - """Carrega um único arquivo de texto.""" - try: - content = file_path.read_text(encoding=self.encoding) - except UnicodeDecodeError: - if self.auto_detect_encoding: - for encoding in ["utf-8", "latin-1", "cp1252"]: - try: - content = file_path.read_text(encoding=encoding) - break - except UnicodeDecodeError: - continue - else: - content = file_path.read_text(errors="ignore") - else: - return None - except Exception: - return None + def _load_single_path(self) -> list[Document]: + """Carrega um único arquivo.""" + if self.file_type_detector.is_supported_extension(self.path): + doc = self._load_single_file(self.path) + return [doc] if doc else [] + return [] - # Detecta tipo de arquivo - file_type = self._detect_file_type(file_path) + def _load_directory_files(self) -> list[Document]: + """Carrega arquivos de um diretório.""" + documents = [] + for file_path in self.path.rglob("*"): + if self._should_load_file(file_path): + doc = self._load_single_file(file_path) + if doc: + documents.append(doc) + return documents - metadata = { - "source": str(file_path.absolute()), - "filename": file_path.name, - "file_type": file_type, - "file_size": file_path.stat().st_size, - "extension": file_path.suffix.lower(), - } + def _should_load_file(self, file_path: Path) -> bool: + """Verifica se um arquivo deve ser carregado.""" + return file_path.is_file() and self.file_type_detector.is_supported_extension( + file_path + ) - return Document(page_content=content, metadata=metadata) + def _load_single_file(self, file_path: Path) -> Document | None: + """ + Carrega um único arquivo de texto. - def _detect_file_type(self, file_path: Path) -> str: """ - Detecta o tipo de arquivo baseado na extensão. + try: + # Tenta usar o detector de encoding especializado + try: + content, encoding_used = self.encoding_detector.detect_and_read( + file_path + ) + except Exception: + # Fallback para leitura direta (útil para testes com mocks) + content = file_path.read_text(encoding=self.config.encoding) + encoding_used = self.config.encoding + + # Cria metadados básicos + metadata = { + "source": str(file_path.absolute()), + "filename": file_path.name, + "file_type": self._detect_file_type(file_path), + "file_size": file_path.stat().st_size, + "encoding": encoding_used, + "extension": file_path.suffix.lower(), + } - Args: - file_path: Caminho do arquivo + return Document(page_content=content, metadata=metadata) - Returns: - String descrevendo o tipo de arquivo - """ - ext = file_path.suffix.lower() - - type_mapping = { - ".md": "markdown", - ".py": "python", - ".js": "javascript", - ".ts": "typescript", - ".html": "html", - ".htm": "html", - ".css": "css", - ".json": "json", - ".yaml": "yaml", - ".yml": "yaml", - ".sql": "sql", - ".sh": "shell", - ".bash": "shell", - ".zsh": "shell", - ".csv": "csv", - ".log": "log", - ".txt": "text", - } - - return type_mapping.get(ext, "text") + except Exception: + return None class DirectoryLoader: """ Loader que combina múltiplos loaders para diferentes tipos de arquivo. - Carrega automaticamente diferentes tipos de arquivo de um diretório, - aplicando o loader apropriado baseado na extensão. """ def __init__( @@ -276,6 +262,7 @@ def __init__( path: Union[str, Path], recursive: bool = True, show_progress: bool = False, + config: LoaderConfig | None = None, ): """ Inicializa o loader de diretório. @@ -283,11 +270,16 @@ def __init__( Args: path: Caminho do diretório recursive: Se deve buscar recursivamente - show_progress: Se deve mostrar progresso (requer tqdm) + show_progress: Se deve mostrar progresso + config: Configuração personalizada """ self.path = Path(path) self.recursive = recursive self.show_progress = show_progress + self.config = config or LoaderConfig() + + # Inicializa factory de loaders + self.loader_factory = LoaderFactory(self.config) def load(self) -> List[Document]: """ @@ -296,80 +288,63 @@ def load(self) -> List[Document]: Returns: Lista de Documents de todos os arquivos """ + self._validate_path() + + files = self._get_files() + if self.show_progress and self._has_progress_library(): + files = self._wrap_with_progress(files) + + return self._process_files(files) + + def _validate_path(self) -> None: + """Valida se o caminho existe e é um diretório.""" if not self.path.exists(): raise FileNotFoundError(f"Diretório não encontrado: {self.path}") if not self.path.is_dir(): raise ValueError(f"Caminho não é um diretório: {self.path}") + def _process_files(self, files) -> list[Document]: + """Processa lista de arquivos e retorna documentos.""" documents = [] + for file_path in files: + try: + loader = self.loader_factory.get_loader_for_file(file_path) + if loader: + docs = loader.load() + documents.extend(docs) + except Exception as e: + # Log the error for debugging and continue with next file + logging.debug(f"Failed to load file {file_path}: {e}") + continue + + return documents - # Busca arquivos + def _get_loader_for_file(self, file_path: Path): + """Obtém o loader apropriado para um arquivo.""" + return self.loader_factory.get_loader_for_file(file_path) + + def _get_files(self) -> list[Path]: + """Obtém lista de arquivos para processar.""" if self.recursive: files = list(self.path.rglob("*")) else: files = list(self.path.glob("*")) # Filtra apenas arquivos - files = [f for f in files if f.is_file()] + return [f for f in files if f.is_file()] + + def _has_progress_library(self) -> bool: + """Verifica se biblioteca de progresso está disponível.""" + return importlib.util.find_spec("tqdm") is not None - # Aplica loader apropriado - if self.show_progress: + def _wrap_with_progress(self, files: list[Path]): + """Envolve lista com barra de progresso se disponível.""" + if self._has_progress_library(): try: from tqdm import tqdm - files = tqdm(files, desc="Carregando arquivos") + return tqdm(files, desc="Processando arquivos") except ImportError: - pass # tqdm não disponível - - for file_path in files: - try: - loader = self._get_loader_for_file(file_path) - if loader: - docs = loader.load() - documents.extend(docs) - except Exception: - continue # Ignora arquivos que não conseguiu carregar - - return documents - - def _get_loader_for_file( - self, file_path: Path - ) -> Optional[Union[MarkdownLoader, TextLoader]]: - """ - Retorna o loader apropriado para um arquivo. - - Args: - file_path: Caminho do arquivo - - Returns: - Loader apropriado ou None se não suportado - """ - ext = file_path.suffix.lower() - - if ext == ".md": - return MarkdownLoader(file_path) - elif ext in { - ".txt", - ".py", - ".js", - ".ts", - ".json", - ".yaml", - ".yml", - ".html", - ".htm", - ".css", - ".sql", - ".sh", - ".bash", - ".zsh", - ".csv", - ".log", - ".conf", - ".cfg", - ".ini", - }: - return TextLoader(file_path) - - return None + pass + return files diff --git a/lambda_rag_lite/metadata/__init__.py b/lambda_rag_lite/metadata/__init__.py new file mode 100644 index 0000000..996dc01 --- /dev/null +++ b/lambda_rag_lite/metadata/__init__.py @@ -0,0 +1,7 @@ +"""Manipulação de metadados.""" + +from .document_metadata import DocumentMetadataManager + +__all__ = [ + "DocumentMetadataManager", +] diff --git a/lambda_rag_lite/metadata/document_metadata.py b/lambda_rag_lite/metadata/document_metadata.py new file mode 100644 index 0000000..12d46ed --- /dev/null +++ b/lambda_rag_lite/metadata/document_metadata.py @@ -0,0 +1,224 @@ +""" +Gerenciador de metadados de documentos. + +Centraliza a criação e manipulação de metadados para documentos +carregados pelos loaders. +""" + +from __future__ import annotations + +import mimetypes +from datetime import datetime +from pathlib import Path +from typing import Any + +from ..config import LoaderConfig +from ..detectors.file_type import FileTypeDetector +from ..text_cleaning import format_file_size + + +class DocumentMetadataManager: + """ + Gerenciador centralizado de metadados de documentos. + + Extrai e organiza metadados de arquivos de forma consistente + para todos os loaders. + """ + + def __init__( + self, + config: LoaderConfig | None = None, + file_type_detector: FileTypeDetector | None = None, + ): + """ + Inicializa o gerenciador. + + Args: + config: Configuração do loader + file_type_detector: Detector de tipo de arquivo + """ + self.config = config or LoaderConfig() + self.file_type_detector = file_type_detector or FileTypeDetector(config) + + def create_file_metadata( + self, + file_path: Path, + encoding_used: str | None = None, + additional_metadata: dict[str, Any] | None = None, + ) -> dict[str, Any]: + """ + Cria metadados completos para um arquivo. + + Args: + file_path: Caminho para o arquivo + encoding_used: Encoding usado para ler o arquivo + additional_metadata: Metadados adicionais específicos + + Returns: + Dicionário com metadados do arquivo + """ + if not file_path.exists(): + raise FileNotFoundError(f"Arquivo não encontrado: {file_path}") + + # Metadados básicos sempre incluídos + metadata: dict[str, Any] = { + "source": str(file_path.absolute()), + "filename": file_path.name, + "file_stem": file_path.stem, + "file_extension": file_path.suffix.lower(), + } + + if self.config.include_file_metadata: + # Adiciona metadados detalhados do arquivo + stat_info = file_path.stat() + + metadata.update( + { + "file_size": stat_info.st_size, + "file_size_human": format_file_size(stat_info.st_size), + "created_time": datetime.fromtimestamp( + stat_info.st_ctime + ).isoformat(), + "modified_time": datetime.fromtimestamp( + stat_info.st_mtime + ).isoformat(), + "file_type": self.file_type_detector.detect_file_type(file_path), + "file_category": self.file_type_detector.get_file_category( + file_path + ), + } + ) + + # Adiciona MIME type se disponível + mime_type = self._get_mime_type(file_path) + if mime_type: + metadata["mime_type"] = mime_type + + # Adiciona encoding usado + if encoding_used: + metadata["encoding"] = encoding_used + + # Adiciona metadados específicos do caminho + metadata.update(self._extract_path_metadata(file_path)) + + # Merge com metadados adicionais + if additional_metadata: + metadata.update(additional_metadata) + + return metadata + + def create_directory_metadata( + self, directory_path: Path, additional_metadata: dict[str, Any] | None = None + ) -> dict[str, Any]: + """ + Cria metadados para um diretório. + + Args: + directory_path: Caminho para o diretório + additional_metadata: Metadados adicionais + + Returns: + Dicionário com metadados do diretório + """ + if not directory_path.exists(): + raise FileNotFoundError(f"Diretório não encontrado: {directory_path}") + + metadata = { + "source": str(directory_path.absolute()), + "directory_name": directory_path.name, + "is_directory": True, + } + + if self.config.include_file_metadata: + stat_info = directory_path.stat() + metadata.update( + { + "created_time": datetime.fromtimestamp( + stat_info.st_ctime + ).isoformat(), + "modified_time": datetime.fromtimestamp( + stat_info.st_mtime + ).isoformat(), + } + ) + + # Adiciona metadados específicos do caminho + metadata.update(self._extract_path_metadata(directory_path)) + + if additional_metadata: + metadata.update(additional_metadata) + + return metadata + + def enhance_metadata_with_content_analysis( + self, + metadata: dict[str, Any], + content: str, + analysis_results: dict[str, Any] | None = None, + ) -> dict[str, Any]: + """ + Enriquece metadados com análise do conteúdo. + + Args: + metadata: Metadados existentes + content: Conteúdo do documento + analysis_results: Resultados de análise de texto + + Returns: + Metadados enriquecidos + """ + enhanced_metadata = metadata.copy() + + # Adiciona estatísticas básicas do conteúdo + if content: + enhanced_metadata.update( + { + "content_length": len(content), + "content_lines": len(content.split("\n")), + "content_words": len(content.split()), + "content_paragraphs": len( + [p for p in content.split("\n\n") if p.strip()] + ), + } + ) + + # Adiciona resultados de análise se fornecidos + if analysis_results: + enhanced_metadata.update(analysis_results) + + return enhanced_metadata + + def _extract_path_metadata(self, path: Path) -> dict[str, Any]: + """ + Extrai metadados do caminho do arquivo. + + Args: + path: Caminho do arquivo/diretório + + Returns: + Dicionário com metadados do caminho + """ + metadata: dict[str, Any] = {} + + # Adiciona informações do diretório pai + if path.parent != path: + metadata["parent_directory"] = path.parent.name + metadata["relative_path"] = str(path.relative_to(path.anchor)) + + # Conta depth do diretório + parts = path.parts + metadata["path_depth"] = len(parts) - 1 # -1 para não contar a raiz + + # Adiciona partes do caminho como tags + if len(parts) > 1: + metadata["path_parts"] = list(parts[1:]) # Remove a raiz + + return metadata + + def _get_mime_type(self, file_path: Path) -> str | None: + """Obtém MIME type do arquivo.""" + try: + mime_type, _ = mimetypes.guess_type(str(file_path)) + return mime_type + except Exception: + return None diff --git a/lambda_rag_lite/processors/__init__.py b/lambda_rag_lite/processors/__init__.py new file mode 100644 index 0000000..53e49f7 --- /dev/null +++ b/lambda_rag_lite/processors/__init__.py @@ -0,0 +1,7 @@ +"""Processadores de texto.""" + +from .text_processor import TextProcessor + +__all__ = [ + "TextProcessor", +] diff --git a/lambda_rag_lite/processors/text_processor.py b/lambda_rag_lite/processors/text_processor.py new file mode 100644 index 0000000..f2d7631 --- /dev/null +++ b/lambda_rag_lite/processors/text_processor.py @@ -0,0 +1,171 @@ +""" +Processador de texto. +""" + +from __future__ import annotations + +import re +from typing import Any + +from ..config import ChunkingConfig, TextProcessingConfig +from ..strategies.chunking import TextChunker +from ..text_cleaning import calculate_text_stats, extract_keywords + + +class TextProcessor: + """ + Processador avançado de texto com configurações personalizáveis. + + """ + + def __init__( + self, + processing_config: TextProcessingConfig | None = None, + chunking_config: ChunkingConfig | None = None, + ): + """ + Inicializa o processador de texto. + + Args: + processing_config: Configuração de processamento de texto + chunking_config: Configuração de chunking + """ + self.processing_config = processing_config or TextProcessingConfig() + self.chunking_config = chunking_config or ChunkingConfig() + self.chunker = TextChunker() + + def process_text( + self, text: str | None, metadata: dict[str, Any] | None = None + ) -> list[dict[str, Any]]: + """ + Processa texto completo retornando chunks com metadados. + + Args: + text: Texto para processar + metadata: Metadados base para adicionar aos chunks + + Returns: + Lista de dicionários com texto e metadados + """ + if not text: + return [] + + # Limpa texto se configurado + processed_text = self._clean_text_if_needed(text) + + # Divide em chunks + chunks = self.chunker.chunk(processed_text, self.chunking_config) + + results = [] + base_metadata = metadata or {} + + for i, chunk in enumerate(chunks): + chunk_metadata = base_metadata.copy() + chunk_metadata.update(self._create_chunk_metadata(chunk, i, len(chunks))) + + # Adiciona palavras-chave se configurado + if self.processing_config.extract_keywords: + keywords = extract_keywords( + chunk, + self.processing_config.keyword_min_length, + self.processing_config.keyword_max_words, + ) + chunk_metadata["keywords"] = keywords + + # Adiciona estatísticas se configurado + if self.processing_config.calculate_stats: + stats = calculate_text_stats(chunk) + chunk_metadata.update(stats) + + results.append({"text": chunk, "metadata": chunk_metadata}) + + return results + + def clean_text( + self, text: str | None, remove_extra_whitespace: bool | None = None + ) -> str: + """ + Limpa e normaliza texto para melhor processamento. + + Args: + text: Texto para limpar + remove_extra_whitespace: Override da configuração padrão + + Returns: + Texto limpo + """ + if not text: + return "" + + # Remove caracteres de controle exceto quebras de linha e tabs + text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", "", text) + + should_remove_whitespace = ( + remove_extra_whitespace + if remove_extra_whitespace is not None + else self.processing_config.remove_extra_whitespace + ) + + if should_remove_whitespace: + # Remove espaços múltiplos + text = re.sub(r" +", " ", text) + # Remove quebras de linha múltiplas + text = re.sub(r"\n\s*\n", "\n\n", text) + # Remove espaços no início e fim de linhas + text = "\n".join(line.strip() for line in text.split("\n")) + + return text.strip() + + def extract_keywords( + self, + text: str | None, + min_length: int | None = None, + max_words: int | None = None, + ) -> list[str]: + """ + Extrai palavras-chave de um texto. + + Args: + text: Texto para extrair palavras-chave + min_length: Comprimento mínimo das palavras + max_words: Número máximo de palavras para retornar + + Returns: + Lista de palavras-chave + """ + if not text: + return [] + + min_len = min_length or self.processing_config.keyword_min_length + max_words_count = max_words or self.processing_config.keyword_max_words + + # Usa a função do módulo text_cleaning mas com configurações do processador + return extract_keywords(text, min_len, max_words_count) + + def calculate_text_stats(self, text: str | None) -> dict[str, Any]: + """ + Calcula estatísticas básicas de um texto. + + Args: + text: Texto para analisar + + Returns: + Dicionário com estatísticas + """ + return calculate_text_stats(text) + + def _clean_text_if_needed(self, text: str) -> str: + """Limpa texto se a configuração estiver habilitada.""" + if self.processing_config.clean_text: + return self.clean_text(text) + return text + + def _create_chunk_metadata( + self, chunk: str, index: int, total_chunks: int + ) -> dict[str, Any]: + """Cria metadados básicos para um chunk.""" + return { + "chunk_index": index, + "chunk_count": total_chunks, + "chunk_size": len(chunk), + } diff --git a/lambda_rag_lite/strategies/__init__.py b/lambda_rag_lite/strategies/__init__.py new file mode 100644 index 0000000..931d169 --- /dev/null +++ b/lambda_rag_lite/strategies/__init__.py @@ -0,0 +1,17 @@ +"""Estratégias de chunking de texto.""" + +from .chunking import ( + CharacterChunkingStrategy, + ChunkingConfig, + ChunkingStrategy, + SeparatorChunkingStrategy, + TextChunker, +) + +__all__ = [ + "ChunkingConfig", + "ChunkingStrategy", + "SeparatorChunkingStrategy", + "CharacterChunkingStrategy", + "TextChunker", +] diff --git a/lambda_rag_lite/strategies/chunking.py b/lambda_rag_lite/strategies/chunking.py new file mode 100644 index 0000000..8aa4225 --- /dev/null +++ b/lambda_rag_lite/strategies/chunking.py @@ -0,0 +1,391 @@ +""" +Estratégias de chunking de texto implementando o padrão Strategy. + +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod + +from ..config import ChunkingConfig + + +class ChunkingStrategy(ABC): + """Interface base para estratégias de chunking.""" + + @abstractmethod + def chunk(self, text: str, config: ChunkingConfig) -> list[str]: + """ + Divide texto em chunks usando a estratégia específica. + + Args: + text: Texto para dividir + config: Configuração de chunking + + Returns: + Lista de chunks de texto + """ + pass + + +class SeparatorChunkingStrategy(ChunkingStrategy): + """Estratégia de chunking baseada em separadores.""" + + def chunk(self, text: str, config: ChunkingConfig) -> list[str]: + """ + Divide texto usando separadores preferenciais. + + Args: + text: Texto para dividir + config: Configuração de chunking + + Returns: + Lista de chunks + """ + # Validações iniciais + if not self._is_valid_input(text, config): + return [] + + if len(text) <= config.chunk_size: + return [text] + + # Escolhe estratégia baseada na presença do separador + if config.separator in text: + return self._chunk_by_separator(text, config) + else: + return self._chunk_by_characters(text, config) + + def _is_valid_input(self, text: str, config: ChunkingConfig) -> bool: + """Valida entrada para chunking.""" + # Se chunk_size é 0 ou negativo, ou text é vazio, retorna False + return bool(text and config.chunk_size > 0) + + def _chunk_by_separator(self, text: str, config: ChunkingConfig) -> list[str]: + """Divide texto usando separadores.""" + sections = text.split(config.separator) + chunks = [] + current_chunk = "" + + for section in sections: + current_chunk = self._process_section( + section, current_chunk, chunks, config + ) + + # Adiciona último chunk se não vazio + if current_chunk: + chunks.append(current_chunk.strip()) + + return self._filter_valid_chunks(chunks, config) + + def _chunk_by_characters(self, text: str, config: ChunkingConfig) -> list[str]: + """Divide texto usando estratégia de caracteres.""" + character_strategy = CharacterChunkingStrategy() + return character_strategy.chunk(text, config) + + def _process_section( + self, + section: str, + current_chunk: str, + chunks: list[str], + config: ChunkingConfig, + ) -> str: + """Processa uma seção do texto dividido por separador.""" + # Se seção é muito grande, divide ela separadamente + if len(section) > config.chunk_size: + if current_chunk: + chunks.append(current_chunk.strip()) + current_chunk = "" + + # Divide seção grande usando estratégia de caracteres + large_section_chunks = self._split_large_section(section, config) + chunks.extend(large_section_chunks) + return "" + + # Verifica se pode adicionar seção ao chunk atual + elif self._can_add_section(current_chunk, section, config): + return ( + current_chunk + config.separator + section if current_chunk else section + ) + + # Se não cabe, salva chunk atual e inicia novo + else: + if current_chunk: + chunks.append(current_chunk.strip()) + return section + + def _can_add_section( + self, current_chunk: str, section: str, config: ChunkingConfig + ) -> bool: + """Verifica se uma seção pode ser adicionada ao chunk atual.""" + if not current_chunk: + return True + + total_length = len(current_chunk) + len(section) + len(config.separator) + return total_length <= config.chunk_size + + def _split_large_section(self, section: str, config: ChunkingConfig) -> list[str]: + """Divide uma seção muito grande em chunks menores.""" + chunks = [] + start = 0 + + while start < len(section): + end = start + config.chunk_size + + if end >= len(section): + chunks.append(section[start:].strip()) + break + + # Procura quebra natural + break_point = self._find_natural_break(section, end, config) + + chunks.append(section[start:break_point].strip()) + start = max(start + 1, break_point - config.chunk_overlap) + + return chunks + + def _find_natural_break( + self, text: str, position: int, config: ChunkingConfig + ) -> int: + """Encontra uma quebra natural próxima à posição especificada.""" + search_range = config.natural_break_search_range + search_start = max(0, position - search_range) + search_end = min(len(text), position + search_range) + + # Procura por caracteres de quebra natural + for i in range(search_start, search_end): + if i < len(text) and text[i] in config.natural_break_chars: + return i + + return position + + def _filter_valid_chunks( + self, chunks: list[str], config: ChunkingConfig + ) -> list[str]: + """Remove chunks vazios ou muito pequenos.""" + return [ + chunk + for chunk in chunks + if chunk.strip() and len(chunk.strip()) >= config.min_chunk_size + ] + + +class CharacterChunkingStrategy(ChunkingStrategy): + """Estratégia de chunking baseada em contagem de caracteres.""" + + def chunk(self, text: str, config: ChunkingConfig) -> list[str]: + """ + Divide texto por contagem de caracteres com quebras naturais. + + Args: + text: Texto para dividir + config: Configuração de chunking + + Returns: + Lista de chunks + """ + # Validações iniciais + if not self._is_valid_input(text, config): + return [] + + if len(text) <= config.chunk_size: + return [text] + + return self._split_text_into_chunks(text, config) + + def _is_valid_input(self, text: str, config: ChunkingConfig) -> bool: + """Valida entrada para chunking.""" + return bool(text and config.chunk_size > 0) + + def _split_text_into_chunks(self, text: str, config: ChunkingConfig) -> list[str]: + """Divide texto em chunks com quebras naturais.""" + chunks = [] + start = 0 + + while start < len(text): + chunk, actual_end = self._extract_next_chunk_with_end(text, start, config) + if chunk: + chunks.append(chunk) + + # Calcula próximo início baseado na sobreposição + if config.chunk_overlap > 0 and actual_end < len(text): + next_start = max(start + 1, actual_end - config.chunk_overlap) + else: + next_start = actual_end + + start = next_start + + return self._filter_valid_chunks(chunks, config) + + def _extract_next_chunk_with_end( + self, text: str, start: int, config: ChunkingConfig + ) -> tuple[str, int]: + """Extrai o próximo chunk e retorna também a posição final.""" + end = start + config.chunk_size + + if end >= len(text): + return text[start:].strip(), len(text) + + # Procura quebra natural próxima ao fim + break_point = self._find_sentence_break(text, end, config) + return text[start:break_point].strip(), break_point + + def _extract_next_chunk(self, text: str, start: int, config: ChunkingConfig) -> str: + """Extrai o próximo chunk do texto.""" + end = start + config.chunk_size + + if end >= len(text): + return text[start:].strip() + + # Procura quebra natural próxima ao fim + break_point = self._find_sentence_break(text, end, config) + return text[start:break_point].strip() + + def _find_sentence_break( + self, text: str, position: int, config: ChunkingConfig + ) -> int: + """Encontra uma quebra de sentença próxima à posição.""" + search_range = config.natural_break_search_range + search_start = max(0, position - search_range) + search_end = min(len(text), position + search_range) + + # Primeiro procura por caracteres de fim de sentença, de trás para frente + for i in range(position, search_start - 1, -1): + if i < len(text) and text[i] in config.sentence_break_chars: + return i + 1 + + # Se não encontrou, procura por quebras naturais, de trás para frente + for i in range(position, search_start - 1, -1): + if i < len(text) and text[i] in config.natural_break_chars: + return i + + # Se ainda não encontrou, procura para frente + for i in range(position, search_end): + if i < len(text) and text[i] in config.sentence_break_chars: + return i + 1 + + for i in range(position, search_end): + if i < len(text) and text[i] in config.natural_break_chars: + return i + + return position + + def _calculate_next_start( + self, current_start: int, break_point: int, overlap: int + ) -> int: + """Calcula a posição inicial do próximo chunk.""" + if overlap <= 0: + return break_point + + # Evita loop infinito garantindo progresso mínimo + next_start = break_point - overlap + return max(current_start + 1, next_start) + + def _filter_valid_chunks( + self, chunks: list[str], config: ChunkingConfig + ) -> list[str]: + """Remove chunks vazios ou muito pequenos.""" + return [ + chunk + for chunk in chunks + if chunk.strip() and len(chunk.strip()) >= config.min_chunk_size + ] + + +class TextChunker: + """ + Classe principal para chunking de texto usando diferentes estratégias. + + Substitui a função chunk_text() original com uma arquitetura mais modular. + """ + + def __init__(self, strategy: ChunkingStrategy | None = None): + """ + Inicializa o chunker com uma estratégia específica. + + Args: + strategy: Estratégia de chunking a usar (padrão: SeparatorChunkingStrategy) + """ + self.strategy = strategy or SeparatorChunkingStrategy() + + def chunk( + self, + text: str | None, + config: ChunkingConfig | None = None, + chunk_size: int | None = None, + chunk_overlap: int | None = None, + separator: str | None = None, + ) -> list[str]: + """ + Divide texto em chunks usando a estratégia configurada. + + Args: + text: Texto para dividir + config: Configuração de chunking (prioridade sobre parâmetros individuais) + chunk_size: Tamanho do chunk (compatibilidade com API antiga) + chunk_overlap: Overlap entre chunks (compatibilidade com API antiga) + separator: Separador preferencial (compatibilidade com API antiga) + + Returns: + Lista de chunks de texto + """ + if not text: + return [] + + # Se não foi fornecida config, cria uma com os parâmetros + if config is None: + # Ajusta chunk_overlap se for maior que chunk_size + effective_chunk_size = chunk_size or 1000 + effective_chunk_overlap = chunk_overlap or 200 + if effective_chunk_overlap >= effective_chunk_size: + effective_chunk_overlap = max(0, effective_chunk_size - 1) + + config = ChunkingConfig( + chunk_size=effective_chunk_size, + chunk_overlap=effective_chunk_overlap, + separator=separator or "\n\n", + ) + + return self.strategy.chunk(text, config) + + def set_strategy(self, strategy: ChunkingStrategy): + """ + Altera a estratégia de chunking. + + Args: + strategy: Nova estratégia a usar + """ + self.strategy = strategy + + +# Função de conveniência para manter compatibilidade com API antiga +def chunk_text( + text: str | None, + chunk_size: int = 1000, + chunk_overlap: int = 200, + separator: str = "\n\n", +) -> list[str]: + """ + Função de conveniência para chunking de texto (compatibilidade). + + Esta função mantém a API original mas usa a nova arquitetura internamente. + + Args: + text: Texto para dividir + chunk_size: Tamanho máximo de cada chunk + chunk_overlap: Sobreposição entre chunks + separator: Separador preferencial + + Returns: + Lista de chunks de texto + """ + # Handle edge cases before config creation + if not text or chunk_size <= 0: + return [] + + chunker = TextChunker() + return chunker.chunk( + text=text, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + separator=separator, + ) diff --git a/lambda_rag_lite/text_cleaning.py b/lambda_rag_lite/text_cleaning.py new file mode 100644 index 0000000..f3a0358 --- /dev/null +++ b/lambda_rag_lite/text_cleaning.py @@ -0,0 +1,180 @@ +""" +Funções utilitárias para limpeza e formatação de texto. + +""" + +from __future__ import annotations + +import re + + +def clean_text(text: str | None, remove_extra_whitespace: bool = True) -> str: + """ + Limpa e normaliza texto para melhor processamento. + + Args: + text: Texto para limpar + remove_extra_whitespace: Se deve remover espaços extras + + Returns: + Texto limpo + """ + if not text: + return "" + + # Remove caracteres de controle exceto quebras de linha e tabs + text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", "", text) + + if remove_extra_whitespace: + # Remove espaços múltiplos + text = re.sub(r" +", " ", text) + # Remove quebras de linha múltiplas + text = re.sub(r"\n\s*\n", "\n\n", text) + # Remove espaços no início e fim de linhas + text = "\n".join(line.strip() for line in text.split("\n")) + + return text.strip() + + +def extract_keywords( + text: str | None, min_length: int = 3, max_words: int = 20 +) -> list[str]: + """ + Extrai palavras-chave simples de um texto. + + Args: + text: Texto para extrair palavras-chave + min_length: Comprimento mínimo das palavras + max_words: Número máximo de palavras para retornar + + Returns: + Lista de palavras-chave + """ + if not text: + return [] + + # Remove pontuação e converte para minúsculas + cleaned = re.sub(r"[^\w\s]", " ", text.lower()) + words = cleaned.split() + + # Filtra palavras muito curtas e stop words básicas + stop_words = { + "a", + "an", + "and", + "are", + "as", + "at", + "be", + "by", + "for", + "from", + "has", + "he", + "in", + "is", + "it", + "its", + "of", + "on", + "that", + "the", + "to", + "was", + "will", + "with", + "o", + "e", + "de", + "do", + "da", + "em", + "um", + "uma", + "para", + "com", + "por", + "no", + "na", + "os", + "dos", + "das", + } + + keywords = [] + word_count = {} + + for word in words: + if len(word) >= min_length and word not in stop_words and word.isalpha(): + word_count[word] = word_count.get(word, 0) + 1 + + # Ordena por frequência + sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True) + keywords = [word for word, _ in sorted_words[:max_words]] + + return keywords + + +def calculate_text_stats(text: str | None) -> dict: + """ + Calcula estatísticas básicas de um texto. + + Args: + text: Texto para analisar + + Returns: + Dicionário com estatísticas + """ + if not text: + return { + "char_count": 0, + "word_count": 0, + "line_count": 0, + "paragraph_count": 0, + "avg_words_per_line": 0, + "avg_chars_per_word": 0, + } + + char_count = len(text) + words = text.split() + word_count = len(words) + lines = text.split("\n") + line_count = len(lines) + paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()] + paragraph_count = len(paragraphs) + + avg_words_per_line = word_count / line_count if line_count > 0 else 0 + avg_chars_per_word = char_count / word_count if word_count > 0 else 0 + + return { + "char_count": char_count, + "word_count": word_count, + "line_count": line_count, + "paragraph_count": paragraph_count, + "avg_words_per_line": round(avg_words_per_line, 2), + "avg_chars_per_word": round(avg_chars_per_word, 2), + } + + +def format_file_size(size_bytes: int) -> str: + """ + Formata tamanho de arquivo em formato legível. + + Args: + size_bytes: Tamanho em bytes + + Returns: + String formatada (ex: "1.5 MB") + """ + if size_bytes == 0: + return "0 B" + + size_names = ["B", "KB", "MB", "GB", "TB"] + i = 0 + size = float(size_bytes) + + while size >= 1024.0 and i < len(size_names) - 1: + size /= 1024.0 + i += 1 + + return f"{size:.1f} {size_names[i]}" diff --git a/lambda_rag_lite/utils.py b/lambda_rag_lite/utils.py index 6e7d806..14a02c9 100644 --- a/lambda_rag_lite/utils.py +++ b/lambda_rag_lite/utils.py @@ -1,15 +1,19 @@ """ Utilitários e helpers para Lambda RAG Lite. -Contém funções auxiliares para processamento de texto, configuração -e outras operações comuns. """ from __future__ import annotations -import re from typing import List, Optional +from .processors.text_processor import TextProcessor as NewTextProcessor +from .strategies.chunking import chunk_text as new_chunk_text +from .text_cleaning import calculate_text_stats as calculate_text_stats_func +from .text_cleaning import clean_text as clean_text_func +from .text_cleaning import extract_keywords as extract_keywords_func +from .text_cleaning import format_file_size as format_file_size_func + def chunk_text( text: str | None, @@ -29,92 +33,7 @@ def chunk_text( Returns: Lista de chunks de texto """ - if not text or chunk_size <= 0: - return [] - - if len(text) <= chunk_size: - return [text] - - chunks = [] - - # Tenta dividir usando o separador preferencial primeiro - if separator in text: - sections = text.split(separator) - current_chunk = "" - - for section in sections: - # Se a seção sozinha é maior que chunk_size, divide ela - if len(section) > chunk_size: - if current_chunk: - chunks.append(current_chunk.strip()) - current_chunk = "" - - # Divide seção grande por caracteres para evitar recursão infinita - sub_start = 0 - while sub_start < len(section): - sub_end = sub_start + chunk_size - if sub_end >= len(section): - chunks.append(section[sub_start:].strip()) - break - - # Procura quebra natural - break_point = sub_end - for i in range( - max(0, sub_end - 50), min(len(section), sub_end + 50) - ): - if section[i] in " \n\t": - break_point = i - break - - chunks.append(section[sub_start:break_point].strip()) - sub_start = max(sub_start + 1, break_point - chunk_overlap) - - # Se adicionar esta seção não excede o limite - elif len(current_chunk) + len(section) + len(separator) <= chunk_size: - if current_chunk: - current_chunk += separator + section - else: - current_chunk = section - - # Se excede, salva chunk atual e inicia novo - else: - if current_chunk: - chunks.append(current_chunk.strip()) - current_chunk = section - - # Adiciona último chunk se não vazio - if current_chunk: - chunks.append(current_chunk.strip()) - - else: - # Divisão por caracteres quando separador não está presente - start = 0 - while start < len(text): - end = start + chunk_size - - if end >= len(text): - chunks.append(text[start:].strip()) - break - - # Tenta encontrar uma quebra natural próxima ao fim - break_point = end - search_start = max(0, end - 50) - search_end = min(len(text), end + 50) - - for i in range(search_start, search_end): - if i < len(text) and text[i] in ".!?\n": - break_point = i + 1 - break - - chunks.append(text[start:break_point].strip()) - start = ( - max(0, break_point - chunk_overlap) - if chunk_overlap > 0 - else break_point - ) - - # Remove chunks vazios - return [chunk for chunk in chunks if chunk.strip()] + return new_chunk_text(text, chunk_size, chunk_overlap, separator) def clean_text(text: str | None, remove_extra_whitespace: bool = True) -> str: @@ -128,21 +47,7 @@ def clean_text(text: str | None, remove_extra_whitespace: bool = True) -> str: Returns: Texto limpo """ - if not text: - return "" - - # Remove caracteres de controle exceto quebras de linha e tabs - text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]", "", text) - - if remove_extra_whitespace: - # Remove espaços múltiplos - text = re.sub(r" +", " ", text) - # Remove quebras de linha múltiplas - text = re.sub(r"\n\s*\n", "\n\n", text) - # Remove espaços no início e fim de linhas - text = "\n".join(line.strip() for line in text.split("\n")) - - return text.strip() + return clean_text_func(text, remove_extra_whitespace) def extract_keywords( @@ -159,71 +64,7 @@ def extract_keywords( Returns: Lista de palavras-chave """ - if not text: - return [] - - # Remove pontuação e converte para minúsculas - cleaned = re.sub(r"[^\w\s]", " ", text.lower()) - words = cleaned.split() - - # Filtra palavras muito curtas e stop words básicas - stop_words = { - "a", - "an", - "and", - "are", - "as", - "at", - "be", - "by", - "for", - "from", - "has", - "he", - "in", - "is", - "it", - "its", - "of", - "on", - "that", - "the", - "to", - "was", - "will", - "with", - "o", - "a", - "e", - "de", - "do", - "da", - "em", - "um", - "uma", - "para", - "com", - "por", - "no", - "na", - "os", - "as", - "dos", - "das", - } - - keywords = [] - word_count = {} - - for word in words: - if len(word) >= min_length and word not in stop_words and word.isalpha(): - word_count[word] = word_count.get(word, 0) + 1 - - # Ordena por frequência - sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True) - keywords = [word for word, _ in sorted_words[:max_words]] - - return keywords + return extract_keywords_func(text, min_length, max_words) def calculate_text_stats(text: str | None) -> dict: @@ -236,35 +77,7 @@ def calculate_text_stats(text: str | None) -> dict: Returns: Dicionário com estatísticas """ - if not text: - return { - "char_count": 0, - "word_count": 0, - "line_count": 0, - "paragraph_count": 0, - "avg_words_per_line": 0, - "avg_chars_per_word": 0, - } - - char_count = len(text) - words = text.split() - word_count = len(words) - lines = text.split("\n") - line_count = len(lines) - paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()] - paragraph_count = len(paragraphs) - - avg_words_per_line = word_count / line_count if line_count > 0 else 0 - avg_chars_per_word = char_count / word_count if word_count > 0 else 0 - - return { - "char_count": char_count, - "word_count": word_count, - "line_count": line_count, - "paragraph_count": paragraph_count, - "avg_words_per_line": round(avg_words_per_line, 2), - "avg_chars_per_word": round(avg_chars_per_word, 2), - } + return calculate_text_stats_func(text) def format_file_size(size_bytes: int) -> str: @@ -277,23 +90,13 @@ def format_file_size(size_bytes: int) -> str: Returns: String formatada (ex: "1.5 MB") """ - if size_bytes == 0: - return "0 B" - - size_names = ["B", "KB", "MB", "GB", "TB"] - i = 0 - size = float(size_bytes) - - while size >= 1024.0 and i < len(size_names) - 1: - size /= 1024.0 - i += 1 - - return f"{size:.1f} {size_names[i]}" + return format_file_size_func(size_bytes) class TextProcessor: """ Classe para processamento avançado de texto com configurações personalizáveis. + """ def __init__( @@ -312,10 +115,43 @@ def __init__( clean_text: Se deve limpar o texto extract_keywords: Se deve extrair palavras-chave """ - self.chunk_size = chunk_size - self.chunk_overlap = chunk_overlap - self.clean_text = clean_text - self.extract_keywords = extract_keywords + # Cria configurações compatíveis para a nova implementação + from .config import ChunkingConfig, TextProcessingConfig + + # Ajusta chunk_overlap se for maior que chunk_size + if chunk_overlap >= chunk_size: + chunk_overlap = max(0, chunk_size // 4) # 25% do chunk_size + + chunking_config = ChunkingConfig( + chunk_size=chunk_size, chunk_overlap=chunk_overlap + ) + + processing_config = TextProcessingConfig( + clean_text=clean_text, extract_keywords=extract_keywords + ) + + # Usa a nova implementação internamente + self._processor = NewTextProcessor(processing_config, chunking_config) + + @property + def chunk_size(self) -> int: + """Retorna o tamanho do chunk configurado.""" + return self._processor.chunking_config.chunk_size + + @property + def chunk_overlap(self) -> int: + """Retorna a sobreposição configurada.""" + return self._processor.chunking_config.chunk_overlap + + @property + def clean_text(self) -> bool: + """Retorna se a limpeza de texto está habilitada.""" + return self._processor.processing_config.clean_text + + @property + def extract_keywords(self) -> bool: + """Retorna se a extração de palavras-chave está habilitada.""" + return self._processor.processing_config.extract_keywords def process_text( self, text: str | None, metadata: Optional[dict] = None @@ -330,37 +166,4 @@ def process_text( Returns: Lista de dicionários com texto e metadados """ - if not text: - return [] - - # Limpa texto se configurado - processed_text = clean_text(text) if self.clean_text else text - - # Divide em chunks - chunks = chunk_text(processed_text, self.chunk_size, self.chunk_overlap) - - results = [] - base_metadata = metadata or {} - - for i, chunk in enumerate(chunks): - chunk_metadata = base_metadata.copy() - chunk_metadata.update( - { - "chunk_index": i, - "chunk_count": len(chunks), - "chunk_size": len(chunk), - } - ) - - # Adiciona palavras-chave se configurado - if self.extract_keywords: - keywords = extract_keywords(chunk) - chunk_metadata["keywords"] = keywords - - # Adiciona estatísticas - stats = calculate_text_stats(chunk) - chunk_metadata.update(stats) - - results.append({"text": chunk, "metadata": chunk_metadata}) - - return results + return self._processor.process_text(text, metadata) diff --git a/lambda_rag_lite/vectorstore.py b/lambda_rag_lite/vectorstore.py index 44652bd..5344476 100644 --- a/lambda_rag_lite/vectorstore.py +++ b/lambda_rag_lite/vectorstore.py @@ -1,8 +1,7 @@ """ -Vector Store implementation para Lambda RAG Lite. - Fornece uma implementação pura em Python de um vector store compatível com a interface LangChain, usando similaridade coseno para busca. + """ from __future__ import annotations diff --git a/pyproject.toml b/pyproject.toml index 9d7f5dc..0e99485 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "lambda-rag-lite" -version = "0.1.0" +version = "0.2.0" description = "Uma biblioteca Python leve para RAG (Retrieval-Augmented Generation) compatível com LangChain, que não requer dependências pesadas como NumPy ou bibliotecas de ML." readme = "README.md" requires-python = ">=3.10" @@ -17,6 +17,7 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", "Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Scientific/Engineering :: Information Analysis", "Topic :: Text Processing :: Indexing", diff --git a/uv.lock b/uv.lock index 1d3c590..78b90a0 100644 --- a/uv.lock +++ b/uv.lock @@ -395,7 +395,7 @@ wheels = [ [[package]] name = "lambda-rag-lite" -version = "0.1.0" +version = "0.2.0" source = { editable = "." } dependencies = [ { name = "langchain-core" },