Source code for mindroot.coreplugins.admin.mcp_catalog_routes
fromfastapiimportAPIRouter,HTTPExceptionfromtypingimportOptionalfromlib.route_decoratorsimportrequires_role# Import MCP components - use the actual MCP systemtry:frommindroot.coreplugins.mcp.modimportmcp_manager,MCPServerfrommindroot.coreplugins.mcp.catalog_managerimportMCPCatalogManagerexceptImportError:# Mock objects if MCP plugin is not fully installed, to prevent startup crashmcp_manager=NoneMCPServer=NoneMCPCatalogManager=None# Create router with admin role requirementrouter=APIRouter(dependencies=[requires_role('admin')])# --- MCP Catalog/Directory Routes ---
@router.get("/mcp/catalog")
async def get_mcp_catalog():
    """Return the full MCP server catalog/directory.

    Returns:
        A dict with ``success`` set to ``True`` and the loaded catalog under
        ``data``.

    Raises:
        HTTPException: 501 when the catalog manager could not be imported,
            500 when loading the catalog or refreshing its status fails.
    """
    if not MCPCatalogManager:
        raise HTTPException(status_code=501, detail="MCP Catalog not available")

    try:
        manager = MCPCatalogManager()
        loaded = manager.load_catalog()
        # Refresh the running status of the catalogued servers.
        # NOTE(review): the value returned here was loaded *before* this
        # refresh; confirm load_catalog() hands back the same object that
        # update_server_status() mutates, otherwise the status may be stale.
        manager.update_server_status()
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
    else:
        return {"success": True, "data": loaded}
@router.get("/mcp/catalog/search")
async def search_mcp_catalog(query: str = "", category: Optional[str] = None):
    """Search the MCP server catalog by text query or by category.

    A non-empty ``query`` takes precedence over ``category``; when neither is
    given, the catalog's ``servers`` mapping is returned.

    Args:
        query: Search text passed to the catalog manager's server search.
        category: Category name used only when ``query`` is empty.

    Returns:
        A dict with ``success`` set to ``True`` and the matches under ``data``.

    Raises:
        HTTPException: 501 when the catalog manager could not be imported,
            500 when the lookup fails.
    """
    if not MCPCatalogManager:
        raise HTTPException(status_code=501, detail="MCP Catalog not available")

    try:
        manager = MCPCatalogManager()
        if query:
            matches = manager.search_servers(query)
        elif category:
            matches = manager.get_servers_by_category(category)
        else:
            # No filter supplied: fall back to every server in the catalog.
            whole_catalog = manager.load_catalog()
            matches = whole_catalog.get("servers", {})
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
    else:
        return {"success": True, "data": matches}
@router.get("/mcp/catalog/categories")
async def get_mcp_categories():
    """Return the MCP server categories known to the catalog.

    Returns:
        A dict with ``success`` set to ``True`` and the categories under
        ``data``.

    Raises:
        HTTPException: 501 when the catalog manager could not be imported,
            500 when fetching the categories fails.
    """
    if not MCPCatalogManager:
        raise HTTPException(status_code=501, detail="MCP Catalog not available")

    try:
        available = MCPCatalogManager().get_categories()
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
    else:
        return {"success": True, "data": available}
@router.post("/mcp/catalog/install")
async def install_from_catalog(server_name: str):
    """Install an MCP server from the catalog.

    Looks up ``server_name`` in the catalog, builds an ``MCPServer`` from the
    catalog entry, registers it with ``mcp_manager`` and marks the catalog
    entry as installed.

    Args:
        server_name: Name of the catalog entry to install.

    Returns:
        A dict with ``success`` set to ``True`` and a confirmation ``message``.

    Raises:
        HTTPException: 501 when the MCP system could not be imported, 404 when
            ``server_name`` is not in the catalog, 500 for any other failure.
    """
    if not MCPCatalogManager or not mcp_manager:
        raise HTTPException(status_code=501, detail="MCP system not available")

    try:
        catalog_manager = MCPCatalogManager()
        server_info = catalog_manager.get_server_info(server_name)

        if not server_info:
            raise HTTPException(
                status_code=404,
                detail=f"MCP server '{server_name}' not found in catalog",
            )

        # Create MCPServer object from catalog info, defaulting missing fields.
        server = MCPServer(
            name=server_name,
            description=server_info.get("description", ""),
            command=server_info.get("command", ""),
            args=server_info.get("args", []),
            env=server_info.get("env", {}),
            transport=server_info.get("transport", "stdio"),
            url=server_info.get("url"),
        )

        # Add to MCP manager
        mcp_manager.add_server(server_name, server)

        # Mark as installed in catalog
        catalog_manager.mark_server_installed(server_name, True)

        return {
            "success": True,
            "message": f"MCP server '{server_name}' installed from catalog successfully.",
        }
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the client as-is;
        # without this clause the handler below would turn them into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/mcp/catalog/server/{server_name}")
async def get_catalog_server_info(server_name: str):
    """Get detailed information about a specific server from the catalog.

    Args:
        server_name: Name of the catalog entry, taken from the URL path.

    Returns:
        A dict with ``success`` set to ``True`` and the catalog entry under
        ``data``.

    Raises:
        HTTPException: 501 when the catalog manager could not be imported,
            404 when ``server_name`` is not in the catalog, 500 for any other
            failure.
    """
    if not MCPCatalogManager:
        raise HTTPException(status_code=501, detail="MCP Catalog not available")

    try:
        catalog_manager = MCPCatalogManager()
        server_info = catalog_manager.get_server_info(server_name)

        if not server_info:
            raise HTTPException(
                status_code=404,
                detail=f"MCP server '{server_name}' not found in catalog",
            )

        return {"success": True, "data": server_info}
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the client as-is;
        # without this clause the handler below would turn them into a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.post("/mcp/catalog/refresh")
async def refresh_catalog_status():
    """Refresh the running status of every server listed in the catalog.

    Returns:
        A dict with ``success`` set to ``True``, a confirmation ``message``
        and, under ``data``, whatever the catalog manager's status update
        returned.

    Raises:
        HTTPException: 501 when the catalog manager could not be imported,
            500 when the status update fails.
    """
    if not MCPCatalogManager:
        raise HTTPException(status_code=501, detail="MCP Catalog not available")

    try:
        refreshed = MCPCatalogManager().update_server_status()
    except Exception as err:
        raise HTTPException(status_code=500, detail=str(err))
    else:
        return {
            "success": True,
            "message": "Catalog status refreshed successfully.",
            "data": refreshed,
        }