"""MCP Registry Integration Routes
Handles browsing and installing MCP servers from the registry.
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import httpx
import json
import os
from lib.route_decorators import requires_role
from lib.providers.services import service_manager
# Create router with admin role requirement
router = APIRouter(
dependencies=[requires_role('admin')]
)
[docs]
class RegistryServerInstallRequest(BaseModel):
registry_id: str
server_name: Optional[str] = None # Override name if desired
[docs]
class RegistryBrowseRequest(BaseModel):
category: Optional[str] = None
search: Optional[str] = None
page: int = 1
limit: int = 20
[docs]
@router.get("/mcp/registry/browse")
async def browse_registry_servers(category: Optional[str] = None, search: Optional[str] = None, page: int = 1, limit: int = 20):
"""Browse MCP servers from the registry."""
try:
# Get registry settings
registry_url = "https://registry.mindroot.io" # Default
registry_token = None
try:
settings_file = 'data/registry_settings.json'
if os.path.exists(settings_file):
with open(settings_file, 'r') as f:
settings = json.load(f)
registry_url = settings.get("registry_url", registry_url)
registry_token = settings.get("registry_token")
# Try environment variable if no token in file
if not registry_token:
registry_token = os.getenv('REGISTRY_TOKEN')
except Exception as e:
print(f"Error reading registry settings: {e}")
# Build query parameters
params = {
"category": "mcp_server",
"page": page,
"limit": limit
}
if category:
params["filter_category"] = category
if search:
params["search"] = search
headers = {"Content-Type": "application/json"}
if registry_token:
headers["Authorization"] = f"Bearer {registry_token}"
# Query registry
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{registry_url}/browse",
params=params,
headers=headers
)
if response.status_code >= 400:
try:
error_detail = response.json().get("detail", response.text)
except:
error_detail = response.text
raise HTTPException(
status_code=response.status_code,
detail=f"Registry query failed: {error_detail}"
)
result = response.json()
# Filter and format MCP servers
servers = []
for item in result.get("items", []):
if item.get("category") == "mcp_server":
server_data = item.get("data", {})
servers.append({
"registry_id": item.get("id"),
"title": item.get("title"),
"description": item.get("description"),
"version": item.get("version"),
"author": item.get("author"),
"tags": item.get("tags", []),
"server_type": server_data.get("server_type", "unknown"),
"transport": server_data.get("transport", "unknown"),
"url": server_data.get("url"),
"tools": server_data.get("tools", []),
"auth_type": server_data.get("auth_type", "none"),
"created_at": item.get("created_at"),
"updated_at": item.get("updated_at")
})
return {
"success": True,
"servers": servers,
"total": len(servers),
"page": page,
"limit": limit
}
except HTTPException:
raise
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Registry browse failed: {str(e)}")
[docs]
@router.get("/mcp/registry/server/{registry_id}")
async def get_registry_server_details(registry_id: str):
"""Get detailed information about a specific registry server."""
try:
# Get registry settings
registry_url = "https://registry.mindroot.io" # Default
registry_token = None
try:
settings_file = 'data/registry_settings.json'
if os.path.exists(settings_file):
with open(settings_file, 'r') as f:
settings = json.load(f)
registry_url = settings.get("registry_url", registry_url)
registry_token = settings.get("registry_token")
if not registry_token:
registry_token = os.getenv('REGISTRY_TOKEN')
except Exception as e:
print(f"Error reading registry settings: {e}")
headers = {"Content-Type": "application/json"}
if registry_token:
headers["Authorization"] = f"Bearer {registry_token}"
# Get server details from registry
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{registry_url}/item/{registry_id}",
headers=headers
)
if response.status_code >= 400:
try:
error_detail = response.json().get("detail", response.text)
except:
error_detail = response.text
raise HTTPException(
status_code=response.status_code,
detail=f"Registry server lookup failed: {error_detail}"
)
item = response.json()
if item.get("category") != "mcp_server":
raise HTTPException(status_code=400, detail="Item is not an MCP server")
server_data = item.get("data", {})
return {
"success": True,
"server": {
"registry_id": item.get("id"),
"title": item.get("title"),
"description": item.get("description"),
"version": item.get("version"),
"author": item.get("author"),
"tags": item.get("tags", []),
"server_type": server_data.get("server_type", "unknown"),
"transport": server_data.get("transport", "unknown"),
"url": server_data.get("url"),
"command": server_data.get("command"),
"args": server_data.get("args", []),
"env": server_data.get("env", {}),
"tools": server_data.get("tools", []),
"resources": server_data.get("resources", []),
"prompts": server_data.get("prompts", []),
"auth_type": server_data.get("auth_type", "none"),
"created_at": item.get("created_at"),
"updated_at": item.get("updated_at")
}
}
except HTTPException:
raise
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Registry server lookup failed: {str(e)}")
[docs]
@router.post("/mcp/registry/install")
async def install_registry_server(request: RegistryServerInstallRequest):
"""Install an MCP server from the registry."""
try:
# First get server details from registry
server_details_response = await get_registry_server_details(request.registry_id)
server_info = server_details_response["server"]
# Determine server name
server_name = request.server_name or server_info["title"].lower().replace(" ", "_")
# Get MCP manager
mcp_manager = await service_manager.enhanced_mcp_manager_service()
if not mcp_manager:
raise HTTPException(status_code=500, detail="MCP manager service not available")
# Check if server already exists
if server_name in mcp_manager.servers:
raise HTTPException(status_code=400, detail=f"Server '{server_name}' already exists")
# Create server configuration based on type
if server_info["server_type"] == "remote":
# Remote server - use OAuth if needed
from mindroot.coreplugins.mcp_.mod import MCPServer
import os
# Get BASE_URL for callback
base_url = os.getenv('BASE_URL', 'http://localhost:3000')
server = MCPServer(
name=server_name,
description=server_info["description"],
command=None, # Not used for remote servers
transport="http",
url=server_info["url"],
auth_type=server_info.get("auth_type", "oauth2"),
redirect_uri=f"{base_url}/mcp_oauth_cb"
)
# Add server to manager
mcp_manager.add_server(server_name, server)
# Try to connect (will handle OAuth if needed)
success = await mcp_manager.connect_server(server_name)
if not success:
# Check if OAuth flow is pending
oauth_status = mcp_manager.get_oauth_status(server_name)
if "oauth_flow" in oauth_status and oauth_status["oauth_flow"]["status"] == "awaiting_authorization":
return {
"success": False,
"requires_oauth": True,
"auth_url": oauth_status["oauth_flow"]["auth_url"],
"flow_id": oauth_status["oauth_flow"]["flow_id"],
"server_name": server_name,
"message": "OAuth authorization required to complete installation."
}
else:
# Remove failed server
mcp_manager.remove_server(server_name)
raise HTTPException(status_code=500, detail="Failed to connect to remote server")
# Success - server is connected
tools_count = len(server.capabilities.get("tools", []))
# Mark as installed (will only save to config if local)
mcp_manager.mark_server_as_installed(server_name, request.registry_id)
is_local = mcp_manager._is_local_server(server)
return {
"success": True,
"message": f"Successfully installed '{server_name}' with {tools_count} tools.",
"server_name": server_name,
"tools_count": tools_count,
"server_type": "local" if is_local else "remote",
"persisted": is_local,
"installed": True
}
elif server_info["server_type"] == "local":
# Local server - use enhanced manager for installation
try:
enhanced_mcp_manager = await service_manager.enhanced_mcp_manager_service()
if not enhanced_mcp_manager:
raise HTTPException(status_code=500, detail="Enhanced MCP manager service not available")
from mindroot.coreplugins.mcp_.mod import MCPServer
# Create enhanced server configuration
enhanced_server = MCPServer( name=server_name,
description=server_info["description"],
command=server_info["command"],
args=server_info.get("args", []),
env=server_info.get("env", {}),
install_method="manual", # Registry servers are pre-configured
auto_install=False
)
# Add to enhanced manager
enhanced_mcp_manager.add_server(server_name, enhanced_server)
# Connect to server
success = await enhanced_mcp_manager.connect_server(server_name)
if not success:
enhanced_mcp_manager.remove_server(server_name)
raise HTTPException(status_code=500, detail="Failed to connect to local server")
# Get tools count
server = enhanced_mcp_manager.servers[server_name]
tools_count = len(server.capabilities.get("tools", []))
# Mark as installed (will only save to config if local)
enhanced_mcp_manager.mark_server_as_installed(server_name, request.registry_id)
is_local = enhanced_mcp_manager._is_local_server(server)
return {
"success": True,
"message": f"Successfully installed '{server_name}' with {tools_count} tools.",
"server_name": server_name,
"tools_count": tools_count,
"server_type": "local" if is_local else "remote",
"persisted": is_local,
"installed": True
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Local server installation failed: {str(e)}")
else:
raise HTTPException(status_code=400, detail=f"Unsupported server type: {server_info['server_type']}")
except HTTPException:
raise
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Registry server installation failed: {str(e)}")
[docs]
@router.post("/mcp/registry/complete-oauth")
async def complete_registry_oauth(server_name: str, code: str, state: Optional[str] = None):
"""Complete OAuth flow for registry server installation."""
try:
# Get the MCP manager service
mcp_manager = await service_manager.enhanced_mcp_manager_service()
if not mcp_manager:
raise HTTPException(status_code=500, detail="MCP manager service not available")
# Complete the OAuth flow
success = mcp_manager.complete_oauth_flow(server_name, code, state)
if not success:
raise HTTPException(status_code=400, detail="Failed to complete OAuth flow")
# Wait a moment for the OAuth flow to complete
import asyncio
await asyncio.sleep(1)
# Check if server is now connected
if server_name in mcp_manager.servers:
server = mcp_manager.servers[server_name]
if server.status == "connected":
tools = server.capabilities.get("tools", [])
resources = server.capabilities.get("resources", [])
prompts = server.capabilities.get("prompts", [])
return {
"success": True,
"message": f"OAuth completed successfully. Registry server '{server_name}' installed with {len(tools)} tools, {len(resources)} resources, {len(prompts)} prompts.",
"server_name": server_name,
"tools_count": len(tools),
"resources_count": len(resources),
"prompts_count": len(prompts)
}
return {
"success": True,
"message": "OAuth flow completed, but server connection is still pending.",
"server_name": server_name
}
except HTTPException:
raise
except Exception as e:
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"OAuth completion failed: {str(e)}")
[docs]
@router.get("/mcp/registry/installation-status")
async def get_installation_status():
"""Get installation status of all registry servers."""
try:
mcp_manager = await service_manager.enhanced_mcp_manager_service()
if not mcp_manager:
return {"success": False, "installed_servers": []}
installed_servers = []
for name, server in mcp_manager.servers.items():
# Include servers with registry_id OR servers that are marked as installed
has_registry_id = hasattr(server, 'registry_id') and server.registry_id
is_installed = hasattr(server, 'installed') and server.installed
if has_registry_id or is_installed:
installed_servers.append({
"registry_id": getattr(server, 'registry_id', None),
"server_name": name,
"status": server.status,
"server_type": "local" if mcp_manager._is_local_server(server) else "remote",
"persisted": mcp_manager._is_local_server(server),
"tools_count": len(server.capabilities.get("tools", []))
})
return {"success": True, "installed_servers": installed_servers}
except Exception as e:
import traceback
traceback.print_exc()
return {"success": False, "installed_servers": [], "error": str(e)}
[docs]
@router.get("/mcp/registry/categories")
async def get_registry_categories():
"""Get available MCP server categories from the registry."""
try:
# Get registry settings
registry_url = "https://registry.mindroot.io" # Default
registry_token = None
try:
settings_file = 'data/registry_settings.json'
if os.path.exists(settings_file):
with open(settings_file, 'r') as f:
settings = json.load(f)
registry_url = settings.get("registry_url", registry_url)
registry_token = settings.get("registry_token")
if not registry_token:
registry_token = os.getenv('REGISTRY_TOKEN')
except Exception as e:
print(f"Error reading registry settings: {e}")
headers = {"Content-Type": "application/json"}
if registry_token:
headers["Authorization"] = f"Bearer {registry_token}"
# Get categories from registry
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{registry_url}/categories",
headers=headers
)
if response.status_code >= 400:
# Fallback to default categories if endpoint doesn't exist
return {
"success": True,
"categories": ["utilities", "development", "database", "communication", "ai", "automation"]
}
result = response.json()
return {
"success": True,
"categories": result.get("categories", [])
}
except Exception as e:
# Fallback to default categories
return {
"success": True,
"categories": ["utilities", "development", "database", "communication", "ai", "automation"]
}
[docs]
@router.post("/mcp/registry/mark-installed")
async def mark_server_installed(server_name: str, registry_id: str = None):
"""Manually mark a server as installed (for testing/fixing)."""
try:
mcp_manager = await service_manager.enhanced_mcp_manager_service()
if not mcp_manager:
raise HTTPException(status_code=500, detail="MCP manager service not available")
if server_name not in mcp_manager.servers:
raise HTTPException(status_code=404, detail=f"Server '{server_name}' not found")
mcp_manager.mark_server_as_installed(server_name, registry_id)
return {
"success": True,
"message": f"Server '{server_name}' marked as installed"
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))