111 changes: 111 additions & 0 deletions python/mcp-ollama-rag/README.md
@@ -0,0 +1,111 @@
# Ollama-MCP Function with RAG

A Knative Function implementing a Model Context Protocol (MCP) server that
integrates with Ollama for LLM interactions. The function exposes Ollama
capabilities through standardized MCP tools, enabling interaction with hosted
language models.

The communication flow is as follows:
`MCP client -> MCP Server (Function) -> Ollama Server`

1) Set up the `ollama` server (`ollama serve`)
2) Run your function, i.e. the MCP server (`func run`)
3) Connect using the MCP client in the `client/` dir (`python client.py`)

## Architecture

This project implements an ASGI-based Knative function with the following key
components:

### Core Components
- **Function class**: Main ASGI application entry point (this is your base
  Function)
- **MCPServer class**: FastMCP-based server implementing the streamable-HTTP MCP
  protocol
- **MCP tools**: Four tools for Ollama interaction (a minimal client sketch
  follows this list):
  - `list_models`: Enumerate available models on the Ollama server
  - `pull_model`: Download and install new models
  - `call_model`: Send prompts to models and receive responses
  - `embed_document`: Embed documents for RAG; accepts a list of URLs pointing
    to raw text/Markdown files
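
For orientation, here is a minimal client-side sketch (mirroring `client/client.py`) of invoking the `list_models` tool; it assumes the function is serving MCP at `http://localhost:8080/mcp` (the default when running locally):

```python
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def list_ollama_models():
    # Connect to the function's streamable-HTTP MCP endpoint.
    async with streamablehttp_client("http://localhost:8080/mcp") as streams:
        read_stream, write_stream = streams[0], streams[1]
        async with ClientSession(read_stream, write_stream) as sess:
            await sess.initialize()
            # Ask the server-side tool to enumerate models on the Ollama server.
            result = await sess.call_tool(name="list_models", arguments={})
            for item in result.content:
                print(item.text)  # text content items returned by the tool


if __name__ == "__main__":
    asyncio.run(list_ollama_models())
```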


## Setup

### Prerequisites

- Python 3.9 or higher
- Ollama server running locally or accessible via network

### Local Development Setup

1. **Install dependencies & set up the environment**
```bash

# optionally set up a venv
python -m venv venv
source venv/bin/activate

# and install deps
pip install -e .
```

2. **Start Ollama server:**
```bash
# Install Ollama (if not already installed)
curl -fsSL https://ollama.com/install.sh | sh

# Start the Ollama service (in a different terminal or in the background)
ollama serve

# Pull a model (optional, can be done via MCP tool)
ollama pull llama3.2:3b
```

Now you have a running Ollama Server

3. **Run the function:**
```bash
# Using func CLI (build via host builder)
func run --builder=host
```

Now you have a running MCP server integrated with Ollama client tools, which
lets you embed documents, pull a model onto the Ollama server, and prompt the
(now RAG-augmented) inference model.

4. **Run MCP client**
```bash
# Run from the client/ directory.
# Modify this file as needed -- by default it embeds a document
# and sends a prompt asking about it.
python client.py
```

You have now connected to the running function over the MCP protocol: the client
embedded a document into vector space for the RAG tooling and then prompted the
model, which can use those embeddings to answer your question (hopefully) in a
more informed way. The same pattern works for the other tools, e.g. `pull_model`,
as sketched below.
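
If the model has not been pulled onto the Ollama server yet, it can also be pulled through the function's `pull_model` tool instead of running `ollama pull` directly; a minimal sketch using the same connection pattern as `client.py`:

```python
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client


async def pull(model: str = "llama3.2:3b"):
    async with streamablehttp_client("http://localhost:8080/mcp") as streams:
        read_stream, write_stream = streams[0], streams[1]
        async with ClientSession(read_stream, write_stream) as sess:
            await sess.initialize()
            # Ask the function to pull the model onto the Ollama server.
            result = await sess.call_tool(
                name="pull_model",
                arguments={"model": model},
            )
            print(result.content[0].text)


if __name__ == "__main__":
    asyncio.run(pull())
```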

### Deployment to cluster (not tested)

#### Knative Function Deployment

```bash
# Deploy to Knative cluster
func deploy

# Or build and deploy with custom image
func deploy --image your-registry/mcp-ollama-function
```

Here you also need to make sure the function can reach the Ollama server, for
example by running Ollama as a pod/Service in the cluster or by port-forwarding
to it. One way to make the Ollama endpoint configurable is sketched below.
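
A sketch of what that wiring could look like (the endpoint value is hypothetical, and `func.py` would need to be adapted to use an explicitly configured client instead of the defaults):

```python
import os

import ollama

# Hypothetical: resolve the Ollama endpoint from the environment, falling back
# to the local default. In-cluster this might be something like
# "http://ollama.<namespace>.svc.cluster.local:11434".
OLLAMA_HOST = os.environ.get("OLLAMA_HOST", "http://localhost:11434")

# An explicitly configured client, to be used in place of ollama.Client()
# and the module-level ollama.embed()/ollama.generate() helpers in func.py.
client = ollama.Client(host=OLLAMA_HOST)

print(client.list())
```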

### Troubleshooting

**Connection Issues:**
- Ensure the Ollama server is running and accessible
- Check firewall settings for port 11434 (Ollama's default)
- Verify model availability with `ollama list`
- Confirm the function is running on the expected port (default: 8080); a small
  scripted check is sketched below
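
The first and last checks can also be scripted; a small sketch, assuming the default ports:

```python
import ollama
import requests

# Is the Ollama server reachable on its default port?
try:
    models = ollama.Client(host="http://localhost:11434").list()
    print("Ollama reachable:", models)
except Exception as e:
    print("Ollama not reachable:", e)

# Is the function up? Non-MCP paths return a plain "OK" (see function/func.py).
try:
    r = requests.get("http://localhost:8080/")
    print("Function responded:", r.status_code, r.text)
except Exception as e:
    print("Function not reachable:", e)
```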

46 changes: 46 additions & 0 deletions python/mcp-ollama-rag/client/client.py
@@ -0,0 +1,46 @@
import asyncio
import json

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import CallToolResult

def unload_list_models(models: CallToolResult) -> list[str]:
    """Parse the JSON text items returned by the list_models tool into model names (helper, unused by default)."""
    return [json.loads(item.text)["model"] for item in models.content if item.text.strip().startswith('{')]  # pyright: ignore

async def main():
    # Check your running Function (MCP server); it prints where it is
    # available during initialization.
    async with streamablehttp_client("http://localhost:8080/mcp") as streams:
        read_stream, write_stream = streams[0], streams[1]

async with ClientSession(read_stream,write_stream) as sess:
print("Initializing connection...",end="")
await sess.initialize()
print("done!\n")


# embed some documents
embed = await sess.call_tool(
name="embed_document",
arguments={
"data": [
"https://raw.githubusercontent.com/knative/func/main/docs/function-templates/python.md",
"https://context7.com/knative/docs/llms.txt?topic=functions",
],
}
)
print(embed.content[0].text) # pyright: ignore[reportAttributeAccessIssue]
print("-"*60)

# prompt the inference model
resp = await sess.call_tool(
name="call_model",
arguments={
"prompt": "What actually is a Knative Function?",
}
)
print(resp.content[0].text)

if __name__ == "__main__":
asyncio.run(main())
1 change: 1 addition & 0 deletions python/mcp-ollama-rag/function/__init__.py
@@ -0,0 +1 @@
from .func import new
227 changes: 227 additions & 0 deletions python/mcp-ollama-rag/function/func.py
@@ -0,0 +1,227 @@
# function/func.py

# Function as an MCP Server implementation
import logging

from mcp.server.fastmcp import FastMCP
import ollama
import asyncio
import chromadb
import requests

def new():
""" New is the only method that must be implemented by a Function.
The instance returned can be of any name.
"""
return Function()

# Accepts any URL that points to raw data (*.md / plain-text files, etc.)
# example: https://raw.githubusercontent.com/knative/func/main/docs/function-templates/python.md
def get_raw_content(url: str) -> str:
    """ Retrieve the contents of a raw URL as text. """
response = requests.get(url)
response.raise_for_status() # errors if bad response
print(f"fetch '{url}' - ok")
return response.text

class MCPServer:
"""
MCP server that exposes a chat with an LLM model running on Ollama server
as one of its tools.
"""

def __init__(self):
# Create FastMCP instance with stateless HTTP for Kubernetes deployment
self.mcp = FastMCP("MCP-Ollama server", stateless_http=True)

# Get the ASGI app from FastMCP
self._app = self.mcp.streamable_http_app()

self.client = ollama.Client()

        # Initialize the vector database (in-memory Chroma client + collection)
        self.dbClient = chromadb.Client()
        self.collection = self.dbClient.create_collection(name="my_collection")
        # default embedding model
        self.embedding_model = "mxbai-embed-large"
        # Call this after self.embedding_model is assigned, so it's defined
        # when the tools close over it.
        self._register_tools()

def _register_tools(self):
"""Register MCP tools."""
@self.mcp.tool()
def list_models():
"""List all models currently available on the Ollama server"""
try:
models = self.client.list()
except Exception as e:
return f"Oops, failed to list models because: {str(e)}"
#return [model['name'] for model in models['models']]
return [model for model in models]

default_embedding_model = self.embedding_model
@self.mcp.tool()
        def embed_document(data: list[str], model: str = default_embedding_model) -> str:
            """
            RAG (Retrieval-augmented generation) tool.
            Embeds the documents provided in `data` into the vector store.
            Arguments:
                - data: a list of URLs pointing to raw text/Markdown documents
                  (currently only URLs are accepted; see the TODO below).
                - model: embedding model to use, examples below.

            # example embedding models:
            # mxbai-embed-large - 334M *default
            # nomic-embed-text  - 137M
            # all-minilm        -  23M
            """
count = 0

            ############ TODO -- move this into a separate file
            # documents generator
            # documents_gen = parse_data_generator(data)
            #### 1) GENERATE
            # generate vector embeddings via the embedding model
            # for i, d in enumerate(documents_gen):
            #     response = ollama.embed(model=model, input=d)
            #     embeddings = response["embeddings"]
            #     self.collection.add(
            #         ids=[str(i)],
            #         embeddings=embeddings,
            #         documents=[d]
            #     )
            #     count += 1

            # For simplicity (until the above is resolved), this accepts only URLs
            for i, d in enumerate(data):
                content = get_raw_content(d)
                response = ollama.embed(model=model, input=content)
                embeddings = response["embeddings"]
                self.collection.add(
                    ids=[str(i)],
                    embeddings=embeddings,
                    # store the fetched text (not the URL) so retrieval returns usable context
                    documents=[content]
                )
                count += 1
return f"ok - Embedded {count} documents"

@self.mcp.tool()
def pull_model(model: str) -> str:
"""Download and install an Ollama model into the running server"""
try:
_ = self.client.pull(model)
except Exception as e:
                return f"Error occurred while pulling the model: {str(e)}"
return f"Success! model {model} is available"

@self.mcp.tool()
def call_model(prompt: str,
model: str = "llama3.2:3b",
embed_model: str = self.embedding_model) -> str:
"""Send a prompt to a model being served on ollama server"""
            #### 2) RETRIEVE
            # Embed the prompt (without saving it into the DB), then retrieve
            # the most relevant document (most similar vectors).
try:
response = ollama.embed(
model=embed_model,
input=prompt
)
results = self.collection.query(
query_embeddings=response["embeddings"],
n_results=1
)
data = results['documents'][0][0]

#### 3) GENERATE
# generate answer given a combination of prompt and data retrieved
output = ollama.generate(
model=model,
prompt=f'Using data: {data}, respond to prompt: {prompt}'
)
print(output)
except Exception as e:
                return f"Error occurred while calling the model: {str(e)}"
return output['response']

async def handle(self, scope, receive, send):
"""Handle ASGI requests - both lifespan and HTTP."""
await self._app(scope, receive, send)

class Function:
def __init__(self):
""" The init method is an optional method where initialization can be
performed. See the start method for a startup hook which includes
configuration.
"""
self.mcp_server = MCPServer()
self._mcp_initialized = False

async def handle(self, scope, receive, send):
"""
Main entry to your Function.
This handles all the incoming requests.
"""

# Initialize MCP server on first request
if not self._mcp_initialized:
await self._initialize_mcp()

# Route MCP requests
if scope['path'].startswith('/mcp'):
await self.mcp_server.handle(scope, receive, send)
return

# Default response for non-MCP requests
await self._send_default_response(send)

async def _initialize_mcp(self):
"""Initialize the MCP server by sending lifespan startup event."""
lifespan_scope = {'type': 'lifespan', 'asgi': {'version': '3.0'}}
startup_sent = False

async def lifespan_receive():
nonlocal startup_sent
if not startup_sent:
startup_sent = True
return {'type': 'lifespan.startup'}
await asyncio.Event().wait() # Wait forever for shutdown

async def lifespan_send(message):
if message['type'] == 'lifespan.startup.complete':
self._mcp_initialized = True
elif message['type'] == 'lifespan.startup.failed':
logging.error(f"MCP startup failed: {message}")

# Start lifespan in background
asyncio.create_task(self.mcp_server.handle(
lifespan_scope, lifespan_receive, lifespan_send
))

# Brief wait for startup completion
await asyncio.sleep(0.1)

async def _send_default_response(self, send):
"""
Send default OK response.
        This is for non-MCP requests, if desired.
"""
await send({
'type': 'http.response.start',
'status': 200,
'headers': [[b'content-type', b'text/plain']],
})
await send({
'type': 'http.response.body',
'body': b'OK',
})

def start(self, cfg):
logging.info("Function starting")

def stop(self):
logging.info("Function stopping")

def alive(self):
return True, "Alive"

def ready(self):
return True, "Ready"