Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions python/packages/github_copilot/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,24 @@ pip install agent-framework-github-copilot --pre
## GitHub Copilot Agent

The GitHub Copilot agent enables integration with GitHub Copilot, allowing you to interact with Copilot's agentic capabilities through the Agent Framework.

### Native Copilot skills

You can load Copilot CLI-native skills by passing `skill_directories` in `default_options`:

```python
from copilot.session import PermissionRequestResult
from agent_framework_github_copilot import GitHubCopilotAgent


def approve_all(_request, _context):
return PermissionRequestResult(kind="approved")


agent = GitHubCopilotAgent(
default_options={
"on_permission_request": approve_all,
"skill_directories": ["./skills"],
}
)
```
160 changes: 129 additions & 31 deletions python/packages/github_copilot/agent_framework_github_copilot/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import logging
import sys
from collections.abc import AsyncIterable, Awaitable, Callable, MutableMapping, Sequence
from typing import Any, ClassVar, Generic, Literal, TypedDict, overload
from typing import Any, ClassVar, Generic, Literal, TypedDict, cast, overload

from agent_framework import (
AgentMiddlewareTypes,
Expand Down Expand Up @@ -126,6 +126,9 @@ class GitHubCopilotOptions(TypedDict, total=False):
instead of the default GitHub Copilot backend.
"""

skill_directories: list[str]
"""Directories containing Copilot-native ``SKILL.md`` files to load into the session."""


OptionsT = TypeVar(
"OptionsT",
Expand Down Expand Up @@ -239,6 +242,7 @@ def __init__(
on_permission_request: PermissionHandlerType | None = opts.pop("on_permission_request", None)
mcp_servers: dict[str, MCPServerConfig] | None = opts.pop("mcp_servers", None)
provider: ProviderConfig | None = opts.pop("provider", None)
skill_directories: list[str] | None = opts.pop("skill_directories", None)

self._settings = load_settings(
GitHubCopilotSettings,
Expand All @@ -255,6 +259,7 @@ def __init__(
self._permission_handler = on_permission_request
self._mcp_servers = mcp_servers
self._provider = provider
self._skill_directories = skill_directories
self._default_options = opts
self._started = False

Expand Down Expand Up @@ -403,7 +408,12 @@ async def _run_impl(

# NOTE: session is created after providers run so that future provider-contributed
# tools/config could be folded into runtime_options before session creation.
copilot_session = await self._get_or_create_session(session, streaming=False, runtime_options=opts)
copilot_session = await self._get_or_create_session(
session,
streaming=False,
runtime_options=opts,
session_tools=session_context.tools,
)

# Build the prompt from the full set of messages in the session context,
# so that any context/history provider-injected messages are included.
Expand Down Expand Up @@ -481,7 +491,12 @@ async def _stream_updates(

# NOTE: session is created after providers run so that future provider-contributed
# tools/config could be folded into runtime_options before session creation.
copilot_session = await self._get_or_create_session(session, streaming=True, runtime_options=opts)
copilot_session = await self._get_or_create_session(
session,
streaming=True,
runtime_options=opts,
session_tools=session_context.tools,
)

if _ctx_holder is not None:
_ctx_holder["session_context"] = session_context
Expand Down Expand Up @@ -685,18 +700,103 @@ async def handler(invocation: ToolInvocation) -> ToolResult:
parameters=ai_func.parameters(),
)

@staticmethod
def _get_tool_name(tool: ToolTypes | CopilotTool) -> str | None:
"""Extract a tool name for duplicate detection."""
if isinstance(tool, dict):
tool_dict = cast(dict[str, Any], tool)
func = tool_dict.get("function")
if isinstance(func, dict):
func_dict = cast(dict[str, Any], func)
name = func_dict.get("name")
return name if isinstance(name, str) else None
return None

name = getattr(tool, "name", None)
return name if isinstance(name, str) else None

@staticmethod
def _resolve_option(
opts: dict[str, Any],
key: str,
default: Any = None,
) -> Any:
"""Resolve a runtime option while preserving explicit empty collections."""
if key not in opts:
return default
value = opts[key]
return default if value is None else value

def _resolve_tools(
self,
session_tools: Sequence[ToolTypes | Callable[..., Any] | CopilotTool] | None = None,
) -> list[CopilotTool] | None:
"""Merge agent and provider tools using core uniqueness rules."""
merged_tools: list[ToolTypes | CopilotTool] = []
seen_tools: dict[str, ToolTypes | CopilotTool] = {}

for tool_group in (self._tools, normalize_tools(session_tools)):
for tool in tool_group:
tool_name = self._get_tool_name(tool)
if tool_name is None:
merged_tools.append(tool)
continue

existing = seen_tools.get(tool_name)
if existing is None:
seen_tools[tool_name] = tool
merged_tools.append(tool)
continue

if existing is tool:
continue

raise ValueError(f"Duplicate tool name '{tool_name}'. Tool names must be unique.")

return self._prepare_tools(merged_tools) if merged_tools else None

def _build_session_config(
self,
*,
streaming: bool,
runtime_options: dict[str, Any] | None = None,
session_tools: Sequence[ToolTypes | Callable[..., Any] | CopilotTool] | None = None,
) -> dict[str, Any]:
"""Build shared Copilot session configuration for create and resume paths."""
opts = runtime_options or {}
model = self._resolve_option(opts, "model", self._settings.get("model"))
system_message = self._resolve_option(opts, "system_message", self._default_options.get("system_message"))
permission_handler = self._resolve_option(opts, "on_permission_request", self._permission_handler)
mcp_servers = self._resolve_option(opts, "mcp_servers", self._mcp_servers)
provider = self._resolve_option(opts, "provider", self._provider)
skill_directories = self._resolve_option(opts, "skill_directories", self._skill_directories)
tools = self._resolve_tools(session_tools)

return {
"on_permission_request": permission_handler or _deny_all_permissions,
"streaming": streaming,
"model": model,
"system_message": system_message,
"tools": tools,
"mcp_servers": mcp_servers,
"provider": provider,
"skill_directories": skill_directories,
}

async def _get_or_create_session(
self,
agent_session: AgentSession,
streaming: bool = False,
runtime_options: dict[str, Any] | None = None,
session_tools: Sequence[ToolTypes | Callable[..., Any] | CopilotTool] | None = None,
) -> CopilotSession:
"""Get an existing session or create a new one for the session.

Args:
agent_session: The conversation session.
streaming: Whether to enable streaming for the session.
runtime_options: Runtime options from run that take precedence.
session_tools: Tools contributed for this invocation by context providers.

Returns:
A CopilotSession instance.
Expand All @@ -709,9 +809,14 @@ async def _get_or_create_session(

try:
if agent_session.service_session_id:
return await self._resume_session(agent_session.service_session_id, streaming)
return await self._resume_session(
agent_session.service_session_id,
streaming,
runtime_options=runtime_options,
session_tools=session_tools,
)

session = await self._create_session(streaming, runtime_options)
session = await self._create_session(streaming, runtime_options, session_tools=session_tools)
agent_session.service_session_id = session.session_id
return session
except Exception as ex:
Expand All @@ -721,49 +826,42 @@ async def _create_session(
self,
streaming: bool,
runtime_options: dict[str, Any] | None = None,
session_tools: Sequence[ToolTypes | Callable[..., Any] | CopilotTool] | None = None,
) -> CopilotSession:
"""Create a new Copilot session.

Args:
streaming: Whether to enable streaming for the session.
runtime_options: Runtime options that take precedence over default_options.
session_tools: Tools contributed for this invocation by context providers.
"""
if not self._client:
raise RuntimeError("GitHub Copilot client not initialized. Call start() first.")

opts = runtime_options or {}
model = opts.get("model") or self._settings.get("model") or None
system_message = opts.get("system_message") or self._default_options.get("system_message") or None
permission_handler: PermissionHandlerType = (
opts.get("on_permission_request") or self._permission_handler or _deny_all_permissions
)
mcp_servers = opts.get("mcp_servers") or self._mcp_servers or None
provider = opts.get("provider") or self._provider or None
tools = self._prepare_tools(self._tools) if self._tools else None

return await self._client.create_session(
on_permission_request=permission_handler,
streaming=streaming,
model=model or None,
system_message=system_message or None,
tools=tools or None,
mcp_servers=mcp_servers or None,
provider=provider or None,
**self._build_session_config(
streaming=streaming,
runtime_options=runtime_options,
session_tools=session_tools,
)
)

async def _resume_session(self, session_id: str, streaming: bool) -> CopilotSession:
async def _resume_session(
self,
session_id: str,
streaming: bool,
runtime_options: dict[str, Any] | None = None,
session_tools: Sequence[ToolTypes | Callable[..., Any] | CopilotTool] | None = None,
) -> CopilotSession:
"""Resume an existing Copilot session by ID."""
if not self._client:
raise RuntimeError("GitHub Copilot client not initialized. Call start() first.")

permission_handler: PermissionHandlerType = self._permission_handler or _deny_all_permissions
tools = self._prepare_tools(self._tools) if self._tools else None

return await self._client.resume_session(
session_id,
on_permission_request=permission_handler,
streaming=streaming,
tools=tools or None,
mcp_servers=self._mcp_servers or None,
provider=self._provider or None,
**self._build_session_config(
streaming=streaming,
runtime_options=runtime_options,
session_tools=session_tools,
),
)
Loading