Source code for mindroot.coreplugins.mcp_.catalog_manager

import json
import os
import psutil
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Any

[docs] class MCPCatalogManager: """Manages MCP server catalog and process detection""" def __init__(self, plugin_dir: str = None, working_dir: str = None): # Default plugin_dir to the directory containing this module if plugin_dir is None: plugin_dir = Path(__file__).parent.parent.parent self.plugin_dir = Path(plugin_dir) # Use process working directory for data files if working_dir is None: working_dir = os.getcwd() self.working_dir = Path(working_dir) self.catalog_file = self.working_dir / "data" / "mcp" / "servers.json" self.default_file = Path(__file__).parent / "data" / "default_mcp_servers.json" self.ensure_catalog_exists()
[docs] def ensure_catalog_exists(self): """Ensure catalog file exists, create from default if needed""" if not self.catalog_file.exists(): if self.default_file.exists(): self.init_catalog_from_default() else: self.create_empty_catalog()
[docs] def init_catalog_from_default(self): """Initialize catalog from default servers""" # Ensure the target directory exists self.catalog_file.parent.mkdir(parents=True, exist_ok=True) try: with open(self.default_file, 'r') as f: default_servers = json.load(f) catalog = { "_metadata": { "version": "1.0.0", "last_updated": "2025-06-02", "description": "MCP Server Catalog for MindRoot", "categories": ["utilities", "system", "development", "search", "database", "communication", "cloud", "automation", "ai"] }, "servers": {} } for name, server in default_servers.items(): server["status"] = "available" server["installed"] = False server["running"] = False catalog["servers"][name] = server with open(self.catalog_file, 'w') as f: json.dump(catalog, f, indent=2) except Exception as e: print(f"Error initializing catalog: {e}") self.create_empty_catalog()
[docs] def create_empty_catalog(self): """Create empty catalog structure""" catalog = { "_metadata": { "version": "1.0.0", "last_updated": "2025-06-02", "description": "MCP Server Catalog for MindRoot", "categories": [] }, "servers": {} } self.catalog_file.parent.mkdir(parents=True, exist_ok=True) with open(self.catalog_file, 'w') as f: json.dump(catalog, f, indent=2)
[docs] def load_catalog(self) -> Dict[str, Any]: """Load the server catalog""" try: with open(self.catalog_file, 'r') as f: return json.load(f) except Exception as e: print(f"Error loading catalog: {e}") return {"_metadata": {}, "servers": {}}
[docs] def save_catalog(self, catalog: Dict[str, Any]): """Save the server catalog""" try: with open(self.catalog_file, 'w') as f: json.dump(catalog, f, indent=2) except Exception as e: print(f"Error saving catalog: {e}")
[docs] def get_running_processes(self) -> List[Dict[str, Any]]: """Get list of running processes with details""" processes = [] try: for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: processes.append({ 'pid': proc.info['pid'], 'name': proc.info['name'], 'cmdline': proc.info['cmdline'] or [] }) except (psutil.NoSuchProcess, psutil.AccessDenied): continue except Exception as e: print(f"Error getting processes: {e}") return processes
[docs] def detect_running_servers(self) -> Dict[str, bool]: """Detect which MCP servers are currently running""" catalog = self.load_catalog() running_status = {} processes = self.get_running_processes() for server_name, server_info in catalog.get("servers", {}).items(): is_running = False process_names = server_info.get("process_names", []) for proc in processes: # Check process name if proc['name'] in process_names: is_running = True break # Check command line for MCP server patterns cmdline_str = ' '.join(proc['cmdline']) for process_name in process_names: if process_name in cmdline_str: is_running = True break # Check for specific patterns if server_name in cmdline_str or server_info.get('install_package', '') in cmdline_str: is_running = True break if is_running: break running_status[server_name] = is_running return running_status
[docs] def update_server_status(self): """Update running status for all servers in catalog""" catalog = self.load_catalog() running_status = self.detect_running_servers() for server_name, is_running in running_status.items(): if server_name in catalog.get("servers", {}): catalog["servers"][server_name]["running"] = is_running self.save_catalog(catalog) return catalog
[docs] def get_servers_by_category(self, category: str = None) -> Dict[str, Any]: """Get servers filtered by category""" catalog = self.load_catalog() servers = catalog.get("servers", {}) if category is None: return servers filtered = {} for name, server in servers.items(): if server.get("category") == category: filtered[name] = server return filtered
[docs] def get_server_info(self, server_name: str) -> Optional[Dict[str, Any]]: """Get detailed info for a specific server""" catalog = self.load_catalog() return catalog.get("servers", {}).get(server_name)
[docs] def mark_server_installed(self, server_name: str, installed: bool = True): """Mark a server as installed or not""" catalog = self.load_catalog() if server_name in catalog.get("servers", {}): catalog["servers"][server_name]["installed"] = installed self.save_catalog(catalog)
[docs] def add_custom_server(self, server_info: Dict[str, Any]): """Add a custom server to the catalog""" catalog = self.load_catalog() server_name = server_info.get("name") if server_name: # Add default fields if missing server_info.setdefault("status", "custom") server_info.setdefault("installed", False) server_info.setdefault("running", False) server_info.setdefault("category", "custom") server_info.setdefault("tags", []) catalog["servers"][server_name] = server_info self.save_catalog(catalog) return True return False
[docs] def remove_server(self, server_name: str): """Remove a server from the catalog""" catalog = self.load_catalog() if server_name in catalog.get("servers", {}): del catalog["servers"][server_name] self.save_catalog(catalog) return True return False
[docs] def search_servers(self, query: str) -> Dict[str, Any]: """Search servers by name, description, or tags""" catalog = self.load_catalog() servers = catalog.get("servers", {}) results = {} query_lower = query.lower() for name, server in servers.items(): # Search in name if query_lower in name.lower(): results[name] = server continue # Search in display name if query_lower in server.get("display_name", "").lower(): results[name] = server continue # Search in description if query_lower in server.get("description", "").lower(): results[name] = server continue # Search in tags tags = server.get("tags", []) if any(query_lower in tag.lower() for tag in tags): results[name] = server continue return results
[docs] def get_categories(self) -> List[str]: """Get list of all categories""" catalog = self.load_catalog() categories = set(catalog.get("_metadata", {}).get("categories", [])) # Add categories from servers for server in catalog.get("servers", {}).values(): category = server.get("category") if category: categories.add(category) return sorted(list(categories))