|
| 1 | +import psycopg2 |
| 2 | +import os |
1 | 3 | from mcp.server.fastmcp import FastMCP |
2 | 4 | from datetime import datetime, timezone |
3 | | -import os |
| 5 | +from psycopg2.extras import RealDictCursor |
| 6 | +from typing import List, Dict, Any, Optional |
4 | 7 |
|
| 8 | +DATABASE_URL = os.environ.get("DATABASE_URL") |
5 | 9 | SSE_PATH = os.environ.get("SSE_PATH", "/mcp/sse") |
6 | 10 | MESSAGE_PATH = os.environ.get("MESSAGE_PATH", "/mcp/messages/") |
7 | 11 |
|
8 | 12 | mcp = FastMCP("Objectified", sse_path=SSE_PATH, message_path=MESSAGE_PATH) |
9 | 13 |
|
| 14 | +def connect_to_postgres(database_url: str) -> psycopg2.extensions.connection: |
| 15 | + """ |
| 16 | + Establishes a connection to a PostgreSQL database using a DATABASE_URL. |
| 17 | +
|
| 18 | + Args: |
| 19 | + database_url (str): The PostgreSQL connection string in URL format |
| 20 | + (e.g., 'postgresql://username:password@hostname:port/database') |
| 21 | +
|
| 22 | + Returns: |
| 23 | + psycopg2.extensions.connection: A connection object to the PostgreSQL database |
| 24 | +
|
| 25 | + Raises: |
| 26 | + Exception: If connection fails |
| 27 | + """ |
| 28 | + try: |
| 29 | + conn = psycopg2.connect(database_url) |
| 30 | + return conn |
| 31 | + except Exception as e: |
| 32 | + raise Exception(f"Error connecting to PostgreSQL database: {str(e)}") |
| 33 | + |
| 34 | +def run_query(conn: psycopg2.extensions.connection, query: str, |
| 35 | + params: Optional[tuple] = None) -> List[Dict[str, Any]]: |
| 36 | + """ |
| 37 | + Executes a SQL query and returns all results. |
| 38 | +
|
| 39 | + Args: |
| 40 | + conn (psycopg2.extensions.connection): PostgreSQL connection object |
| 41 | + query (str): SQL query to execute |
| 42 | + params (tuple, optional): Parameters to be used with the query for safe parameterization |
| 43 | +
|
| 44 | + Returns: |
| 45 | + List[Dict[str, Any]]: List of dictionaries, where each dictionary |
| 46 | + represents a row with column names as keys |
| 47 | +
|
| 48 | + Raises: |
| 49 | + Exception: If query execution fails |
| 50 | + """ |
| 51 | + try: |
| 52 | + with conn.cursor(cursor_factory=RealDictCursor) as cursor: |
| 53 | + cursor.execute(query, params) |
| 54 | + results = cursor.fetchall() |
| 55 | + return [dict(row) for row in results] |
| 56 | + except Exception as e: |
| 57 | + conn.rollback() |
| 58 | + raise Exception(f"Error executing query: {str(e)}") |
| 59 | + finally: |
| 60 | + # We don't close the connection here to allow for multiple queries |
| 61 | + # with the same connection. The caller should close it when done. |
| 62 | + pass |
| 63 | + |
10 | 64 | @mcp.tool() |
11 | 65 | def hello() -> str: |
12 | 66 | """Greets the user.""" |
13 | 67 | return f"Hello world!" |
14 | 68 |
|
| 69 | +@mcp.resource("classes://{name}/by_name") |
| 70 | +def get_classes_by_name(name: str) -> list[str]: |
| 71 | + conn = connect_to_postgres(DATABASE_URL) |
| 72 | + results = run_query(conn, "SELECT id FROM obj.class WHERE name LIKE %s", (f"%{name}%",)) |
| 73 | + result_list = [row["id"] for row in results] if len(results) > 0 else [] |
| 74 | + return result_list |
| 75 | + |
15 | 76 | if __name__ == "__main__": |
16 | 77 | print(""" |
17 | 78 | ██████ ██████ ██ ███████ ██████ ████████ ██ ███████ ██ ███████ ██████ |
|
0 commit comments