diff --git a/migrate.sql b/migrate.sql new file mode 100644 index 0000000..0459ba0 --- /dev/null +++ b/migrate.sql @@ -0,0 +1,122 @@ +-- Migration: create full database schema +-- Run with: psql earlytech < migrate.sql + +BEGIN; + +-- Extensions +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +CREATE EXTENSION IF NOT EXISTS vector; + +-- Articles (scrapper columns + Rust alias columns) +CREATE TABLE IF NOT EXISTS articles ( + id TEXT PRIMARY KEY, + source_site TEXT NOT NULL, + title TEXT, + description TEXT, + full_content TEXT, + content_hash TEXT UNIQUE, + author_info TEXT, + keywords TEXT, + content_url TEXT NOT NULL, + published_date TIMESTAMPTZ, + item_type TEXT, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + primary_subject TEXT, + secondary_subject TEXT, + primary_organizations JSONB, + secondary_organizations JSONB, + primary_event_type TEXT, + secondary_event_type TEXT, + cluster_id INTEGER, + -- Rust server alias columns + url TEXT, + source TEXT, + summary TEXT, + authors TEXT[], + content TEXT, + scraped_at TIMESTAMPTZ +); + +-- Embeddings +CREATE TABLE IF NOT EXISTS embeddings ( + id SERIAL PRIMARY KEY, + article_id TEXT NOT NULL UNIQUE REFERENCES articles(id) ON DELETE CASCADE, + embedding vector(1536) NOT NULL, + embedding_model TEXT, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX IF NOT EXISTS idx_cluster_id ON articles(cluster_id); + +-- Users (UUID, auth-ready). WARNING: destructive; the four DROP TABLE statements below discard every existing users, user_keywords, user_keyword_embeddings and user_article_delivery row before the tables are recreated +DROP TABLE IF EXISTS user_article_delivery CASCADE; +DROP TABLE IF EXISTS user_keyword_embeddings CASCADE; +DROP TABLE IF EXISTS user_keywords CASCADE; +DROP TABLE IF EXISTS users CASCADE; + +CREATE TABLE users ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + name TEXT NOT NULL, + email TEXT NOT NULL UNIQUE, + password_hash TEXT NOT NULL DEFAULT '', + role TEXT NOT NULL DEFAULT 'user', + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP +); + +-- User 
keywords +CREATE TABLE user_keywords ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + keyword TEXT NOT NULL, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + UNIQUE(user_id, keyword) +); + +-- Keyword embeddings (vector size is hard-coded to 1536 here; scrapper/database.py sizes this column from its embedding_dimension setting, so the two must be kept in sync) +CREATE TABLE user_keyword_embeddings ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + keyword_id UUID NOT NULL UNIQUE REFERENCES user_keywords(id) ON DELETE CASCADE, + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + keyword TEXT NOT NULL, + embedding vector(1536) NOT NULL, + embedding_model TEXT, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP +); + +-- Article delivery +CREATE TABLE user_article_delivery ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + article_id TEXT NOT NULL REFERENCES articles(id) ON DELETE CASCADE, + keyword_id UUID REFERENCES user_keywords(id) ON DELETE SET NULL, + similarity_score FLOAT NOT NULL, + delivered_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + UNIQUE(user_id, article_id) +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_user_keywords ON user_keywords(user_id); +CREATE INDEX IF NOT EXISTS idx_keyword_embeddings ON user_keyword_embeddings(user_id); +CREATE INDEX IF NOT EXISTS idx_user_article_delivery ON user_article_delivery(user_id); + +-- Trigger: on article insert/update, fill each Rust alias column (url, source, summary, content, scraped_at) from its scrapper counterpart only when the alias is NULL; an alias that is already set is never overwritten, and authors is not populated here +CREATE OR REPLACE FUNCTION sync_article_aliases() RETURNS TRIGGER AS $$ +BEGIN + NEW.url := COALESCE(NEW.url, NEW.content_url); + NEW.source := COALESCE(NEW.source, NEW.source_site); + NEW.summary := COALESCE(NEW.summary, NEW.description); + NEW.content := COALESCE(NEW.content, NEW.full_content); + NEW.scraped_at := COALESCE(NEW.scraped_at, NEW.created_at, CURRENT_TIMESTAMP); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS trg_sync_article_aliases ON articles; +CREATE TRIGGER trg_sync_article_aliases + BEFORE INSERT OR UPDATE ON articles + 
FOR EACH ROW + EXECUTE FUNCTION sync_article_aliases(); + +COMMIT; diff --git a/scrapper/database.py b/scrapper/database.py index a8ba3e4..e1317d8 100644 --- a/scrapper/database.py +++ b/scrapper/database.py @@ -46,12 +46,14 @@ def setup_database(self): cur = conn.cursor() cur.execute("CREATE EXTENSION IF NOT EXISTS vector") + cur.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp"') + cur.execute( """ CREATE TABLE IF NOT EXISTS articles ( id TEXT PRIMARY KEY, source_site TEXT NOT NULL, - title TEXT NOT NULL, + title TEXT, description TEXT, full_content TEXT, content_hash TEXT UNIQUE, @@ -68,7 +70,13 @@ def setup_database(self): secondary_organizations JSONB, primary_event_type TEXT, secondary_event_type TEXT, - cluster_id INTEGER + cluster_id INTEGER, + url TEXT, + source TEXT, + summary TEXT, + authors TEXT[], + content TEXT, + scraped_at TIMESTAMPTZ ) """ ) @@ -90,9 +98,11 @@ def setup_database(self): cur.execute( """ CREATE TABLE IF NOT EXISTS users ( - id SERIAL PRIMARY KEY, - username TEXT NOT NULL UNIQUE, - email TEXT UNIQUE, + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + name TEXT NOT NULL, + email TEXT NOT NULL UNIQUE, + password_hash TEXT NOT NULL DEFAULT '', + role TEXT NOT NULL DEFAULT 'user', created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP ) @@ -102,8 +112,8 @@ def setup_database(self): cur.execute( """ CREATE TABLE IF NOT EXISTS user_keywords ( - id SERIAL PRIMARY KEY, - user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, keyword TEXT NOT NULL, created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, UNIQUE(user_id, keyword) @@ -114,9 +124,9 @@ def setup_database(self): cur.execute( f""" CREATE TABLE IF NOT EXISTS user_keyword_embeddings ( - id SERIAL PRIMARY KEY, - keyword_id INTEGER NOT NULL UNIQUE REFERENCES user_keywords(id) ON DELETE CASCADE, - user_id INTEGER NOT NULL 
REFERENCES users(id) ON DELETE CASCADE, + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + keyword_id UUID NOT NULL UNIQUE REFERENCES user_keywords(id) ON DELETE CASCADE, + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, keyword TEXT NOT NULL, embedding vector({self.embedding_dimension}) NOT NULL, embedding_model TEXT, @@ -128,10 +138,10 @@ def setup_database(self): cur.execute( """ CREATE TABLE IF NOT EXISTS user_article_delivery ( - id SERIAL PRIMARY KEY, - user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, article_id TEXT NOT NULL REFERENCES articles(id) ON DELETE CASCADE, - keyword_id INTEGER REFERENCES user_keywords(id) ON DELETE SET NULL, + keyword_id UUID REFERENCES user_keywords(id) ON DELETE SET NULL, similarity_score FLOAT NOT NULL, delivered_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, UNIQUE(user_id, article_id) @@ -143,6 +153,125 @@ def setup_database(self): cur.execute("CREATE INDEX IF NOT EXISTS idx_keyword_embeddings ON user_keyword_embeddings(user_id)") cur.execute("CREATE INDEX IF NOT EXISTS idx_user_article_delivery ON user_article_delivery(user_id)") + cur.execute( + """ + CREATE TABLE IF NOT EXISTS sync_log ( + id SERIAL PRIMARY KEY, + source TEXT NOT NULL, + mode TEXT NOT NULL, + articles_count INTEGER NOT NULL DEFAULT 0, + synced_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + + def article_exists(self, article_id: str) -> bool: + """Check if an article already exists in the database.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute("SELECT 1 FROM articles WHERE id = %s", (article_id,)) + return cur.fetchone() is not None + + def article_exists_by_hash(self, content_hash: str) -> bool: + """Check if an article with the same content hash exists.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute("SELECT 1 FROM articles WHERE content_hash = %s", 
(content_hash,)) + return cur.fetchone() is not None + + def save_article(self, article: dict) -> bool: + """Save an article to the database. + + Args: + article: Normalized article dict from scraper. + + Returns: + True if a new row was inserted; False if nothing was inserted (e.g. the id already exists) or if any exception was raised (errors are swallowed, not propagated). + """ + import hashlib + + full_content = article.get("full_content", article.get("description", "")) + content_hash = hashlib.sha256(full_content.encode()).hexdigest() if full_content else None + + try: + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO articles ( + id, source_site, title, description, full_content, + content_hash, author_info, keywords, content_url, + published_date, item_type + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (id) DO NOTHING + """, + ( + article["id"], + article.get("source_site", ""), + article.get("title"), + article.get("description"), + full_content, + content_hash, + article.get("author_info"), + article.get("keywords"), + article.get("content_url", ""), + article.get("published_date"), + article.get("item_type", "article"), + ), + ) + return cur.rowcount > 0 + except Exception: + return False + + def save_embedding( + self, article_id: str, embedding: "np.ndarray", model: str = "text-embedding-3-small" + ) -> None: + """Upsert the embedding for an article: insert it, or overwrite embedding and embedding_model if the article already has one.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO embeddings (article_id, embedding, embedding_model) + VALUES (%s, %s::vector, %s) + ON CONFLICT (article_id) DO UPDATE + SET embedding = EXCLUDED.embedding, embedding_model = EXCLUDED.embedding_model + """, + (article_id, embedding.tolist(), model), + ) + + def record_sync(self, source: str, mode: str, articles_count: int) -> None: + """Record a sync event.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute( + "INSERT INTO sync_log (source, mode, articles_count) VALUES (%s, %s, %s)", + (source, mode, articles_count), + ) + + def 
get_stats(self) -> dict: + """Return a dict with total_articles, articles_without_embeddings and total_users counts.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute("SELECT COUNT(*) as count FROM articles") + total_articles = cur.fetchone()["count"] + + cur.execute( + """ + SELECT COUNT(*) as count FROM articles a + WHERE NOT EXISTS (SELECT 1 FROM embeddings e WHERE e.article_id = a.id) + """ + ) + without_embeddings = cur.fetchone()["count"] + + cur.execute("SELECT COUNT(*) as count FROM users") + total_users = cur.fetchone()["count"] + + return { + "total_articles": total_articles, + "articles_without_embeddings": without_embeddings, + "total_users": total_users, + } + def assign_cluster_with_similarity( self, article_id: str, @@ -288,70 +417,70 @@ def assign_cluster_with_similarity( # USER AND KEYWORD FILTERING METHODS : - def add_user(self, username: str, email: Optional[str] = None) -> int: + def add_user(self, name: str, email: str) -> str: """ Add a new user to the system. - + Args: - username: Unique username - email: Optional email address - + name: User display name + email: Email address (unique) + Returns: - User ID + User ID (UUID string); if the email already exists, the existing user's ID is returned and only updated_at is refreshed """ with self.get_connection() as conn: cur = conn.cursor() cur.execute( """ - INSERT INTO users (username, email) + INSERT INTO users (name, email) VALUES (%s, %s) - ON CONFLICT (username) DO UPDATE SET updated_at = CURRENT_TIMESTAMP + ON CONFLICT (email) DO UPDATE SET updated_at = CURRENT_TIMESTAMP RETURNING id """, - (username, email), + (name, email), ) user_id = cur.fetchone()["id"] - return user_id + return str(user_id) - def add_user_keyword(self, user_id: int, keyword: str) -> int: + def add_user_keyword(self, user_id: str, keyword: str) -> str: """ Add a keyword for a user (without embedding yet). 
- + Args: - user_id: User ID + user_id: User ID (UUID string) keyword: Keyword text - + Returns: - Keyword ID + Keyword ID (UUID string) """ with self.get_connection() as conn: cur = conn.cursor() cur.execute( """ INSERT INTO user_keywords (user_id, keyword) - VALUES (%s, %s) + VALUES (%s::uuid, %s) ON CONFLICT (user_id, keyword) DO UPDATE SET created_at = CURRENT_TIMESTAMP RETURNING id """, (user_id, keyword), ) keyword_id = cur.fetchone()["id"] - return keyword_id + return str(keyword_id) def store_keyword_embedding( - self, - keyword_id: int, - user_id: int, + self, + keyword_id: str, + user_id: str, keyword: str, - embedding: np.ndarray, - embedding_model: str = "text-embedding-3-small" + embedding: np.ndarray, + embedding_model: str = "text-embedding-3-small", ) -> None: """ Store embedding for a user keyword. - + Args: - keyword_id: Keyword ID - user_id: User ID + keyword_id: Keyword ID (UUID string) + user_id: User ID (UUID string) keyword: Keyword text embedding: Numpy embedding vector embedding_model: Model used for embedding @@ -361,11 +490,11 @@ def store_keyword_embedding( cur.execute( f""" INSERT INTO user_keyword_embeddings (keyword_id, user_id, keyword, embedding, embedding_model) - VALUES (%s, %s, %s, %s::vector, %s) - ON CONFLICT (keyword_id) DO UPDATE + VALUES (%s::uuid, %s::uuid, %s, %s::vector, %s) + ON CONFLICT (keyword_id) DO UPDATE SET embedding = EXCLUDED.embedding, embedding_model = EXCLUDED.embedding_model """, - (keyword_id, user_id, keyword, embedding.tobytes(), embedding_model), + (keyword_id, user_id, keyword, embedding.tolist(), embedding_model), ) def find_matching_keywords(self, article_id: str, similarity_threshold: float = 0.7) -> List[Dict[str, Any]]: @@ -383,9 +512,9 @@ def find_matching_keywords(self, article_id: str, similarity_threshold: float = cur = conn.cursor() cur.execute( """ - SELECT + SELECT u.id as user_id, - u.username, + u.name, u.email, uk.id as keyword_id, uk.keyword, @@ -403,19 +532,19 @@ def 
find_matching_keywords(self, article_id: str, similarity_threshold: float = return results if results else [] def record_article_delivery( - self, - user_id: int, - article_id: str, - keyword_id: int, - similarity_score: float + self, + user_id: str, + article_id: str, + keyword_id: str, + similarity_score: float, ) -> None: """ Record that an article was delivered to a user due to a matching keyword. - + Args: - user_id: User ID + user_id: User ID (UUID string) article_id: Article ID - keyword_id: Keyword ID that matched + keyword_id: Keyword ID (UUID string) that matched similarity_score: Embedding similarity score """ with self.get_connection() as conn: @@ -423,14 +552,14 @@ def record_article_delivery( cur.execute( """ INSERT INTO user_article_delivery (user_id, article_id, keyword_id, similarity_score) - VALUES (%s, %s, %s, %s) - ON CONFLICT (user_id, article_id) DO UPDATE + VALUES (%s::uuid, %s, %s::uuid, %s) + ON CONFLICT (user_id, article_id) DO UPDATE SET delivered_at = CURRENT_TIMESTAMP, similarity_score = EXCLUDED.similarity_score """, (user_id, article_id, keyword_id, similarity_score), ) - def get_user_articles(self, user_id: int, limit: int = 50) -> List[Dict[str, Any]]: + def get_user_articles(self, user_id: str, limit: int = 50) -> List[Dict[str, Any]]: """ Get all articles delivered to a user, sorted by delivery date. @@ -467,7 +596,7 @@ def get_user_articles(self, user_id: int, limit: int = 50) -> List[Dict[str, Any results = cur.fetchall() return results if results else [] - def get_user_stats(self, user_id: int) -> Dict[str, Any]: + def get_user_stats(self, user_id: str) -> Dict[str, Any]: """ Get statistics for a user (keywords count, articles delivered). 
@@ -481,7 +610,7 @@ def get_user_stats(self, user_id: int) -> Dict[str, Any]: cur = conn.cursor() # Get user info - cur.execute("SELECT username, email FROM users WHERE id = %s", (user_id,)) + cur.execute("SELECT name, email FROM users WHERE id = %s::uuid", (user_id,)) user = cur.fetchone() if not user: @@ -504,7 +633,7 @@ def get_user_stats(self, user_id: int) -> Dict[str, Any]: return { "user_id": user_id, - "username": user["username"], + "name": user["name"], "email": user["email"], "keywords_count": keywords_count, "articles_delivered": articles_count, diff --git a/scrapper/keyword_matcher.py b/scrapper/keyword_matcher.py index 77e5657..29cc7cb 100644 --- a/scrapper/keyword_matcher.py +++ b/scrapper/keyword_matcher.py @@ -145,7 +145,7 @@ def dispatch_article_to_users(self, article_id: str) -> Dict[int, List[str]]: delivery_summary[user_id].append(match["keyword"]) logger.info( - f"✓ Article {article_id} delivered to user {user_id} ({match['username']}) " + f"✓ Article {article_id} delivered to user {user_id} ({match['name']}) " f"via keyword '{match['keyword']}' (similarity: {match['similarity_score']:.3f})" ) @@ -197,7 +197,7 @@ def print_matching_summary(self, article_id: str) -> None: user_id = match["user_id"] if user_id not in user_matches: user_matches[user_id] = { - "username": match["username"], + "name": match["name"], "keywords": [] } user_matches[user_id]["keywords"].append({ diff --git a/scrapper/main.py b/scrapper/main.py index c5c9085..739ba67 100644 --- a/scrapper/main.py +++ b/scrapper/main.py @@ -264,16 +264,13 @@ def get_stats(self) -> Dict: def print_stats(self): """Display database statistics.""" stats = self.get_stats() - + print("\n" + "=" * 60) - print("📊 DATABASE STATISTICS") + print("DATABASE STATISTICS") print("=" * 60) print(f"Total articles: {stats['total_articles']}") - print(f"Articles with embedding: {stats['total_embeddings']}") print(f"Articles without embedding: {stats['articles_without_embeddings']}") - print("\nArticles per 
source:") - for source, count in stats['articles_by_source'].items(): - print(f" - {source}: {count}") + print(f"Total users: {stats['total_users']}") print("=" * 60) diff --git a/scrapper/quick_test_dispatch.py b/scrapper/quick_test_dispatch.py index a3ed4f4..a7f2a58 100644 --- a/scrapper/quick_test_dispatch.py +++ b/scrapper/quick_test_dispatch.py @@ -11,6 +11,9 @@ # Add scrapper to path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) +from dotenv import load_dotenv +load_dotenv() + from database import DatabaseManager from embeddings import EmbeddingManager, OpenAIEmbeddingProvider from keyword_matcher import KeywordMatcher diff --git a/scrapper/sync_keyword_embeddings.py b/scrapper/sync_keyword_embeddings.py new file mode 100644 index 0000000..047503e --- /dev/null +++ b/scrapper/sync_keyword_embeddings.py @@ -0,0 +1,77 @@ +"""One-shot script: embed every user keyword that has no embedding yet, then dispatch all articles to users. Exits early, without dispatching, when no keyword is missing an embedding.""" + +import os +import logging + +from database import DatabaseManager +from embeddings import EmbeddingManager, OpenAIEmbeddingProvider + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +DB_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432/earlytech")  # fallback names no role: libpq then connects as the current OS user instead of one developer's account +EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "text-embedding-3-small") + + +def main(): + provider = OpenAIEmbeddingProvider(model=EMBEDDING_MODEL) + dim = provider.get_dimension() or 1536 + db = DatabaseManager(DB_URL, embedding_dimension=dim) + em = EmbeddingManager(provider, expected_dimension=dim) + + with db.get_connection() as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT uk.id, uk.user_id, uk.keyword + FROM user_keywords uk + LEFT JOIN user_keyword_embeddings uke ON uk.id = uke.keyword_id + WHERE uke.id IS NULL + """ + ) + keywords = cur.fetchall() + + if not keywords: + logger.info("All keywords already have embeddings.") + return + + logger.info(f"Found {len(keywords)} keywords without embeddings.") + + for kw in 
keywords: + try: + embedding = em.embed_text(kw["keyword"]) + db.store_keyword_embedding( + keyword_id=str(kw["id"]), + user_id=str(kw["user_id"]), + keyword=kw["keyword"], + embedding=embedding, + ) + logger.info(f"Embedded keyword: '{kw['keyword']}'") + except Exception as e: + logger.error(f"Failed to embed '{kw['keyword']}': {e}") + + # Now dispatch all articles to users + logger.info("Dispatching articles to users...") + from keyword_matcher import KeywordMatcher + + matcher = KeywordMatcher(db_manager=db, embedding_manager=em, similarity_threshold=0.25) + + with db.get_connection() as conn: + cur = conn.cursor() + cur.execute("SELECT id FROM articles") + article_ids = [row["id"] for row in cur.fetchall()] + + total_deliveries = 0 + for article_id in article_ids: + try: + result = matcher.dispatch_article_to_users(article_id) + if result: + total_deliveries += len(result) + except Exception as e: + logger.error(f"Dispatch error for {article_id}: {e}") + + logger.info(f"Done. {total_deliveries} deliveries recorded.") + + +if __name__ == "__main__": + main()